7.7. Understand How Using Queue for Concurrency Requires Refactoring

In the previous item (see Item 57: “Avoid Creating New Thread Instances for On-demand Fan-out”) I covered the downsides of using Thread to solve the parallel I/O problem in the Game of Life example from earlier (see Item 56: “Know How to Recognize When Concurrency Is Necessary” for background and the implementations of various functions and classes below).

The next approach to try is to implement a threaded pipeline using the Queue class from the queue built-in module (see Item 55: “Use Queue to Coordinate Work Between Threads” for background; I rely on the implementations of ClosableQueue and StoppableWorker from that item in the example code below).
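As a refresher, those two helpers give Queue a sentinel-based close/iterate protocol and wrap per-item worker logic in a Thread subclass. Here is a minimal sketch along the lines of Item 55 (my paraphrase; details may differ slightly from that item's exact code):

from queue import Queue
from threading import Thread

class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)  # One sentinel stops one worker

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return  # Cause the worker thread to exit
                yield item
            finally:
                self.task_done()

class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        for item in self.in_queue:  # Ends after close() is called
            result = self.func(item)
            self.out_queue.put(result)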

Here’s the general approach: Instead of creating one thread per cell per generation of the Game of Life, I can create a fixed number of worker threads upfront and have them do parallelized I/O as needed. This will keep my resource usage under control and eliminate the overhead of frequently starting new threads.

To do this, I need two ClosableQueue instances to use for communicating to and from the worker threads that execute the game_logic function:

from queue import Queue

class ClosableQueue(Queue):
    ...

in_queue = ClosableQueue()
out_queue = ClosableQueue()

I can start multiple threads that will consume items from the in_queue, process them by calling game_logic, and put the results on out_queue. These threads will run concurrently, allowing for parallel I/O and reduced latency for each generation:


from threading import Thread

class StoppableWorker(Thread):
    ...

def game_logic(state, neighbors):
    ...
    # Do some blocking input/output in here:
    data = my_socket.recv(100)
    ...

def game_logic_thread(item):
    y, x, state, neighbors = item
    try:
        next_state = game_logic(state, neighbors)
    except Exception as e:
        next_state = e
    return (y, x, next_state)

# Start the threads upfront
threads = []
for _ in range(5):
    thread = StoppableWorker(
        game_logic_thread, in_queue, out_queue)
    thread.start()
    threads.append(thread)

Now, I can redefine the simulate function to interact with these queues to request state transition decisions and receive corresponding responses. Adding items to in_queue causes fan-out, and consuming items from out_queue until it’s empty causes fan-in:


ALIVE = '*'
EMPTY = '-'

class SimulationError(Exception):
    pass

class Grid:
    ...

def count_neighbors(y, x, get):
    ...

def simulate_pipeline(grid, in_queue, out_queue):
    for y in range(grid.height):
        for x in range(grid.width):
            state = grid.get(y, x)
            neighbors = count_neighbors(y, x, grid.get)
            in_queue.put((y, x, state, neighbors))  # Fan out

    in_queue.join()
    out_queue.close()

    next_grid = Grid(grid.height, grid.width)
    for item in out_queue:                          # Fan in
        y, x, next_state = item
        if isinstance(next_state, Exception):
            raise SimulationError(y, x) from next_state
        next_grid.set(y, x, next_state)

    return next_grid

The calls to Grid.get and Grid.set both happen within this new simulate_pipeline function, which means I can use the single-threaded implementation of Grid instead of the implementation that requires Lock instances for synchronization.
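For reference, that single-threaded Grid (elided in the listing above) just stores rows of cell states with wrap-around indexing, and it needs no synchronization here because only the main thread touches it. A minimal sketch, reconstructed from Item 56 (details may vary):

class Grid:
    def __init__(self, height, width):
        self.height = height
        self.width = width
        self.rows = []
        for _ in range(self.height):
            self.rows.append([EMPTY] * self.width)

    def get(self, y, x):
        # Modulo arithmetic makes the grid wrap around like a torus
        return self.rows[y % self.height][x % self.width]

    def set(self, y, x, state):
        self.rows[y % self.height][x % self.width] = state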

This code is also easier to debug than the Thread approach used in the previous item. If an exception occurs while doing I/O in the game_logic function, it will be caught, propagated to the out_queue, and then re-raised in the main thread:


def game_logic(state, neighbors):
    ...
    raise OSError('Problem with I/O in game_logic')
    ...

simulate_pipeline(Grid(1, 1), in_queue, out_queue)

>>>
Traceback ...
OSError: Problem with I/O in game_logic

The above exception was the direct cause of the following exception:

Traceback ...
SimulationError: (0, 0)
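The driving code that follows relies on the ColumnPrinter helper from Item 56 to render successive generations side by side; its implementation is elided below, but a minimal sketch that reproduces the output format might look like this (an approximation, not necessarily the original class):

class ColumnPrinter:
    def __init__(self):
        self.columns = []

    def append(self, data):
        self.columns.append(data)

    def __str__(self):
        rows_by_col = [data.splitlines() for data in self.columns]
        width = len(rows_by_col[0][0])
        # Center each generation's index over its column of cells
        header = ' | '.join(
            str(i).center(width) for i in range(len(self.columns)))
        lines = [header]
        for row in zip(*rows_by_col):
            lines.append(' | '.join(row))
        return '\n'.join(lines)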

I can drive this multithreaded pipeline for repeated generations by calling simulate_pipeline in a loop:


class ColumnPrinter:
    ...

grid = Grid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

columns = ColumnPrinter()
for i in range(5):
    columns.append(str(grid))
    grid = simulate_pipeline(grid, in_queue, out_queue)

print(columns)

for thread in threads:
    in_queue.close()
for thread in threads:
    thread.join()

>>>
    0     |     1     |     2     |     3     |     4
---*----- | --------- | --------- | --------- | ---------
----*---- | --*-*---- | ----*---- | ---*----- | ----*----
--***---- | ---**---- | --*-*---- | ----**--- | -----*---
--------- | ---*----- | ---**---- | ---**---- | ---***---
--------- | --------- | --------- | --------- | ---------

The results are the same as before. Although I’ve addressed the memory explosion problem, startup costs, and debugging issues of using threads on their own, many issues remain:

  • The simulate_pipeline function is even harder to follow than the simulate_threaded approach from the previous item.

  • Extra support classes were required for ClosableQueue and StoppableWorker in order to make the code easier to read, at the expense of increased complexity.

  • I have to specify the amount of potential parallelism—the number of threads running game_logic_thread—upfront based on my expectations of the workload instead of having the system automatically scale up parallelism as needed.

  • In order to enable debugging, I have to manually catch exceptions in worker threads, propagate them on a Queue, and then re-raise them in the main thread.

However, the biggest problem with this code is apparent if the requirements change again. Imagine that later I needed to do I/O within the count_neighbors function in addition to the I/O that was needed within game_logic:


def count_neighbors(y, x, get):
    ...
    # Do some blocking input/output in here:
    data = my_socket.recv(100)
    ...

In order to make this parallelizable, I need to add another stage to the pipeline that runs count_neighbors in a thread. I need to make sure that exceptions propagate correctly between the worker threads and the main thread. And I need to use a Lock for the Grid class in order to ensure safe synchronization between the worker threads (see Item 54: “Use Lock to Prevent Data Races in Threads” for background and Item 57: “Avoid Creating New Thread Instances for On-demand Fan-out” for the implementation of LockingGrid):


def count_neighbors_thread(item):
    y, x, state, get = item
    try:
        neighbors = count_neighbors(y, x, get)
    except Exception as e:
        neighbors = e
    return (y, x, state, neighbors)

def game_logic_thread(item):
    y, x, state, neighbors = item
    if isinstance(neighbors, Exception):
        next_state = neighbors
    else:
        try:
            next_state = game_logic(state, neighbors)
        except Exception as e:
            next_state = e
    return (y, x, next_state)

class LockingGrid(Grid):
    ...
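The LockingGrid implementation is elided above; as a reminder of its shape from Item 57, it wraps every Grid operation in a single Lock so that worker threads can safely call get and set concurrently. A minimal sketch (my reconstruction, which may differ from that item's exact code):

from threading import Lock

class LockingGrid(Grid):
    def __init__(self, height, width):
        super().__init__(height, width)
        self.lock = Lock()

    def __str__(self):
        with self.lock:
            return super().__str__()

    def get(self, y, x):
        with self.lock:
            return super().get(y, x)

    def set(self, y, x, state):
        with self.lock:
            return super().set(y, x, state)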

I have to create another set of Queue instances for the count_neighbors_thread workers and the corresponding Thread instances:


in_queue = ClosableQueue()
logic_queue = ClosableQueue()
out_queue = ClosableQueue()

threads = []

for _ in range(5):
    thread = StoppableWorker(
        count_neighbors_thread, in_queue, logic_queue)
    thread.start()
    threads.append(thread)

for _ in range(5):
    thread = StoppableWorker(
        game_logic_thread, logic_queue, out_queue)
    thread.start()
    threads.append(thread)

Finally, I need to update simulate_pipeline to coordinate the multiple phases in the pipeline and ensure that work fans out and back in correctly:


def simulate_phased_pipeline(
        grid, in_queue, logic_queue, out_queue):
    for y in range(grid.height):
        for x in range(grid.width):
            state = grid.get(y, x)
            item = (y, x, state, grid.get)
            in_queue.put(item)          # Fan out

    in_queue.join()
    logic_queue.join()                  # Pipeline sequencing
    out_queue.close()

    next_grid = LockingGrid(grid.height, grid.width)
    for item in out_queue:              # Fan in
        y, x, next_state = item
        if isinstance(next_state, Exception):
            raise SimulationError(y, x) from next_state
        next_grid.set(y, x, next_state)

    return next_grid

With these updated implementations, now I can run the multiphase pipeline end-to-end:


grid = LockingGrid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

columns = ColumnPrinter()
for i in range(5):
    columns.append(str(grid))
    grid = simulate_phased_pipeline(
        grid, in_queue, logic_queue, out_queue)

print(columns)

for thread in threads:
    in_queue.close()
for thread in threads:
    logic_queue.close()
for thread in threads:
    thread.join()

>>>
    0     |     1     |     2     |     3     |     4
---*----- | --------- | --------- | --------- | ---------
----*---- | --*-*---- | ----*---- | ---*----- | ----*----
--***---- | ---**---- | --*-*---- | ----**--- | -----*---
--------- | ---*----- | ---**---- | ---**---- | ---***---
--------- | --------- | --------- | --------- | ---------

Again, this works as expected, but it required a lot of changes and boilerplate. The point here is that Queue does make it possible to solve fan-out and fan-in problems, but the overhead is very high. Although using Queue is a better approach than using Thread instances on their own, it’s still not nearly as good as some of the other tools provided by Python (see Item 59: “Consider ThreadPoolExecutor When Threads Are Necessary for Concurrency” and Item 60: “Achieve Highly Concurrent I/O with Coroutines”).
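To make that contrast concrete, here is a rough sketch of how a thread pool might replace all of the queue plumbing above. This is only an illustration in the spirit of Item 59, not that item's exact code: pool.submit fans out by returning Future instances, and calling result() fans in while automatically re-raising any exception from the worker thread.

from concurrent.futures import ThreadPoolExecutor

def simulate_pool(pool, grid):
    next_grid = LockingGrid(grid.height, grid.width)
    futures = []
    for y in range(grid.height):
        for x in range(grid.width):
            state = grid.get(y, x)
            neighbors = count_neighbors(y, x, grid.get)
            future = pool.submit(game_logic, state, neighbors)  # Fan out
            futures.append((y, x, future))

    for y, x, future in futures:
        # result() re-raises worker exceptions in the main thread
        next_grid.set(y, x, future.result())  # Fan in
    return next_grid

with ThreadPoolExecutor(max_workers=10) as pool:
    grid = simulate_pool(pool, grid)

Note that there is no manual exception plumbing and no sentinel shutdown protocol; the Future objects and the pool's context manager handle both concerns.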

7.7.1. Things to Remember

✦ Using Queue instances with a fixed number of worker threads improves the scalability of fan-out and fan-in using threads.

✦ It takes a significant amount of work to refactor existing code to use Queue, especially when multiple stages of a pipeline are required.

✦ Using Queue fundamentally limits the total amount of I/O parallelism a program can leverage compared to alternative approaches provided by other built-in Python features and modules.