7.10. Know How to Port Threaded I/O to asyncio
Once you understand the advantage of coroutines (see Item 60: “Achieve Highly Concurrent I/O with Coroutines”), it may seem daunting to port an existing codebase to use them. Luckily, Python’s support for asynchronous execution is well integrated into the language. This makes it straightforward to move code that does threaded, blocking I/O over to coroutines and asynchronous I/O.
For example, say that I have a TCP-based server for playing a game involving guessing a number. The server takes lower and upper parameters that determine the range of numbers to consider. Then, the server returns guesses for integer values in that range as they are requested by the client. Finally, the server collects reports from the client on whether each of those numbers was closer (warmer) or further away (colder) from the client’s secret number.
The most common way to build this type of client/server system is by using blocking I/O and threads (see Item 53: “Use Threads for Blocking I/O, Avoid for Parallelism”). To do this, I need a helper class that can manage the sending and receiving of messages. For my purposes, each line sent or received represents a command to be processed:
>>> class EOFError(Exception):
>>>     pass
>>>
>>> class ConnectionBase:
>>>     def __init__(self, connection):
>>>         self.connection = connection
>>>         self.file = connection.makefile('rb')
>>>
>>>     def send(self, command):
>>>         line = command + '\n'
>>>         data = line.encode()
>>>         self.connection.sendall(data)  # sendall retries until every byte is sent
>>>
>>>     def receive(self):
>>>         line = self.file.readline()
>>>         if not line:
>>>             raise EOFError('Connection closed')
>>>         return line[:-1].decode()
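To see the newline framing that ConnectionBase relies on without starting a server, here is a minimal sketch that uses socket.socketpair() to get two connected sockets in one process. The helpers send_line, receive_line, and demo are hypothetical names, and the sketch uses the built-in EOFError rather than the custom class defined above.

```python
import socket

def send_line(sock, command):
    # Same framing as ConnectionBase.send: one UTF-8 line ending in '\n'.
    sock.sendall((command + '\n').encode())

def receive_line(reader):
    # Same logic as ConnectionBase.receive: an empty read means the peer closed.
    line = reader.readline()
    if not line:
        raise EOFError('Connection closed')
    return line[:-1].decode()

def demo():
    left, right = socket.socketpair()  # two connected sockets, no server needed
    received = []
    with right, right.makefile('rb') as reader:
        with left:
            send_line(left, 'PARAMS 1 5')
            send_line(left, 'NUMBER')
        # left is closed now, so the reader sees both lines and then EOF.
        try:
            while True:
                received.append(receive_line(reader))
        except EOFError:
            received.append('<closed>')
    return received

print(demo())  # ['PARAMS 1 5', 'NUMBER', '<closed>']
```

Each readline() call returns exactly one command, which is why the protocol can dispatch on whole lines.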
The server is implemented as a class that handles one connection at a time and maintains the client’s session state:
>>> import random
>>>
>>> WARMER = 'Warmer'
>>> COLDER = 'Colder'
>>> UNSURE = 'Unsure'
>>> CORRECT = 'Correct'
>>>
>>> class UnknownCommandError(Exception):
>>>     pass
>>>
>>> class Session(ConnectionBase):
>>>     def __init__(self, *args):
>>>         super().__init__(*args)
>>>         self._clear_state(None, None)
>>>
>>>     def _clear_state(self, lower, upper):
>>>         self.lower = lower
>>>         self.upper = upper
>>>         self.secret = None
>>>         self.guesses = []
It has one primary method that handles incoming commands from the client and dispatches them to methods as needed. Note that here I’m using an assignment expression (introduced in Python 3.8; see Item 10: “Prevent Repetition with Assignment Expressions”) to keep the code short:
>>>     def loop(self):
>>>         while command := self.receive():
>>>             parts = command.split(' ')
>>>             if parts[0] == 'PARAMS':
>>>                 self.set_params(parts)
>>>             elif parts[0] == 'NUMBER':
>>>                 self.send_number()
>>>             elif parts[0] == 'REPORT':
>>>                 self.receive_report(parts)
>>>             else:
>>>                 raise UnknownCommandError(command)
The first command sets the lower and upper bounds for the numbers that the server is trying to guess:
>>>     def set_params(self, parts):
>>>         assert len(parts) == 3
>>>         lower = int(parts[1])
>>>         upper = int(parts[2])
>>>         self._clear_state(lower, upper)
The second command makes a new guess based on the previous state that’s stored in the client’s Session instance. Specifically, this code ensures that the server will never try to guess the same number more than once per parameter assignment:
>>>     def next_guess(self):
>>>         if self.secret is not None:
>>>             return self.secret
>>>
>>>         while True:
>>>             guess = random.randint(self.lower, self.upper)
>>>             if guess not in self.guesses:
>>>                 return guess
>>>
>>>     def send_number(self):
>>>         guess = self.next_guess()
>>>         self.guesses.append(guess)
>>>         self.send(format(guess))
The third command receives the decision from the client of whether the guess was warmer or colder, and it updates the Session state accordingly:
>>>     def receive_report(self, parts):
>>>         assert len(parts) == 2
>>>         decision = parts[1]
>>>
>>>         last = self.guesses[-1]
>>>         if decision == CORRECT:
>>>             self.secret = last
>>>
>>>         print(f'Server: {last} is {decision}')
The client is also implemented using a stateful class:
>>> import contextlib
>>> import math
>>>
>>> class Client(ConnectionBase):
>>>     def __init__(self, *args):
>>>         super().__init__(*args)
>>>         self._clear_state()
>>>
>>>     def _clear_state(self):
>>>         self.secret = None
>>>         self.last_distance = None
The parameters of each guessing game are set using a with statement to ensure that state is correctly managed on the server side (see Item 66: “Consider contextlib and with Statements for Reusable try/finally Behavior” for background and Item 63: “Avoid Blocking the asyncio Event Loop to Maximize Responsiveness” for another example). This method sends the first command to the server:
>>>     @contextlib.contextmanager
>>>     def session(self, lower, upper, secret):
>>>         print(f'Guess a number between {lower} and {upper}!'
>>>               f' Shhhhh, it\'s {secret}.')
>>>         self.secret = secret
>>>         self.send(f'PARAMS {lower} {upper}')
>>>         try:
>>>             yield
>>>         finally:
>>>             self._clear_state()
>>>             self.send('PARAMS 0 -1')
New guesses are requested from the server, using another method that implements the second command:
>>>     def request_numbers(self, count):
>>>         for _ in range(count):
>>>             self.send('NUMBER')
>>>             data = self.receive()
>>>             yield int(data)
>>>             if self.last_distance == 0:
>>>                 return
Whether each guess from the server was warmer or colder than the last is reported using the third command in the final method:
>>>     def report_outcome(self, number):
>>>         new_distance = math.fabs(number - self.secret)
>>>         decision = UNSURE
>>>
>>>         if new_distance == 0:
>>>             decision = CORRECT
>>>         elif self.last_distance is None:
>>>             pass
>>>         elif new_distance < self.last_distance:
>>>             decision = WARMER
>>>         elif new_distance > self.last_distance:
>>>             decision = COLDER
>>>
>>>         self.last_distance = new_distance
>>>
>>>         self.send(f'REPORT {decision}')
>>>         return decision
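Because report_outcome's decision depends only on the new guess, the secret, and the previous distance, it can be restated as a pure function and checked against the sample outputs in this section. decide and replay are hypothetical helpers for this sketch. Note that a guess at the same distance as the previous one matches none of the branches and stays Unsure.

```python
import math

WARMER = 'Warmer'
COLDER = 'Colder'
UNSURE = 'Unsure'
CORRECT = 'Correct'

def decide(number, secret, last_distance):
    """Return (decision, new_distance) using the same rules as report_outcome."""
    new_distance = math.fabs(number - secret)
    decision = UNSURE
    if new_distance == 0:
        decision = CORRECT
    elif last_distance is None:
        pass  # first guess: nothing to compare against
    elif new_distance < last_distance:
        decision = WARMER
    elif new_distance > last_distance:
        decision = COLDER
    # An equal distance falls through and stays UNSURE.
    return decision, new_distance

def replay(guesses, secret):
    # Feed each guess through decide(), carrying the last distance forward.
    last = None
    decisions = []
    for guess in guesses:
        decision, last = decide(guess, secret, last)
        decisions.append(decision)
    return decisions

print(replay([4, 1, 5, 3], 3))  # ['Unsure', 'Colder', 'Unsure', 'Correct']
```

Replaying the guesses from either sample run reproduces the decisions the client printed.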
I can run the server by having one thread listen on a socket and spawn additional threads to handle the new connections:
>>> import socket
>>> from threading import Thread
>>>
>>> def handle_connection(connection):
>>>     with connection:
>>>         session = Session(connection)
>>>         try:
>>>             session.loop()
>>>         except EOFError:
>>>             pass
>>>
>>> def run_server(address):
>>>     with socket.socket() as listener:
>>>         listener.bind(address)
>>>         listener.listen()
>>>         while True:
>>>             connection, _ = listener.accept()
>>>             thread = Thread(target=handle_connection,
>>>                             args=(connection,),
>>>                             daemon=True)
>>>             thread.start()
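The listener-plus-worker-threads shape of run_server can be exercised in isolation. This sketch is a variation on the code above, not a copy of it. It binds to port 0 so the operating system picks a free port (read back with getsockname()). It calls listen() before any client connects, so the client cannot race ahead of the server. It also accepts only a fixed number of connections so the demo can finish. handle, serve, and demo are hypothetical names.

```python
import socket
import threading

def handle(connection):
    # One thread per connection: read a single line and echo it upper-cased.
    with connection, connection.makefile('rb') as reader:
        line = reader.readline()
        connection.sendall(line.upper())

def serve(listener, count):
    # Accept "count" connections, handing each one to its own daemon thread.
    for _ in range(count):
        connection, _address = listener.accept()
        threading.Thread(target=handle, args=(connection,), daemon=True).start()

def demo(words):
    with socket.socket() as listener:
        listener.bind(('127.0.0.1', 0))  # port 0: the OS chooses a free port
        listener.listen()                # listening before any client connects
        address = listener.getsockname()
        acceptor = threading.Thread(
            target=serve, args=(listener, len(words)), daemon=True)
        acceptor.start()
        replies = []
        for word in words:
            with socket.create_connection(address) as client, \
                    client.makefile('rb') as reader:
                client.sendall(word.encode() + b'\n')
                replies.append(reader.readline().decode().rstrip('\n'))
        acceptor.join()
    return replies

print(demo(['warmer', 'colder']))  # ['WARMER', 'COLDER']
```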
The client runs in the main thread and returns the results of the guessing game to the caller. This code explicitly exercises a variety of Python language features (for loops, with statements, generators, comprehensions) so that below I can show what it takes to port these over to using coroutines:
>>> def run_client(address):
>>>     with socket.create_connection(address) as connection:
>>>         client = Client(connection)
>>>
>>>         with client.session(1, 5, 3):
>>>             results = [(x, client.report_outcome(x))
>>>                        for x in client.request_numbers(5)]
>>>
>>>         with client.session(10, 15, 12):
>>>             for number in client.request_numbers(5):
>>>                 outcome = client.report_outcome(number)
>>>                 results.append((number, outcome))
>>>
>>>     return results
Finally, I can glue all of this together and confirm that it works as expected:
>>> def main():
>>>     address = ('127.0.0.1', 1234)
>>>     server_thread = Thread(
>>>         target=run_server, args=(address,), daemon=True)
>>>     server_thread.start()
>>>
>>>     results = run_client(address)
>>>     for number, outcome in results:
>>>         print(f'Client: {number} is {outcome}')
>>>
>>> main()
>>>
Guess a number between 1 and 5! Shhhhh, it's 3.
Server: 4 is Unsure
Server: 1 is Colder
Server: 5 is Unsure
Server: 3 is Correct
Guess a number between 10 and 15! Shhhhh, it's 12.
Server: 11 is Unsure
Server: 10 is Colder
Server: 12 is Correct
Client: 4 is Unsure
Client: 1 is Colder
Client: 5 is Unsure
Client: 3 is Correct
Client: 11 is Unsure
Client: 10 is Colder
Client: 12 is Correct
How much effort is needed to convert this example to using async, await, and the asyncio built-in module?
First, I need to update my ConnectionBase class to provide coroutines for send and receive instead of blocking I/O methods. I’ve marked each line that’s changed with a # Changed comment to make it clear what the delta is between this new example and the code above:
>>> class AsyncConnectionBase:
>>>     def __init__(self, reader, writer):             # Changed
>>>         self.reader = reader                        # Changed
>>>         self.writer = writer                        # Changed
>>>
>>>     async def send(self, command):
>>>         line = command + '\n'
>>>         data = line.encode()
>>>         self.writer.write(data)                     # Changed
>>>         await self.writer.drain()                   # Changed
>>>
>>>     async def receive(self):
>>>         line = await self.reader.readline()         # Changed
>>>         if not line:
>>>             raise EOFError('Connection closed')
>>>         return line[:-1].decode()
I can create another stateful class to represent the session state for a single connection. The only changes here are the class’s name and inheriting from AsyncConnectionBase instead of ConnectionBase:
>>> class AsyncSession(AsyncConnectionBase):            # Changed
>>>     def __init__(self, *args):
>>>         ...
>>>
>>>     def _clear_state(self, lower, upper):
>>>         ...
The primary entry point for the server’s command processing loop requires only minimal changes to become a coroutine:
>>>     async def loop(self):                           # Changed
>>>         while command := await self.receive():      # Changed
>>>             parts = command.split(' ')
>>>             if parts[0] == 'PARAMS':
>>>                 self.set_params(parts)
>>>             elif parts[0] == 'NUMBER':
>>>                 await self.send_number()            # Changed
>>>             elif parts[0] == 'REPORT':
>>>                 self.receive_report(parts)
>>>             else:
>>>                 raise UnknownCommandError(command)
No changes are required for handling the first command:
>>>     def set_params(self, parts):
>>>         ...
The only change required for the second command is allowing asynchronous I/O to be used when guesses are transmitted to the client:
>>>     def next_guess(self):
>>>         ...
>>>
>>>     async def send_number(self):                    # Changed
>>>         guess = self.next_guess()
>>>         self.guesses.append(guess)
>>>         await self.send(format(guess))              # Changed
No changes are required for processing the third command:
>>>     def receive_report(self, parts):
>>>         ...
Similarly, the client class needs to be reimplemented to inherit from AsyncConnectionBase:
>>> class AsyncClient(AsyncConnectionBase):             # Changed
>>>     def __init__(self, *args):
>>>         ...
>>>
>>>     def _clear_state(self):
>>>         ...
The first command method for the client requires a few async and await keywords to be added. It also needs to use the asynccontextmanager helper function from the contextlib built-in module:
>>>     @contextlib.asynccontextmanager                 # Changed
>>>     async def session(self, lower, upper, secret):  # Changed
>>>         print(f'Guess a number between {lower} and {upper}!'
>>>               f' Shhhhh, it\'s {secret}.')
>>>         self.secret = secret
>>>         await self.send(f'PARAMS {lower} {upper}')  # Changed
>>>         try:
>>>             yield
>>>         finally:
>>>             self._clear_state()
>>>             await self.send('PARAMS 0 -1')          # Changed
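What makes contextlib.asynccontextmanager useful here is that the code after yield runs in a finally block, so the reset command is still sent when the body of the async with statement raises. The following sketch shows that ordering with a list standing in for the network. game_session and demo are hypothetical names, and asyncio.sleep(0) stands in for an awaited send.

```python
import asyncio
import contextlib

@contextlib.asynccontextmanager
async def game_session(log, lower, upper):
    # Setup: runs when the async with block is entered.
    await asyncio.sleep(0)  # stands in for "await self.send(...)"
    log.append(f'PARAMS {lower} {upper}')
    try:
        yield
    finally:
        # Teardown: runs on normal exit and also when the body raises.
        await asyncio.sleep(0)
        log.append('PARAMS 0 -1')

async def demo():
    log = []
    async with game_session(log, 1, 5):
        log.append('NUMBER')
    try:
        async with game_session(log, 10, 15):
            raise ValueError('client gave up')
    except ValueError:
        log.append('error handled')
    return log

print(asyncio.run(demo()))
# ['PARAMS 1 5', 'NUMBER', 'PARAMS 0 -1', 'PARAMS 10 15', 'PARAMS 0 -1', 'error handled']
```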
The second command again only requires the addition of async and await anywhere coroutine behavior is required:
>>>     async def request_numbers(self, count):         # Changed
>>>         for _ in range(count):
>>>             await self.send('NUMBER')               # Changed
>>>             data = await self.receive()             # Changed
>>>             yield int(data)
>>>             if self.last_distance == 0:
>>>                 return
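An async generator like request_numbers can return early. It can be consumed either with an async for loop or with an async comprehension, and the comprehension may itself contain await expressions, as run_async_client does. In this sketch, the hypothetical request_numbers takes a canned list of guesses and a hard-coded SECRET instead of talking to a server, and label is a made-up coroutine standing in for report_outcome.

```python
import asyncio

SECRET = 3  # assumption: a fixed secret instead of a real client session

async def request_numbers(guesses, count):
    # Hypothetical stand-in for AsyncClient.request_numbers.
    for guess in guesses[:count]:
        await asyncio.sleep(0)   # where the real code awaits send/receive
        yield guess
        if guess == SECRET:      # stop early, like "if self.last_distance == 0"
            return

async def label(number):
    # Made-up coroutine standing in for report_outcome.
    await asyncio.sleep(0)
    return 'Correct' if number == SECRET else 'Unsure'

async def collect():
    guesses = [4, 1, 5, 3, 2]
    # Async comprehension with an await in the element expression.
    pairs = [(x, await label(x)) async for x in request_numbers(guesses, 5)]
    # Equivalent explicit async for loop.
    looped = []
    async for x in request_numbers(guesses, 5):
        looped.append(x)
    return pairs, looped

print(asyncio.run(collect()))
# ([(4, 'Unsure'), (1, 'Unsure'), (5, 'Unsure'), (3, 'Correct')], [4, 1, 5, 3])
```

The trailing 2 is never produced because the generator returns as soon as it yields the secret.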
The third command only requires adding one async and one await keyword:
>>>     async def report_outcome(self, number):         # Changed
>>>         ...
>>>         await self.send(f'REPORT {decision}')       # Changed
>>>         ...
The code that runs the server needs to be completely reimplemented to use the asyncio built-in module and its start_server function:
>>> import asyncio
>>>
>>> async def handle_async_connection(reader, writer):
>>>     session = AsyncSession(reader, writer)
>>>     try:
>>>         await session.loop()
>>>     except EOFError:
>>>         pass
>>>
>>> async def run_async_server(address):
>>>     server = await asyncio.start_server(
>>>         handle_async_connection, *address)
>>>     async with server:
>>>         await server.serve_forever()
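The stream API that AsyncConnectionBase and run_async_server build on can be tried on its own. This sketch starts a one-line echo server with asyncio.start_server, connects with asyncio.open_connection, and uses the same write, drain, and readline calls as AsyncConnectionBase. Binding to port 0 and reading the chosen port back from server.sockets is an assumption of the sketch, since the chapter uses fixed ports. handle_echo and echo_demo are hypothetical names.

```python
import asyncio

async def handle_echo(reader, writer):
    # Server side: read one line and send back an upper-cased copy.
    line = await reader.readline()
    writer.write(line.upper())
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def echo_demo():
    # Port 0 asks the OS for any free port; the real port is read back below.
    server = await asyncio.start_server(handle_echo, '127.0.0.1', 0)
    host, port = server.sockets[0].getsockname()[:2]
    async with server:
        reader, writer = await asyncio.open_connection(host, port)
        writer.write(b'number\n')
        await writer.drain()             # wait until the write buffer is flushed
        reply = await reader.readline()  # read up to and including '\n'
        writer.close()
        await writer.wait_closed()
    return reply.decode().rstrip('\n')

print(asyncio.run(echo_demo()))  # NUMBER
```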
The run_client function that initiates the game requires changes on nearly every line. Any code that previously interacted with the blocking socket instances has to be replaced with asyncio versions of similar functionality (which are marked with # New below). All other lines in the function that require interaction with coroutines need to use async and await keywords as appropriate. If you forget one of these keywords in a necessary place, the mistake usually surfaces at runtime: a plain for loop over an async generator raises a TypeError, and a coroutine that is called but never awaited doesn't run at all (Python emits a RuntimeWarning when it is garbage-collected).
>>> async def run_async_client(address):
>>>     streams = await asyncio.open_connection(*address)   # New
>>>     client = AsyncClient(*streams)                      # New
>>>
>>>     async with client.session(1, 5, 3):
>>>         results = [(x, await client.report_outcome(x))
>>>                    async for x in client.request_numbers(5)]
>>>
>>>     async with client.session(10, 15, 12):
>>>         async for number in client.request_numbers(5):
>>>             outcome = await client.report_outcome(number)
>>>             results.append((number, outcome))
>>>
>>>     _, writer = streams                                 # New
>>>     writer.close()                                      # New
>>>     await writer.wait_closed()                          # New
>>>
>>>     return results
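Leaving out async or await does not always fail in the same way. In this sketch, which uses the hypothetical coroutines numbers and report, a plain for loop over an async generator raises TypeError immediately, while calling a coroutine function without await quietly returns a coroutine object whose body has not run yet.

```python
import asyncio

async def numbers():
    # Tiny async generator standing in for AsyncClient.request_numbers.
    for n in (1, 2, 3):
        await asyncio.sleep(0)
        yield n

async def report(n):
    # Tiny coroutine standing in for AsyncClient.report_outcome.
    await asyncio.sleep(0)
    return f'{n} reported'

async def mistakes():
    notes = []
    # Mistake 1: "for" instead of "async for" raises TypeError right away.
    try:
        for n in numbers():
            pass
    except TypeError as e:
        notes.append(type(e).__name__)
    # Mistake 2: calling without "await" only creates a coroutine object.
    # If it were never awaited, Python would emit a RuntimeWarning later.
    pending = report(1)
    notes.append(asyncio.iscoroutine(pending))
    notes.append(await pending)  # awaiting it now actually runs the body
    return notes

print(asyncio.run(mistakes()))  # ['TypeError', True, '1 reported']
```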
What’s most interesting about run_async_client is that I didn’t have to restructure any of the substantive parts of interacting with the AsyncClient in order to port this function over to use coroutines. Each of the language features that I needed has a corresponding asynchronous version, which made the migration easy to do.
This won’t always be the case, though. Until Python 3.10, there were no asynchronous versions of the next and iter built-in functions (see Item 31: “Be Defensive When Iterating Over Arguments” for background); you had to call the __aiter__ method and await the __anext__ method directly. Python 3.10 added the aiter and anext built-in functions to fill that gap. There’s still no asynchronous version of yield from (see Item 33: “Compose Multiple Generators with yield from”), which makes it noisier to compose async generators.
Finally, the glue needs to be updated to run this new asynchronous example end-to-end. I use the asyncio.create_task function to schedule the server on the event loop so that it runs concurrently with the client once the await expression is reached. This is another approach to fan-out, with different behavior from the asyncio.gather function:
>>> async def main_async():
>>>     address = ('127.0.0.1', 4321)
>>>
>>>     server = run_async_server(address)
>>>     server_task = asyncio.create_task(server)  # Keep a reference to the task
>>>
>>>     results = await run_async_client(address)
>>>     for number, outcome in results:
>>>         print(f'Client: {number} is {outcome}')
>>>
>>> asyncio.run(main_async())
>>>
Guess a number between 1 and 5! Shhhhh, it's 3.
Server: 5 is Unsure
Server: 4 is Warmer
Server: 2 is Unsure
Server: 1 is Colder
Server: 3 is Correct
Guess a number between 10 and 15! Shhhhh, it's 12.
Server: 14 is Unsure
Server: 10 is Unsure
Server: 15 is Colder
Server: 12 is Correct
Client: 5 is Unsure
Client: 4 is Warmer
Client: 2 is Unsure
Client: 1 is Colder
Client: 3 is Correct
Client: 14 is Unsure
Client: 10 is Unsure
Client: 15 is Colder
Client: 12 is Correct
This works as expected. The coroutine version is easier to follow because all of the interactions with threads have been removed. The asyncio built-in module also provides many helper functions and reduces the amount of socket boilerplate required to write a server like this.
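The difference between create_task and gather that main_async relies on can be seen in a small experiment. create_task wraps a coroutine in a Task and returns immediately, so the caller keeps running until its next await. gather, by contrast, waits for all of its awaitables and returns their results in argument order. worker, with_create_task, and with_gather are hypothetical names, and the exact interleaving in the logs reflects the default event loop's scheduling.

```python
import asyncio

async def worker(name, log):
    log.append(f'{name} start')
    await asyncio.sleep(0)  # yield control to the event loop once
    log.append(f'{name} end')
    return name

async def with_create_task():
    log = []
    # Scheduled now, but it only starts running when this coroutine next awaits.
    task = asyncio.create_task(worker('background', log))
    log.append('main continues')
    await worker('foreground', log)
    await task  # hold a reference and wait for the background work to finish
    return log

async def with_gather():
    log = []
    # gather waits for both and returns results in argument order.
    results = await asyncio.gather(worker('a', log), worker('b', log))
    return results, log

print(asyncio.run(with_create_task()))
# ['main continues', 'foreground start', 'background start', 'foreground end', 'background end']
print(asyncio.run(with_gather()))
# (['a', 'b'], ['a start', 'b start', 'a end', 'b end'])
```

Keeping the Task returned by create_task, as main_async does, also follows the asyncio documentation's advice to hold a reference so a running task is not garbage-collected.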
Your use case may be more complex and harder to port for a variety of reasons. The asyncio module has a vast number of I/O, synchronization, and task management features that could make adopting coroutines easier for you (see Item 62: “Mix Threads and Coroutines to Ease the Transition to asyncio” and Item 63: “Avoid Blocking the asyncio Event Loop to Maximize Responsiveness”). Be sure to check out the online documentation for the library (https://docs.python.org/3/library/asyncio.html) to understand its full potential.
7.10.1. Things to Remember
✦ Python provides asynchronous versions of for loops, with statements, generators, comprehensions, and library helper functions that can be used as drop-in replacements in coroutines.
✦ The asyncio built-in module makes it straightforward to port existing code that uses threads and blocking I/O over to coroutines and asynchronous I/O.