7.8. Consider ThreadPoolExecutor When Threads Are Necessary for Concurrency¶
Python includes the concurrent.futures built-in module, which provides the ThreadPoolExecutor class. It combines the best of the Thread (see Item 57: “Avoid Creating New Thread Instances for On-demand Fan-out”) and Queue (see Item 58: “Understand How Using Queue for Concurrency Requires Refactoring”) approaches to solving the parallel I/O problem from the Game of Life example (see Item 56: “Know How to Recognize When Concurrency Is Necessary” for background and the implementations of various functions and classes below):
ALIVE = '*'
EMPTY = '-'

class Grid:
    ...

class LockingGrid(Grid):
    ...

def count_neighbors(y, x, get):
    ...

def game_logic(state, neighbors):
    ...
    # Do some blocking input/output in here:
    data = my_socket.recv(100)
    ...

def step_cell(y, x, get, set):
    state = get(y, x)
    neighbors = count_neighbors(y, x, get)
    next_state = game_logic(state, neighbors)
    set(y, x, next_state)
Instead of starting a new Thread instance for each Grid square, I can fan out by submitting a function to an executor that will be run in a separate thread. Later, I can wait for the result of all tasks in order to fan in:
from concurrent.futures import ThreadPoolExecutor

def simulate_pool(pool, grid):
    next_grid = LockingGrid(grid.height, grid.width)

    futures = []
    for y in range(grid.height):
        for x in range(grid.width):
            args = (y, x, grid.get, next_grid.set)
            future = pool.submit(step_cell, *args)  # Fan out
            futures.append(future)

    for future in futures:
        future.result()                             # Fan in

    return next_grid
The threads used for the executor can be allocated in advance, which means I don’t have to pay the startup cost on each execution of simulate_pool. I can also specify the maximum number of threads to use for the pool—using the max_workers parameter—to prevent the memory blow-up issues associated with the naive Thread solution to the parallel I/O problem:
class ColumnPrinter:
    ...

grid = LockingGrid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

columns = ColumnPrinter()
with ThreadPoolExecutor(max_workers=10) as pool:
    for i in range(5):
        columns.append(str(grid))
        grid = simulate_pool(pool, grid)

print(columns)
>>>
0 | 1 | 2 | 3 | 4
---*----- | --------- | --------- | --------- | ---------
----*---- | --*-*---- | ----*---- | ---*----- | ----*----
--***---- | ---**---- | --*-*---- | ----**--- | -----*---
--------- | ---*----- | ---**---- | ---**---- | ---***---
--------- | --------- | --------- | --------- | ---------
The best part about the ThreadPoolExecutor class is that it automatically propagates exceptions back to the caller when the result method is called on the Future instance returned by the submit method:
def game_logic(state, neighbors):
    ...
    raise OSError('Problem with I/O')
    ...

with ThreadPoolExecutor(max_workers=10) as pool:
    task = pool.submit(game_logic, ALIVE, 3)
    task.result()
>>>
Traceback ...
OSError: Problem with I/O
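If raising the exception at the call site isn't what I want, the Future class also provides an exception method that returns the raised exception object instead of re-raising it. Here's a minimal, self-contained sketch of that behavior; the game_logic stub below stands in for the failing version above:

```python
from concurrent.futures import ThreadPoolExecutor

ALIVE = '*'

def game_logic(state, neighbors):
    # Stand-in for the failing version above: always raises
    raise OSError('Problem with I/O')

with ThreadPoolExecutor(max_workers=10) as pool:
    task = pool.submit(game_logic, ALIVE, 3)
    error = task.exception()  # Returns the exception instead of raising it

print(type(error).__name__, error)  # OSError Problem with I/O
```

This is useful when fanning in many futures and I want to collect all failures rather than stopping at the first one.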
If I needed to provide I/O parallelism for the count_neighbors function in addition to game_logic, no modifications to the program would be required since ThreadPoolExecutor already runs these functions concurrently as part of step_cell. It’s even possible to achieve CPU parallelism by using the same interface if necessary (see Item 64: “Consider concurrent.futures for True Parallelism”).
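To illustrate that shared interface, here is a minimal sketch of swapping in ProcessPoolExecutor for a CPU-bound function. The count_set_bits helper is a hypothetical example, not part of the Game of Life code; note that ProcessPoolExecutor requires the submitted function to be importable at module level and the entry point to be guarded by `if __name__ == '__main__'` on platforms that spawn workers:

```python
from concurrent.futures import ProcessPoolExecutor

def count_set_bits(n):
    # CPU-bound work: pure computation, no blocking I/O
    return bin(n).count('1')

def main():
    # Same submit/map interface as ThreadPoolExecutor, but each
    # worker is a separate process, sidestepping the GIL
    with ProcessPoolExecutor(max_workers=4) as pool:
        return list(pool.map(count_set_bits, range(8)))

if __name__ == '__main__':
    print(main())  # [0, 1, 1, 2, 1, 2, 2, 3]
```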
However, the big problem that remains is the limited amount of I/O parallelism that ThreadPoolExecutor provides. Even if I use a max_workers parameter of 100, this solution still won’t scale if I need 10,000+ cells in the grid that require simultaneous I/O. ThreadPoolExecutor is a good choice for situations where there is no asynchronous solution (e.g., file I/O), but there are better ways to maximize I/O parallelism in many cases (see Item 60: “Achieve Highly Concurrent I/O with Coroutines”).
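The throughput ceiling is easy to see in a small experiment. This sketch (with a hypothetical slow_io stand-in for a blocking socket call) shows that 20 blocking tasks on a 5-worker pool run in roughly four sequential rounds, while a pool with 20 workers finishes them all in about one task's duration:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_io(_):
    time.sleep(0.1)  # Stand-in for a blocking recv call

def run_all(workers, tasks=20):
    start = time.time()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        list(pool.map(slow_io, range(tasks)))
    return time.time() - start

small = run_all(5)   # ~4 rounds of 0.1s each
large = run_all(20)  # all tasks in flight at once, ~0.1s
```

Raising max_workers high enough to cover 10,000+ simultaneous tasks means allocating that many OS threads, which is exactly the overhead coroutines avoid.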
7.8.1. Things to Remember¶
✦ ThreadPoolExecutor enables simple I/O parallelism with limited refactoring, easily avoiding the cost of thread startup each time fan-out concurrency is required.
✦ Although ThreadPoolExecutor eliminates the potential memory blow-up issues of using threads directly, it also limits I/O parallelism by requiring max_workers to be specified upfront.