Asynchronous I/O in Python

Hareesh Pallathoor Balakrishnan
Nov 13, 2021


Introduction

Most programs are I/O bound, and I/O is slow compared to regular code execution. While we wait for these I/O operations to finish, our program sits idle, wasting millions of CPU cycles. To make better use of those cycles, we can have the program do other work while it awaits the I/O operation.

Why Asyncio?

The Global Interpreter Lock (GIL) in Python allows only one thread to execute Python bytecode at any given point in time. Asyncio instead offers cooperative multitasking, where multiple tasks (coroutines) take turns executing, yielding control to one another to make optimal use of time.

You need asyncio if your program has blocking functions: functions that prevent further execution of code until they complete, like an HTTP request.

Sample Scenario

Let's take a site that has 5 temperature sensors, and a program that takes readings every 5 seconds, then calculates and prints their average. We need to call each sensor, and it returns the current temperature. A straightforward synchronous version would be as follows.
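Here is a sketch using the requests library; the sensor URLs and the plain-text response format are assumptions for illustration:

import requests

# hypothetical sensor endpoints, adjust to your setup
SENSOR_URLS = [f'http://example.com/sensor/{i}' for i in range(1, 6)]

def read_sensor(url):
    # requests.get blocks until the sensor responds
    resp = requests.get(url)
    return float(resp.text)

readings = [read_sensor(url) for url in SENSOR_URLS]  # one call at a time
print('Average temperature:', sum(readings) / len(readings))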

Although this works, each sensor is called one after the other, and in an environment with dozens of such sensors this code would take a long time to complete. Most of that time would be spent simply waiting for HTTP responses.

Here we can use asyncio: each sensor read becomes a coroutine, and the coroutines can run concurrently. Let's also assume that the temperature reading is in Fahrenheit and we need to convert it to Celsius.

So an asynchronous sensor read function would be as follows.
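A sketch, reusing the hypothetical plain-text sensor endpoints from above:

import aiohttp

async def read_sensor(session, url):
    # await yields control to the event loop while the response is pending
    async with session.get(url) as resp:
        fahrenheit = float(await resp.text())
    return (fahrenheit - 32) * 5 / 9  # convert Fahrenheit to Celsius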

We use aiohttp here instead of requests, since requests is not asynchronous. We also add the await keyword, so the program can execute other coroutines while it awaits a sensor response.

Aiohttp

aiohttp is a client/server-side Python library for making asynchronous HTTP requests, available from Python 3.5 onward. It can be installed using

pip install aiohttp

To fetch using aiohttp we need to create a session,

async with aiohttp.ClientSession() as session:

and this session can be shared between multiple coroutines; usually, a single session is created per application. Using this session, we can fetch a URL with session.get

async with session.get(url) as resp:
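Inside the block, the response object exposes the status code and body; reading the body is itself awaitable:

async with session.get(url) as resp:
    print(resp.status)        # e.g. 200
    body = await resp.text()  # or await resp.json() for JSON payloads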

Other HTTP methods like POST and PUT are also available

async with session.post(url, data=b'data') as resp:
async with session.put(url, data=b'data') as resp:
async with session.delete(url) as resp:
async with session.head(url) as resp:
async with session.options(url) as resp:
async with session.patch(url, data=b'data') as resp:

Connectors

Connectors are transports for aiohttp; they can be either TCP or Unix sockets. We can use the connector to disable SSL verification

app_connector = aiohttp.TCPConnector(ssl=False)
async with aiohttp.ClientSession(connector=app_connector) as session:

and to limit the total number of simultaneous requests using limit, or the number of simultaneous requests to the same endpoint using limit_per_host.

app_connector = aiohttp.TCPConnector(ssl=False, limit=100, limit_per_host=25)
async with aiohttp.ClientSession(connector=app_connector) as session:

Authentication

We can add login credentials using the BasicAuth parameter

app_connector = aiohttp.TCPConnector(ssl=False)
app_auth = aiohttp.BasicAuth(login=username, password=password, encoding='utf-8')
async with aiohttp.ClientSession(connector=app_connector, auth=app_auth) as session:

Headers

We can add headers as follows

app_connector = aiohttp.TCPConnector(ssl=False)
app_header = {'Authorization': app_api_key}
async with aiohttp.ClientSession(connector=app_connector, headers=app_header) as session:

To summarize, the final code using the parameters discussed would look like this.
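A sketch of that final program, reusing the hypothetical sensor endpoints from earlier (the credentials are placeholders):

import asyncio
import aiohttp

SENSOR_URLS = [f'http://example.com/sensor/{i}' for i in range(1, 6)]

async def read_sensor(session, url):
    async with session.get(url) as resp:
        fahrenheit = float(await resp.text())
    return (fahrenheit - 32) * 5 / 9

async def main():
    connector = aiohttp.TCPConnector(ssl=False, limit=100, limit_per_host=25)
    auth = aiohttp.BasicAuth(login='user', password='pass', encoding='utf-8')
    async with aiohttp.ClientSession(connector=connector, auth=auth) as session:
        # one task per sensor, all running concurrently on the shared session
        tasks = [asyncio.create_task(read_sensor(session, url)) for url in SENSOR_URLS]
        readings = await asyncio.gather(*tasks)
        print('Average temperature:', sum(readings) / len(readings))

asyncio.run(main())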

Notice that we created the session within the main function and passed it to the coroutines for execution. Thanks to limit and limit_per_host, we do not need any additional semaphores to control the level of concurrency: requests simply wait when the maximum concurrency limit is reached.

Asyncio

Await

We need to use await before all coroutine calls. In the example below, result holds a coroutine object (not the value), while result1 holds the value 4.

import asyncio

async def foo(n1):
    return n1 * 2

async def main():
    result = foo(2)         # calling without await returns a coroutine object
    print(result)           # <coroutine object foo at 0x...>
    result1 = await foo(2)  # awaiting runs the coroutine and returns its value
    print(result1)          # 4

asyncio.run(main())

Asyncio Tasks

Another addition to the above program is tasks. Using the asyncio.create_task function, we schedule each sensor read as a task and append it to a tasks list, which is later gathered.
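In isolation, that part of the program looks like this (read_sensor and SENSOR_URLS as sketched earlier):

tasks = []
for url in SENSOR_URLS:
    tasks.append(asyncio.create_task(read_sensor(session, url)))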

Asyncio Gather

await asyncio.gather(*Tasks)

The code above unpacks all the tasks in the Tasks list and runs them concurrently, returning their results as a list. By default, the first exception raised in any coroutine is immediately propagated to the awaiting caller. To have exceptions collected into the results list instead of raised, use

await asyncio.gather(*Tasks, return_exceptions=True)
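A quick illustration of the difference, using a deliberately failing coroutine:

import asyncio

async def ok():
    return 1

async def boom():
    raise ValueError('sensor offline')

async def main():
    # with return_exceptions=True the error is collected, not raised
    results = await asyncio.gather(ok(), boom(), return_exceptions=True)
    print(results)  # [1, ValueError('sensor offline')]

asyncio.run(main())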

Asyncio Run

asyncio.run(foo())

The line above does two things: it creates an event loop and runs the given coroutine until it completes. Alternatively, we can do the same explicitly as follows

loop = asyncio.get_event_loop()
loop.run_until_complete(foo())

Aiofiles

Similarly, for reading files asynchronously, we can use aiofiles. It can be installed with

pip install aiofiles

Let's take a sample scenario where we need to read data from a file

async with aiofiles.open('filename', mode='r') as f:
    contents = await f.read()
    print(contents)

However, if this file contains, say, millions of lines of data that need to be processed, we can use an async iterator instead

async with aiofiles.open('filename') as f:
    async for line in f:
        print(line)

However, if the use case requires reading the file in fixed-size chunks, we can define a chunk_size using the Reader class from the separate aiofile package (installed with pip install aiofile)

from aiofile import AIOFile, Reader

async with AIOFile('filename') as afp:
    reader = Reader(afp, chunk_size=8)
    async for chunk in reader:
        print(chunk)
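If you would rather stay with aiofiles, a minimal sketch achieving the same chunked read is to pass a size to read:

import aiofiles

async with aiofiles.open('filename') as f:
    while chunk := await f.read(8):  # read 8 characters at a time
        print(chunk)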

Conclusion

Asyncio can drastically reduce the execution time of a program, but this is often constrained by the server side: we need a server that allows multiple connections per session and multiple sessions per user. Asyncio also avoids many of the issues we would face with threading. Finally, using generators where possible is strongly recommended, as it lowers memory requirements.
