Asynchronous I/O in Python
Introduction
Most programs are I/O bound, and I/O is slow compared with regular code execution. While waiting for these I/O operations, the program sits idle, wasting millions of CPU cycles. To make better use of those cycles, we can have the program work on other tasks while it awaits the I/O operation.
Why Asyncio?
The Global Interpreter Lock (GIL) in Python allows only one thread to execute Python bytecode at any given point in time. Asyncio instead offers cooperative multitasking, where multiple tasks (coroutines) voluntarily yield control to one another so that waiting time is put to productive use.
You need asyncio if your program has blocking functions: functions that prevent further execution of code until they complete, such as an HTTP request.
Sample Scenario
Let's take a site that has 5 temperature sensors, and we have a program that takes readings every 5 seconds, then calculates and prints their average. We need to call each sensor, and it returns the current temperature. Written synchronously, the code would be as follows.
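A minimal synchronous sketch, assuming each sensor exposes a hypothetical HTTP endpoint that returns the temperature as plain text (the URLs here are placeholders):

import time
import requests

# Hypothetical sensor endpoints; replace with the real URLs
SENSOR_URLS = [f'http://sensors.local/sensor/{i}' for i in range(5)]

def read_sensor(url):
    resp = requests.get(url)  # blocks until the response arrives
    return float(resp.text)

def main():
    while True:
        # One request at a time; each call waits for the previous one
        readings = [read_sensor(url) for url in SENSOR_URLS]
        print('Average temperature:', sum(readings) / len(readings))
        time.sleep(5)

main()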
Although this would work, we see that each sensor is called one after the other, and in an environment with dozens of such sensors, this code would take a long time to complete. Most of the time would be spent just awaiting an HTTP response.
Here we can use asyncio: each sensor read becomes a coroutine, and they can all run concurrently. Let's also assume that the sensor reports the temperature in Fahrenheit and we need to convert it to Celsius.
So an asynchronous sensor-read function would be as follows.
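A sketch of such a coroutine, reusing the hypothetical endpoints above and taking an aiohttp session as a parameter:

async def read_sensor(session, url):
    # Control returns to the event loop while the response is pending
    async with session.get(url) as resp:
        fahrenheit = float(await resp.text())
    return (fahrenheit - 32) * 5 / 9  # convert Fahrenheit to Celsius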
We use aiohttp here instead of requests, since requests is not asynchronous. We also add the await keyword, so the program can execute other coroutines while it awaits a sensor response.
Aiohttp
Aiohttp is a client/server-side Python library for making asynchronous HTTP requests, available from Python 3.5 onwards (the version that introduced the async/await syntax). It can be installed using
pip install aiohttp
To fetch using aiohttp, we first need to create a session:
async with aiohttp.ClientSession() as session:
and this session can be shared with multiple coroutines; usually a single session is created per application. Using this session, we can fetch a URL with session.get:
async with session.get(url) as resp:
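Putting the two together, a minimal fetch that reads the status and body looks like this (url is assumed to be defined elsewhere):

async with aiohttp.ClientSession() as session:
    async with session.get(url) as resp:
        print(resp.status)
        body = await resp.text()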
Other HTTP methods such as POST, PUT, DELETE, HEAD, OPTIONS, and PATCH are also available:
async with session.post(url, data=b'data') as resp:
async with session.put(url, data=b'data') as resp:
async with session.delete(url) as resp:
async with session.head(url) as resp:
async with session.options(url) as resp:
async with session.patch(url, data=b'data') as resp:
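For example, posting a JSON payload and decoding the JSON response (the url and payload shape are illustrative assumptions):

async with session.post(url, json={'value': 42}) as resp:
    result = await resp.json()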
Connectors
Connectors are the transports for aiohttp; they can be either TCP or Unix socket based. We can use the connector to disable SSL verification:
app_connector = aiohttp.TCPConnector(ssl=False)
async with aiohttp.ClientSession(connector=app_connector) as session:
and to limit the total number of simultaneous connections using limit, or the number of simultaneous connections to the same endpoint using limit_per_host:
app_connector = aiohttp.TCPConnector(ssl=False, limit=100, limit_per_host=25)
async with aiohttp.ClientSession(connector=app_connector) as session:
Authentication
We can add login credentials using aiohttp.BasicAuth passed through the session's auth parameter:
app_connector = aiohttp.TCPConnector(ssl=False)
app_auth = aiohttp.BasicAuth(login=username, password=password, encoding='utf-8')
async with aiohttp.ClientSession(connector=app_connector, auth=app_auth) as session:
Headers
We can add headers as follows:
app_connector = aiohttp.TCPConnector(ssl=False)
app_header = {'Authorization': app_api_key}
async with aiohttp.ClientSession(connector=app_connector, headers=app_header) as session:
To summarize, the final code using the parameters discussed would look like this.
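A sketch of the complete program under the same assumptions (hypothetical sensor endpoints returning the temperature as plain-text Fahrenheit):

import asyncio
import aiohttp

# Hypothetical sensor endpoints; replace with the real URLs
SENSOR_URLS = [f'http://sensors.local/sensor/{i}' for i in range(5)]

async def read_sensor(session, url):
    async with session.get(url) as resp:
        fahrenheit = float(await resp.text())
    return (fahrenheit - 32) * 5 / 9  # convert Fahrenheit to Celsius

async def main():
    app_connector = aiohttp.TCPConnector(ssl=False, limit=100, limit_per_host=25)
    async with aiohttp.ClientSession(connector=app_connector) as session:
        while True:
            tasks = [asyncio.create_task(read_sensor(session, url)) for url in SENSOR_URLS]
            readings = await asyncio.gather(*tasks)
            print('Average temperature:', sum(readings) / len(readings))
            await asyncio.sleep(5)

asyncio.run(main())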
Notice that we created the session within the main function and passed it to the coroutines for execution. Because of limit and limit_per_host, we do not need any additional semaphores to control the degree of concurrency; a request simply waits when the maximum number of concurrent connections is reached.
Asyncio
Await
We need to use await before calling coroutines. In the example below, result would be a coroutine object, while result1 would hold the value 4.
import asyncio

async def foo(n1):
    return n1 * 2

async def main():
    result = foo(2)        # not awaited: just a coroutine object, never runs
    print(result)          # <coroutine object foo at 0x...>
    result1 = await foo(2)
    print(result1)         # 4

asyncio.run(main())
Asyncio Tasks
Another addition to the above program is tasks. We use the asyncio.create_task function to schedule each coroutine and append the resulting task to a list, which is gathered later.
Here we create a list of tasks, to which each sensor-read task is appended:
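Continuing the sensor example, with session and SENSOR_URLS as defined earlier:

tasks = []
for url in SENSOR_URLS:
    tasks.append(asyncio.create_task(read_sensor(session, url)))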
Asyncio Gather
await asyncio.gather(*tasks)
The code above will unpack all tasks in the tasks list, run them concurrently, and return their results as a list. By default, the first exception raised in any coroutine propagates immediately to the awaiting code. To have exceptions collected into the result list instead of being raised, use
await asyncio.gather(*tasks, return_exceptions=True)
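The returned values can then be inspected, with exceptions appearing in place of the failed results; for example:

results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
    if isinstance(r, Exception):
        print('sensor read failed:', r)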
Asyncio Run
asyncio.run(foo())
The line above performs two functions: it creates an event loop and runs the coroutine until it completes. Alternatively, we can do this explicitly:
loop = asyncio.get_event_loop()
loop.run_until_complete(foo())
Aiofiles
Similarly, for reading files asynchronously, we can use aiofiles. It can be installed using
pip install aiofiles
Let's take a sample scenario where we need to read data from a file
async with aiofiles.open('filename', mode='r') as f:
    contents = await f.read()
print(contents)
However, if the file contains, say, millions of lines that need to be processed, we can use an asynchronous iterator instead of loading everything at once:
async with aiofiles.open('filename') as f:
    async for line in f:
        print(line)
And if the use case requires reading the file sequentially in fixed-size chunks, we can define a chunk_size using the Reader class from the separate aiofile library:
from aiofile import AIOFile, Reader

async with AIOFile('filename', 'r') as afp:
    reader = Reader(afp, chunk_size=8)  # read 8 bytes at a time
    async for chunk in reader:
        print(chunk)
Conclusion
Asyncio can drastically reduce the execution time of a program, but the gains are often constrained by the server side: we need a server that allows multiple connections per session and multiple sessions per user. Asyncio also avoids many of the issues we would face with threading. Finally, using generators where possible is strongly recommended, as it lowers memory requirements.