7.12. Avoid Blocking the asyncio Event Loop to Maximize Responsiveness
In the previous item I showed how to migrate to asyncio incrementally (see Item 62: “Mix Threads and Coroutines to Ease the Transition to asyncio” for background and the implementation of various functions below). The resulting coroutine properly tails input files and merges them into a single output:
import asyncio

async def run_tasks(handles, interval, output_path):
    with open(output_path, 'wb') as output:
        async def write_async(data):
            output.write(data)

        tasks = []
        for handle in handles:
            coro = tail_async(handle, interval, write_async)
            task = asyncio.create_task(coro)
            tasks.append(task)

        await asyncio.gather(*tasks)
However, it still has one big problem: The open, close, and write calls for the output file handle happen in the main event loop. These operations all require making system calls to the program’s host operating system, which may block the event loop for significant amounts of time and prevent other coroutines from making progress. This could hurt overall responsiveness and increase latency, especially for programs such as highly concurrent servers.
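To make that cost concrete, here is a small sketch (not from the original item) in which a single blocking call in one coroutine delays another coroutine that only wanted to pause for a tenth of a second:

import asyncio
import time

async def quick_task():
    start = time.monotonic()
    await asyncio.sleep(0.1)        # Should wake after ~0.1 seconds
    print(f'Woke after {time.monotonic() - start:.2f} seconds')

async def blocking_task():
    time.sleep(0.5)                 # Blocking call stalls the event loop

async def main():
    # quick_task yields first, but its timer can't fire until the
    # blocking call in blocking_task gives control back to the loop
    await asyncio.gather(quick_task(), blocking_task())

asyncio.run(main())                 # Prints roughly 0.5, not 0.1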
I can detect when this problem happens by passing the debug=True parameter to the asyncio.run function. Here, I show how the file and line of a bad coroutine, presumably blocked on a slow system call, can be identified:
import time

async def slow_coroutine():
    time.sleep(0.5)  # Simulating slow I/O

asyncio.run(slow_coroutine(), debug=True)

>>>
Executing <Task finished name='Task-1' coro=<slow_coroutine()
➥done, defined at example.py:29> result=None created
➥at .../asyncio/base_events.py:487> took 0.503 seconds
...
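By default, debug mode reports any callback or task step that holds the event loop for more than 0.1 seconds. If that threshold doesn't suit your program, the event loop's slow_callback_duration attribute can be adjusted. For example, reusing slow_coroutine from above (a sketch, not part of the original item):

async def main():
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.25   # Only flag stalls longer than 250 ms
    await slow_coroutine()

asyncio.run(main(), debug=True)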
If I want the most responsive program possible, I need to minimize the potentially blocking system calls that are made from within the event loop. In this case, I can create a new Thread subclass (see Item 53: “Use Threads for Blocking I/O, Avoid for Parallelism”) that encapsulates everything required to write to the output file using its own event loop:
from threading import Thread

class WriteThread(Thread):
    def __init__(self, output_path):
        super().__init__()
        self.output_path = output_path
        self.output = None
        self.loop = asyncio.new_event_loop()

    def run(self):
        asyncio.set_event_loop(self.loop)
        with open(self.output_path, 'wb') as self.output:
            self.loop.run_forever()

        # Run one final round of callbacks so the await on
        # stop() in another event loop will be resolved.
        self.loop.run_until_complete(asyncio.sleep(0))
Coroutines in other threads can directly call and await on the write method of this class, since it’s merely a thread-safe wrapper around the real_write method that actually does the I/O: asyncio.run_coroutine_threadsafe submits the real_write coroutine to the worker thread’s event loop and returns a concurrent.futures.Future, which asyncio.wrap_future turns into an awaitable for the caller’s own event loop. This eliminates the need for a Lock (see Item 54: “Use Lock to Prevent Data Races in Threads”):
    async def real_write(self, data):
        self.output.write(data)

    async def write(self, data):
        coro = self.real_write(data)
        future = asyncio.run_coroutine_threadsafe(
            coro, self.loop)
        await asyncio.wrap_future(future)
Other coroutines can tell the worker thread when to stop in a thread-safe manner, using similar boilerplate:
    async def real_stop(self):
        self.loop.stop()

    async def stop(self):
        coro = self.real_stop()
        future = asyncio.run_coroutine_threadsafe(
            coro, self.loop)
        await asyncio.wrap_future(future)
I can also define the __aenter__ and __aexit__ methods to allow this class to be used in async with statements (see Item 66: “Consider contextlib and with Statements for Reusable try/finally Behavior”). This ensures that the worker thread starts and stops at the right times without slowing down the main event loop thread:
    async def __aenter__(self):
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, self.start)
        return self

    async def __aexit__(self, *_):
        await self.stop()
With this new WriteThread class, I can refactor run_tasks into a fully asynchronous version that’s easy to read and completely avoids running slow system calls in the main event loop thread:
def readline(handle):
    ...

async def tail_async(handle, interval, write_func):
    ...

async def run_fully_async(handles, interval, output_path):
    async with WriteThread(output_path) as output:
        tasks = []
        for handle in handles:
            coro = tail_async(handle, interval, output.write)
            task = asyncio.create_task(coro)
            tasks.append(task)

        await asyncio.gather(*tasks)
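The readline and tail_async stubs above are the helpers from Item 62. As a reminder of their general shape, here is a minimal sketch; the NoNewData exception and the exact polling logic are assumptions for illustration, not necessarily the item's original code:

class NoNewData(Exception):
    pass

def readline(handle):
    # Return the next newly appended line, or signal that nothing arrived
    offset = handle.tell()
    handle.seek(0, 2)               # Jump to the end to measure the file
    length = handle.tell()
    if length == offset:
        raise NoNewData
    handle.seek(offset, 0)          # Go back and read the new line
    return handle.readline()

async def tail_async(handle, interval, write_func):
    # Poll for new data in a worker thread so the blocking readline
    # call never runs on the event loop, then await the write function
    loop = asyncio.get_running_loop()
    while not handle.closed:
        try:
            line = await loop.run_in_executor(None, readline, handle)
        except NoNewData:
            await asyncio.sleep(interval)
        else:
            await write_func(line)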
I can verify that this works as expected, given a set of input handles and an output file path:
def confirm_merge(input_paths, output_path):
    ...

input_paths = ...

handles = ...

output_path = ...

asyncio.run(run_fully_async(handles, 0.1, output_path))

confirm_merge(input_paths, output_path)
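The elided setup depends on the test harness from Item 62, which creates the input files, appends data to them, and eventually closes the handles so the tails can finish. A plausible confirm_merge, shown only as an illustration and not the item's actual helper, might compare line multisets:

import collections

def confirm_merge(input_paths, output_path):
    # The merged output should contain exactly the lines from every
    # input file, in whatever interleaved order the tails produced
    expected = collections.Counter()
    for path in input_paths:
        with open(path, 'rb') as f:
            expected.update(f.readlines())

    with open(output_path, 'rb') as f:
        merged = collections.Counter(f.readlines())

    assert merged == expected, 'Merged output does not match the inputs'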
7.12.1. Things to Remember
✦ Making system calls in coroutines—including blocking I/O and starting threads—can reduce program responsiveness and increase the perception of latency.
✦ Pass the debug=True parameter to asyncio.run in order to detect when certain coroutines are preventing the event loop from reacting quickly.
✦ Delegating blocking I/O, such as writing to the output file handle, to a Thread subclass that runs its own event loop keeps the main event loop responsive while still giving other coroutines an awaitable interface.