7.11. Mix Threads and Coroutines to Ease the Transition to asyncio
In the previous item (see Item 61: “Know How to Port Threaded I/O to asyncio”), I ported a TCP server that does blocking I/O with threads over to use asyncio with coroutines. The transition was big-bang: I moved all of the code to the new style in one go. But it’s rarely feasible to port a large program this way. Instead, you usually need to incrementally migrate your codebase while also updating your tests as needed and verifying that everything works at each step along the way.
In order to do that, your codebase needs to be able to use threads for blocking I/O (see Item 53: “Use Threads for Blocking I/O, Avoid for Parallelism”) and coroutines for asynchronous I/O (see Item 60: “Achieve Highly Concurrent I/O with Coroutines”) at the same time in a way that’s mutually compatible. Practically, this means that you need threads to be able to run coroutines, and you need coroutines to be able to start and wait on threads. Luckily, asyncio includes built-in facilities for making this type of interoperability straightforward.
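Before diving into the log-merging example, here is a minimal, self-contained sketch (my own, not part of the item's example) of those two facilities in both directions: a coroutine awaiting a blocking function in a thread, and a plain thread running a coroutine on the event loop:

```python
import asyncio
import threading


def blocking_work():
    # An ordinary blocking function, suitable for a worker thread
    return 'from thread'


async def greet():
    # A coroutine that must run on an event loop
    return 'from loop'


async def main():
    loop = asyncio.get_running_loop()
    results = []

    # Coroutine -> thread: await a blocking function in the default executor
    results.append(await loop.run_in_executor(None, blocking_work))

    # Thread -> coroutine: a plain thread schedules greet() on this loop
    def worker():
        future = asyncio.run_coroutine_threadsafe(greet(), loop)
        results.append(future.result())  # Blocks the thread, not the loop

    thread = threading.Thread(target=worker)
    thread.start()
    # Join the thread in the executor so the event loop keeps running
    await loop.run_in_executor(None, thread.join)
    return results


print(asyncio.run(main()))  # ['from thread', 'from loop']
```

The rest of this item applies these same two primitives to a realistic migration.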
For example, say that I’m writing a program that merges log files into one output stream to aid with debugging. Given a file handle for an input log, I need a way to detect whether new data is available and return the next line of input. I can do this using the tell method of the file handle to check whether the current read position matches the length of the file. When no new data is present, an exception should be raised (see Item 20: “Prefer Raising Exceptions to Returning None” for background):
class NoNewData(Exception):
    pass


def readline(handle):
    offset = handle.tell()
    handle.seek(0, 2)
    length = handle.tell()

    if length == offset:
        raise NoNewData

    handle.seek(offset, 0)
    return handle.readline()
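A quick way to see this behavior is to exercise the function against a temporary file; the tempfile usage here is illustrative and not part of the item's example (the definitions are repeated so the snippet runs standalone):

```python
import tempfile


class NoNewData(Exception):
    pass


def readline(handle):
    # Same as the definition above, repeated to keep this snippet standalone
    offset = handle.tell()
    handle.seek(0, 2)
    length = handle.tell()
    if length == offset:
        raise NoNewData
    handle.seek(offset, 0)
    return handle.readline()


with tempfile.TemporaryFile('w+b') as handle:
    try:
        readline(handle)  # Empty file: read position already equals length
    except NoNewData:
        print('no new data yet')

    handle.write(b'hello\n')
    handle.seek(0, 0)  # Rewind so the new line counts as unread data
    print(readline(handle))  # b'hello\n'
```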
By wrapping this function in a while loop, I can turn it into a worker thread. When a new line is available, I call a given callback function to write it to the output log (see Item 38: “Accept Functions Instead of Classes for Simple Interfaces” for why to use a function interface for this instead of a class). When no data is available, the thread sleeps to reduce the amount of busy waiting caused by polling for new data. When the input file handle is closed, the worker thread exits:
import time


def tail_file(handle, interval, write_func):
    while not handle.closed:
        try:
            line = readline(handle)
        except NoNewData:
            time.sleep(interval)
        else:
            write_func(line)
Now, I can start one worker thread per input file and unify their output into a single output file. The write helper function below needs to use a Lock instance (see Item 54: “Use Lock to Prevent Data Races in Threads”) in order to serialize writes to the output stream and make sure that there are no intra-line conflicts:
from threading import Lock, Thread


def run_threads(handles, interval, output_path):
    with open(output_path, 'wb') as output:
        lock = Lock()

        def write(data):
            with lock:
                output.write(data)

        threads = []
        for handle in handles:
            args = (handle, interval, write)
            thread = Thread(target=tail_file, args=args)
            thread.start()
            threads.append(thread)

        for thread in threads:
            thread.join()
As long as an input file handle is still alive, its corresponding worker thread will also stay alive. That means it’s sufficient to wait for the join method from each thread to complete in order to know that the whole process is done.
Given a set of input paths and an output path, I can call run_threads and confirm that it works as expected. How the input file handles are created or separately closed isn’t important in order to demonstrate this code’s behavior, nor is the output verification function—defined in confirm_merge that follows—which is why I’ve left them out here:
def confirm_merge(input_paths, output_path):
    ...

input_paths = ...
handles = ...
output_path = ...

run_threads(handles, 0.1, output_path)

confirm_merge(input_paths, output_path)
With this threaded implementation as the starting point, how can I incrementally convert this code to use asyncio and coroutines instead? There are two approaches: top-down and bottom-up.
Top-down means starting at the highest parts of a codebase, like in the main entry points, and working down to the individual functions and classes that are the leaves of the call hierarchy. This approach can be useful when you maintain a lot of common modules that you use across many different programs. By porting the entry points first, you can wait to port the common modules until you’re already using coroutines everywhere else.
The concrete steps are:
1. Change a top function to use async def instead of def.
2. Wrap all of its calls that do I/O—potentially blocking the event loop—to use asyncio.run_in_executor instead.
3. Ensure that the resources or callbacks used by run_in_executor invocations are properly synchronized (i.e., using Lock or the asyncio.run_coroutine_threadsafe function).
4. Try to eliminate get_event_loop and run_in_executor calls by moving downward through the call hierarchy and converting intermediate functions and methods to coroutines (following the first three steps).
Here, I apply steps 1–3 to the run_threads function:
import asyncio


async def run_tasks_mixed(handles, interval, output_path):
    loop = asyncio.get_event_loop()

    with open(output_path, 'wb') as output:
        async def write_async(data):
            output.write(data)

        def write(data):
            coro = write_async(data)
            future = asyncio.run_coroutine_threadsafe(
                coro, loop)
            future.result()

        tasks = []
        for handle in handles:
            task = loop.run_in_executor(
                None, tail_file, handle, interval, write)
            tasks.append(task)

        await asyncio.gather(*tasks)
The run_in_executor method instructs the event loop to run a given function—tail_file in this case—using a specific ThreadPoolExecutor (see Item 59: “Consider ThreadPoolExecutor When Threads Are Necessary for Concurrency”) or a default executor instance when the first parameter is None. By making multiple calls to run_in_executor without corresponding await expressions, the run_tasks_mixed coroutine fans out to have one concurrent line of execution for each input file. Then, the asyncio.gather function along with an await expression fans in the tail_file threads until they all complete (see Item 56: “Know How to Recognize When Concurrency Is Necessary” for more about fan-out and fan-in).
This code eliminates the need for the Lock instance in the write helper by using asyncio.run_coroutine_threadsafe. This function allows plain old worker threads to call a coroutine—write_async in this case—and have it execute in the event loop from the main thread (or from any other thread, if necessary). This effectively synchronizes the threads together and ensures that all writes to the output file are only done by the event loop in the main thread. Once the asyncio.gather awaitable is resolved, I can assume that all writes to the output file have also completed, and thus I can close the output file handle in the with statement without having to worry about race conditions.
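To isolate that synchronization trick, here is a standalone sketch of my own (the names append_async and worker are hypothetical, not from the item's example) in which worker threads funnel every mutation of shared state through asyncio.run_coroutine_threadsafe, so only the event-loop thread ever touches it and no Lock is needed:

```python
import asyncio
import threading


async def append_async(items, value):
    # Runs only on the event-loop thread, so no Lock is needed
    items.append(value)


async def main():
    loop = asyncio.get_running_loop()
    items = []

    def worker(value):
        # Called in a plain thread; hands the mutation to the event loop
        future = asyncio.run_coroutine_threadsafe(
            append_async(items, value), loop)
        future.result()  # Wait until the loop has done the append

    threads = [threading.Thread(target=worker, args=(i,))
               for i in range(5)]
    for thread in threads:
        thread.start()
    # Join the threads in the executor so the loop stays free to run
    # the scheduled append_async coroutines
    await asyncio.gather(*(
        loop.run_in_executor(None, thread.join) for thread in threads))
    return sorted(items)


print(asyncio.run(main()))  # [0, 1, 2, 3, 4]
```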
I can verify that this code works as expected. I use the asyncio.run function to start the coroutine and run the main event loop:
input_paths = ...
handles = ...
output_path = ...

asyncio.run(run_tasks_mixed(handles, 0.1, output_path))

confirm_merge(input_paths, output_path)
Now, I can apply step 4 to the run_tasks_mixed function by moving down the call stack. I can redefine the tail_file dependent function to be an asynchronous coroutine instead of doing blocking I/O by following steps 1–3:
async def tail_async(handle, interval, write_func):
    loop = asyncio.get_event_loop()

    while not handle.closed:
        try:
            line = await loop.run_in_executor(
                None, readline, handle)
        except NoNewData:
            await asyncio.sleep(interval)
        else:
            await write_func(line)
This new implementation of tail_async allows me to push calls to get_event_loop and run_in_executor down the stack and out of the run_tasks_mixed function entirely. What’s left is clean and much easier to follow:
async def run_tasks(handles, interval, output_path):
    with open(output_path, 'wb') as output:
        async def write_async(data):
            output.write(data)

        tasks = []
        for handle in handles:
            coro = tail_async(handle, interval, write_async)
            task = asyncio.create_task(coro)
            tasks.append(task)

        await asyncio.gather(*tasks)
I can verify that run_tasks works as expected, too:
input_paths = ...
handles = ...
output_path = ...

asyncio.run(run_tasks(handles, 0.1, output_path))

confirm_merge(input_paths, output_path)
It’s possible to continue this iterative refactoring pattern and convert readline into an asynchronous coroutine as well. However, that function requires so many blocking file I/O operations that it doesn’t seem worth porting, given how much that would reduce the clarity of the code and hurt performance. In some situations, it makes sense to move everything to asyncio, and in others it doesn’t.
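To make the tradeoff concrete, here is a sketch (my own readline_async, not from the item) of what the fully asynchronous version would look like; every blocking file operation needs its own executor hop, which is exactly the overhead and clutter that makes the port not worthwhile:

```python
import asyncio


class NoNewData(Exception):
    pass


async def readline_async(handle):
    # Each blocking call—tell, seek, readline—gets its own executor hop
    loop = asyncio.get_running_loop()
    offset = await loop.run_in_executor(None, handle.tell)
    await loop.run_in_executor(None, handle.seek, 0, 2)
    length = await loop.run_in_executor(None, handle.tell)
    if length == offset:
        raise NoNewData
    await loop.run_in_executor(None, handle.seek, offset, 0)
    return await loop.run_in_executor(None, handle.readline)
```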
The bottom-up approach to adopting coroutines has four steps that are similar to the steps of the top-down style, but the process traverses the call hierarchy in the opposite direction: from leaves to entry points.
The concrete steps are:
1. Create a new asynchronous coroutine version of each leaf function that you're trying to port.
2. Change the existing synchronous functions so they call the coroutine versions and run the event loop instead of implementing any real behavior.
3. Move up a level of the call hierarchy, make another layer of coroutines, and replace existing calls to synchronous functions with calls to the coroutines defined in step 1.
4. Delete synchronous wrappers around coroutines created in step 2 as you stop requiring them to glue the pieces together.
For the example above, I would start with the tail_file function since I decided that the readline function should keep using blocking I/O. I can rewrite tail_file so it merely wraps the tail_async coroutine that I defined above. To run that coroutine until it finishes, I need to create an event loop for each tail_file worker thread and then call its run_until_complete method. This method will block the current thread and drive the event loop until the tail_async coroutine exits, achieving the same behavior as the threaded, blocking I/O version of tail_file:
def tail_file(handle, interval, write_func):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    async def write_async(data):
        write_func(data)

    coro = tail_async(handle, interval, write_async)
    loop.run_until_complete(coro)
This new tail_file function is a drop-in replacement for the old one. I can verify that everything works as expected by calling run_threads again:
input_paths = ...
handles = ...
output_path = ...

run_threads(handles, 0.1, output_path)

confirm_merge(input_paths, output_path)
After wrapping tail_async with tail_file, the next step is to convert the run_threads function to a coroutine. This ends up being the same work as step 4 of the top-down approach above, so at this point, the styles converge.
This is a great start for adopting asyncio, but there’s even more that you could do to increase the responsiveness of your program (see Item 63: “Avoid Blocking the asyncio Event Loop to Maximize Responsiveness”).
7.11.1. Things to Remember
✦ The awaitable run_in_executor method of the asyncio event loop enables coroutines to run synchronous functions in ThreadPoolExecutor pools. This facilitates top-down migrations to asyncio.
✦ The run_until_complete method of the asyncio event loop enables synchronous code to run a coroutine until it finishes. The asyncio.run_coroutine_threadsafe function provides the same functionality across thread boundaries. Together these help with bottom-up migrations to asyncio.