7.4. Use Queue to Coordinate Work Between Threads¶
Python programs that do many things concurrently often need to coordinate their work. One of the most useful arrangements for concurrent work is a pipeline of functions.
A pipeline works like an assembly line used in manufacturing. Pipelines have many phases in serial, with a specific function for each phase. New pieces of work are constantly being added to the beginning of the pipeline. The functions can operate concurrently, each working on the piece of work in its phase. The work moves forward as each function completes until there are no phases remaining. This approach is especially good for work that includes blocking I/O or subprocesses—activities that can easily be parallelized using Python (see Item 53: “Use Threads for Blocking I/O, Avoid for Parallelism”).
For example, say I want to build a system that will take a constant stream of images from my digital camera, resize them, and then add them to a photo gallery online. Such a program could be split into three phases of a pipeline. New images are retrieved in the first phase. The downloaded images are passed through the resize function in the second phase. The resized images are consumed by the upload function in the final phase.
Imagine that I’ve already written Python functions that execute the phases: download, resize, upload. How do I assemble a pipeline to do the work concurrently?
>>> def download(item):
>>>     ...
>>>
>>> def resize(item):
>>>     ...
>>>
>>> def upload(item):
>>>     ...
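The bodies above are intentionally elided. Purely for illustration (this sketch is not part of the original example), here is one way to stub the phases out so the rest of this item can actually run; the time.sleep calls are invented placeholders for real network and CPU work:
>>> import time  # Hypothetical stand-ins, not the real phase functions
>>>
>>> def download(item):
>>>     time.sleep(0.0001)  # Simulate fetching an image from the camera
>>>     return item
>>>
>>> def resize(item):
>>>     time.sleep(0.0001)  # Simulate CPU work to scale the image
>>>     return item
>>>
>>> def upload(item):
>>>     time.sleep(0.0001)  # Simulate sending the image to the gallery
>>>     return item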
The first thing I need is a way to hand off work between the pipeline phases. This can be modeled as a thread-safe producer–consumer queue (see Item 54: “Use Lock to Prevent Data Races in Threads” to understand the importance of thread safety in Python; see Item 71: “Prefer deque for Producer–Consumer Queues” to understand queue performance):
>>> from collections import deque
>>> from threading import Lock
>>>
>>> class MyQueue:
>>>     def __init__(self):
>>>         self.items = deque()
>>>         self.lock = Lock()
The producer, my digital camera, adds new images to the end of the deque of pending items:
>>> class MyQueue:
>>>     def __init__(self):
>>>         self.items = deque()
>>>         self.lock = Lock()
>>>
>>>     def put(self, item):
>>>         with self.lock:
>>>             self.items.append(item)
The consumer, the first phase of the processing pipeline, removes images from the front of the deque of pending items:
>>> class MyQueue:
>>>     def __init__(self):
>>>         self.items = deque()
>>>         self.lock = Lock()
>>>
>>>     def put(self, item):
>>>         with self.lock:
>>>             self.items.append(item)
>>>
>>>     def get(self):
>>>         with self.lock:
>>>             return self.items.popleft()
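One detail worth noting (an aside, not from the original text): because get calls deque.popleft, it raises IndexError when the queue is empty, and that is exactly the exception the worker below has to handle:
>>> queue = MyQueue()
>>> queue.put('photo')
>>> queue.get()
'photo'
>>> queue.get()  # Empty queue: popleft raises
Traceback (most recent call last):
  ...
IndexError: pop from an empty deque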
Here, I represent each phase of the pipeline as a Python thread that takes work from one queue like this, runs a function on it, and puts the result on another queue. I also track how many times the worker has checked for new input and how much work it’s completed:
>>> from threading import Thread
>>> import time
>>>
>>> class Worker(Thread):
>>>     def __init__(self, func, in_queue, out_queue):
>>>         super().__init__()
>>>         self.func = func
>>>         self.in_queue = in_queue
>>>         self.out_queue = out_queue
>>>         self.polled_count = 0
>>>         self.work_done = 0
The trickiest part is that the worker thread must properly handle the case where the input queue is empty because the previous phase hasn’t completed its work yet. This happens where I catch the IndexError exception below. You can think of this as a holdup in the assembly line:
>>> class Worker(Thread):
>>>     def __init__(self, func, in_queue, out_queue):
>>>         super().__init__()
>>>         self.func = func
>>>         self.in_queue = in_queue
>>>         self.out_queue = out_queue
>>>         self.polled_count = 0
>>>         self.work_done = 0
>>>
>>>     def run(self):
>>>         while True:
>>>             self.polled_count += 1
>>>             try:
>>>                 item = self.in_queue.get()
>>>             except IndexError:
>>>                 time.sleep(0.01)  # No work to do
>>>             else:
>>>                 result = self.func(item)
>>>                 self.out_queue.put(result)
>>>                 self.work_done += 1
Now, I can connect the three phases together by creating the queues for their coordination points and the corresponding worker threads:
>>> download_queue = MyQueue()
>>> resize_queue = MyQueue()
>>> upload_queue = MyQueue()
>>> done_queue = MyQueue()
>>> threads = [
>>>     Worker(download, download_queue, resize_queue),
>>>     Worker(resize, resize_queue, upload_queue),
>>>     Worker(upload, upload_queue, done_queue),
>>> ]
I can start the threads and then inject a bunch of work into the first phase of the pipeline. Here, I use a plain object instance as a proxy for the real data required by the download function:
>>> for thread in threads:
>>>     thread.start()
>>>
>>> for _ in range(1000):
>>>     download_queue.put(object())
Now, I wait for all of the items to be processed by the pipeline and end up in the done_queue:
>>> while len(done_queue.items) < 1000:
>>>     # Do something useful while waiting
>>>     ...
This runs properly, but there’s an interesting side effect caused by the threads polling their input queues for new work. The tricky part, where I catch IndexError exceptions in the run method, executes a large number of times:
>>> processed = len(done_queue.items)
>>> polled = sum(t.polled_count for t in threads)
>>> print(f'Processed {processed} items after '
>>>       f'polling {polled} times')
Processed 1000 items after polling 3007 times
When the worker functions vary in their respective speeds, an earlier phase can prevent progress in later phases, backing up the pipeline. This causes later phases to starve and constantly check their input queues for new work in a tight loop. The outcome is that worker threads waste CPU time doing nothing useful; they’re constantly raising and catching IndexError exceptions.
But that’s just the beginning of what’s wrong with this implementation. There are three more problems that you should also avoid. First, determining that all of the input work is complete requires yet another busy wait on the done_queue. Second, in Worker, the run method will execute forever in its busy loop. There’s no obvious way to signal to a worker thread that it’s time to exit.
Third, and worst of all, a backup in the pipeline can cause the program to crash arbitrarily. If the first phase makes rapid progress but the second phase makes slow progress, then the queue connecting the first phase to the second phase will constantly increase in size. The second phase won’t be able to keep up. Given enough time and input data, the program will eventually run out of memory and die.
The lesson here isn’t that pipelines are bad; it’s that it’s hard to build a good producer–consumer queue yourself. So why even try?
7.4.1. Queue to the Rescue¶
The Queue class from the queue built-in module provides all of the functionality you need to solve the problems outlined above.
Queue eliminates the busy waiting in the worker by making the get method block until new data is available. For example, here I start a thread that waits for some input data on a queue:
>>> from queue import Queue
>>>
>>> my_queue = Queue()
>>>
>>> def consumer():
>>>     print('Consumer waiting')
>>>     my_queue.get()  # Runs after put() below
>>>     print('Consumer done')
>>>
>>> thread = Thread(target=consumer)
>>> thread.start()
Consumer waiting
Even though the thread is running first, it won’t finish until an item is put on the Queue instance and the get method has something to return:
>>> print('Producer putting')
>>> my_queue.put(object()) # Runs before get() above
>>> print('Producer done')
>>> thread.join()
Producer putting
Producer done
Consumer done
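An aside that goes beyond the original item: when blocking forever is undesirable, Queue.get also accepts a timeout argument and raises the queue.Empty exception when it expires, which can be handy in shutdown paths. Since my_queue is empty again after the example above, this sketch shows the behavior:
>>> from queue import Empty
>>>
>>> try:
>>>     my_queue.get(timeout=0.01)  # No producer this time
>>> except Empty:
>>>     print('No item within the timeout')
No item within the timeout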
To solve the pipeline backup issue, the Queue class lets you specify the maximum amount of pending work to allow between two phases.
This buffer size causes calls to put to block when the queue is already full. For example, here I define a thread that waits for a while before consuming a queue:
>>> my_queue = Queue(1) # Buffer size of 1
>>>
>>> def consumer():
>>>     time.sleep(0.1)  # Wait
>>>     my_queue.get()  # Runs second
>>>     print('Consumer got 1')
>>>     my_queue.get()  # Runs fourth
>>>     print('Consumer got 2')
>>>     print('Consumer done')
>>>
>>> thread = Thread(target=consumer)
>>> thread.start()
The wait should allow the producer thread to put both objects on the queue before the consumer thread ever calls get. But the Queue size is one. This means the producer adding items to the queue will have to wait for the consumer thread to call get at least once before the second call to put will stop blocking and add the second item to the queue:
>>> my_queue.put(object()) # Runs first
>>> print('Producer put 1')
>>> my_queue.put(object()) # Runs third
>>> print('Producer put 2')
>>> print('Producer done')
>>> thread.join()
Producer put 1
Consumer got 1
Producer put 2
Producer done
Consumer got 2
Consumer done
The Queue class can also track the progress of work using the task_done method. This lets you wait for a phase’s input queue to drain and eliminates the need to poll the last phase of a pipeline (as with the done_queue above). For example, here I define a consumer thread that calls task_done when it finishes working on an item:
>>> in_queue = Queue()
>>>
>>> def consumer():
>>>     print('Consumer waiting')
>>>     work = in_queue.get()  # Runs second
>>>     print('Consumer working')
>>>     # Doing work
>>>     ...
>>>     print('Consumer done')
>>>     in_queue.task_done()  # Runs third
>>>
>>> thread = Thread(target=consumer)
>>> thread.start()
Consumer waiting
Now, the producer code doesn’t have to join the consumer thread or poll. The producer can just wait for the in_queue to finish by calling join on the Queue instance. Even once it’s empty, the in_queue won’t be joinable until after task_done is called for every item that was ever enqueued:
>>> print('Producer putting')
>>> in_queue.put(object()) # Runs first
>>> print('Producer waiting')
>>> in_queue.join() # Runs fourth
>>> print('Producer done')
>>> thread.join()
Producer putting
Producer waiting
Consumer working
Consumer done
Producer done
I can put all these behaviors together into a Queue subclass that also tells the worker thread when it should stop processing. Here, I define a close method that adds a special sentinel item to the queue that indicates there will be no more input items after it:
>>> class ClosableQueue(Queue):
>>>     SENTINEL = object()
>>>
>>>     def close(self):
>>>         self.put(self.SENTINEL)
Then, I define an iterator for the queue that looks for this special object and stops iteration when it’s found. This __iter__ method also calls task_done at appropriate times, letting me track the progress of work on the queue (see Item 31: “Be Defensive When Iterating Over Arguments” for details about __iter__):
>>> class ClosableQueue(Queue):
>>>     SENTINEL = object()
>>>
>>>     def close(self):
>>>         self.put(self.SENTINEL)
>>>
>>>     def __iter__(self):
>>>         while True:
>>>             item = self.get()
>>>             try:
>>>                 if item is self.SENTINEL:
>>>                     return  # Cause the thread to exit
>>>                 yield item
>>>             finally:
>>>                 self.task_done()
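As a quick standalone check (this snippet is an addition, not from the original), iterating a closed ClosableQueue yields each item in order and then stops cleanly when it reaches the sentinel:
>>> queue = ClosableQueue()
>>> queue.put('a')
>>> queue.put('b')
>>> queue.close()  # Appends the sentinel
>>> for item in queue:
>>>     print(item)
a
b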
Now, I can redefine my worker thread to rely on the behavior of the ClosableQueue class. The thread will exit when the for loop is exhausted:
>>> class StoppableWorker(Thread):
>>>     def __init__(self, func, in_queue, out_queue):
>>>         super().__init__()
>>>         self.func = func
>>>         self.in_queue = in_queue
>>>         self.out_queue = out_queue
>>>
>>>     def run(self):
>>>         for item in self.in_queue:
>>>             result = self.func(item)
>>>             self.out_queue.put(result)
I re-create the set of worker threads using the new worker class:
>>> download_queue = ClosableQueue()
>>> resize_queue = ClosableQueue()
>>> upload_queue = ClosableQueue()
>>> done_queue = ClosableQueue()
>>> threads = [
>>>     StoppableWorker(download, download_queue, resize_queue),
>>>     StoppableWorker(resize, resize_queue, upload_queue),
>>>     StoppableWorker(upload, upload_queue, done_queue),
>>> ]
After running the worker threads as before, I also send the stop signal after all the input work has been injected by closing the input queue of the first phase:
>>> for thread in threads:
>>>     thread.start()
>>>
>>> for _ in range(1000):
>>>     download_queue.put(object())
>>>
>>> download_queue.close()
Finally, I wait for the work to finish by joining the queues that connect the phases. Each time one phase is done, I signal the next phase to stop by closing its input queue. At the end, the done_queue contains all of the output objects, as expected:
>>> download_queue.join()
>>> resize_queue.close()
>>> resize_queue.join()
>>> upload_queue.close()
>>> upload_queue.join()
>>> print(done_queue.qsize(), 'items finished')
>>>
>>> for thread in threads:
>>>     thread.join()
This approach can be extended to use multiple worker threads per phase, which can increase I/O parallelism and speed up this type of program significantly. To do this, first I define some helper functions that start and stop multiple threads. The way stop_threads works is by calling close on each input queue once per consuming thread, which ensures that all of the workers exit cleanly:
>>> def start_threads(count, *args):
>>>     threads = [StoppableWorker(*args) for _ in range(count)]
>>>     for thread in threads:
>>>         thread.start()
>>>     return threads
>>>
>>> def stop_threads(closable_queue, threads):
>>>     for _ in threads:
>>>         closable_queue.close()
>>>     closable_queue.join()
>>>     for thread in threads:
>>>         thread.join()
Then, I connect the pieces together as before, putting objects to process into the top of the pipeline, joining queues and threads along the way, and finally consuming the results:
>>> download_queue = ClosableQueue()
>>> resize_queue = ClosableQueue()
>>> upload_queue = ClosableQueue()
>>> done_queue = ClosableQueue()
>>>
>>> download_threads = start_threads(
>>>     3, download, download_queue, resize_queue)
>>> resize_threads = start_threads(
>>>     4, resize, resize_queue, upload_queue)
>>> upload_threads = start_threads(
>>>     5, upload, upload_queue, done_queue)
>>>
>>> for _ in range(1000):
>>>     download_queue.put(object())
>>>
>>> stop_threads(download_queue, download_threads)
>>> stop_threads(resize_queue, resize_threads)
>>> stop_threads(upload_queue, upload_threads)
>>>
>>> print(done_queue.qsize(), 'items finished')
1000 items finished
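As an optional sanity check (an addition, assuming the code above ran to completion), stop_threads joins every worker thread, so none of them should still be alive afterward:
>>> all_threads = (download_threads +
>>>                resize_threads +
>>>                upload_threads)
>>> assert not any(thread.is_alive() for thread in all_threads)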
Although Queue works well in this case of a linear pipeline, there are many other situations for which there are better tools that you should consider (see Item 60: “Achieve Highly Concurrent I/O with Coroutines”).
7.4.2. Things to Remember¶
✦ Pipelines are a great way to organize sequences of work—especially I/O-bound programs—that run concurrently using multiple Python threads.
✦ Be aware of the many problems in building concurrent pipelines: busy waiting, how to tell workers to stop, and potential memory explosion.
✦ The Queue class has all the facilities you need to build robust pipelines: blocking operations, buffer sizes, and joining.