How Python Asyncio Works: Recreating it from Scratch
Right now, asyncio is one of the trendier topics in Python, and rightfully so: it’s a great way to handle I/O-bound programs! When I was learning about asyncio, it took me a while to understand how it actually worked. Later, I found out that it’s basically a really nice layer on top of Python generators. In this article, I’m going to create a simplified version of asyncio using just Python generators. Then, I’m going to refactor the example to use the async and await keywords with the help of the __await__ dunder method before coming full circle and swapping out my version for the real asyncio. Hopefully, by building a simple version of asyncio, you’ll get a better grasp of how it does its magic by the end of this article!
Generators Review
If you're already familiar with generators, skip this part. If you aren't, generators are what asyncio is built on, so it's very important to understand how they work.
First of all, generators exist because they let you make your code more memory efficient. Imagine you had the following loop:
for i in range(100_000_000):
    print(i)
If range returned a list for you to loop over, code like the example above would be very memory inefficient, since you would be creating a list with 100 million elements. However, in Python 3, range is lazy: strictly speaking it is a sequence object rather than a generator, but like a generator it produces each number only when it is needed, without ever storing the entire sequence in memory.
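A quick way to see this laziness is to compare the memory footprint of a range with that of an equivalent list. The sketch below uses sys.getsizeof, which reports only the container's own size, so treat the numbers as a rough illustration rather than a precise measurement. The names lazy and eager are just for this example.

```python
import sys

lazy = range(1_000_000)          # stores only start, stop, and step
eager = list(range(1_000_000))   # stores a reference to every element

print(sys.getsizeof(lazy))       # small and independent of the length
print(sys.getsizeof(eager))      # grows with the number of elements

# A range also supports indexing and len(), which a generator object does not.
print(lazy[10], len(lazy))       # 10 1000000
```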
There are a few ways to create generators, but we're going to focus on generator functions. These are defined like any other function but use the yield statement to hand back data. Using yield turns a regular function into a generator function: instead of executing all at once, its body pauses at each yield and resumes, with its state intact, each time you call next(iterator).
Take the following generator function, for example:
def generator():
    yield 'hello'
    yield 'world'

iterator = generator()
When you call the generator function, Python does not run the code inside of it like it normally would. Because the function contains the yield keyword, the call returns a generator object instead. Once we have the generator object, we can call next(iterator), which runs the function’s code up to the first (or next) yield statement:
print(next(iterator)) # Output: hello
print(next(iterator)) # Output: world
If we try calling next(iterator) again, the generator will raise a StopIteration exception, because the function body has run to the end and there are no more yield statements to pause at.
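To make the exhaustion behavior concrete, here is a small sketch that catches the exception by hand and then shows that a for loop (or a comprehension) handles it for you. The exhausted and words variables are only there for illustration.

```python
def generator():
    yield 'hello'
    yield 'world'

iterator = generator()
next(iterator)
next(iterator)

try:
    next(iterator)            # nothing left to yield
except StopIteration:
    exhausted = True
    print('generator is exhausted')

# Iteration constructs call next() for you and stop quietly on StopIteration.
words = [word for word in generator()]
print(words)  # ['hello', 'world']
```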
Another cool feature of Python generators is yield from, which lets a generator delegate to a sub-generator or any other iterable object, enabling you to create a chain of generators!
def generator():
    yield 'hello'

def another_generator():
    yield from generator()

iterable = another_generator()
print(next(iterable)) # Output: hello
There’s a bit more to generators than I’m covering here, such as generator expressions, which are like list comprehensions but written with parentheses instead of brackets, and the ability to send data into a generator with iterator.send(value). However, for this article, the important thing to remember about generators is that they allow a function to be paused and resumed while maintaining its state!
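Since send comes back later when we drive coroutines, here is a minimal sketch of how it works on a plain generator. The running_total function is a made-up example, not something from asyncio.

```python
def running_total():
    total = 0
    while True:
        # The value passed to send() becomes the result of this yield expression.
        amount = yield total
        total += amount

accumulator = running_total()
next(accumulator)              # advance to the first yield before sending values
print(accumulator.send(5))     # 5
print(accumulator.send(10))    # 15
```

Calling send(None) on a generator is equivalent to calling next() on it.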
The Event Loop
The event loop, which is in charge of running and managing all of the current tasks, is the core of asyncio and is the first thing we’ll recreate with generators. The real asyncio event loop is far more involved (and parts of asyncio, such as Future and Task, have optional C implementations for speed), but an easy way to think about it is as a list that holds all of the current tasks. For now, think of these tasks as just generator objects. The event loop manager loops through each task in the list and calls next(task) to run it. The task then runs until it reaches I/O-bound work such as sleeping, at which point it uses the yield keyword to pause its execution and give control back to the event loop, which moves on to the next task in the list.
Here’s an example of that: we have two tasks that both print their task number and then yield, which pauses their execution. Since the event loop manager is the one that calls next(), it gets control back after the task yields and then goes on to run the next task in the loop.
def task1():
    while True:
        print('Task 1')
        yield  # pause here and hand control back to the event loop

def task2():
    while True:
        print('Task 2')
        yield

event_loop = [task1(), task2()]

while True:
    for task in event_loop:
        next(task)  # resume the task until its next yield
The output of this code looks like the following and goes on forever, since neither generator function ever finishes due to its while True loop.
Task 1
Task 2
Task 1
Task 2
…
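The tasks above never finish, so the loop never has to deal with StopIteration. As a minimal sketch of how a round-robin loop could handle tasks that do finish, the version below drops a task once it is exhausted. The countdown and run_all names are hypothetical, and collections.deque is used here only as a convenient first-in, first-out container.

```python
from collections import deque

def countdown(name, count, log):
    # Hypothetical task: records a message, then yields control back to the loop.
    for remaining in range(count, 0, -1):
        log.append(f'{name}: {remaining}')
        yield

def run_all(tasks):
    pending = deque(tasks)
    while pending:
        task = pending.popleft()
        try:
            next(task)
        except StopIteration:
            continue          # finished: do not reschedule
        pending.append(task)  # still has work: go to the back of the line

log = []
run_all([countdown('A', 2, log), countdown('B', 3, log)])
print(log)  # ['A: 2', 'B: 3', 'A: 1', 'B: 2', 'B: 1']
```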
Sleeping
Taking the same code from above, we can add sub-generators to our tasks with yield from. Below, I added a sleep generator that pauses the execution of a task until the specified time is up. This works because sleep keeps yielding until the given number of seconds has passed, at which point it leaves the while loop. Since the function body then ends, the sleep generator raises a StopIteration exception, which signals the yield from in the task function to continue to the next line of code.
import time

def sleep(seconds):
    start_time = time.time()
    # Keep yielding to the event loop until enough time has passed.
    while time.time() - start_time < seconds:
        yield

def task1():
    while True:
        print('Task 1')
        yield from sleep(1)

def task2():
    while True:
        print('Task 2')
        yield from sleep(5)

event_loop = [task1(), task2()]

while True:
    for task in event_loop:
        next(task)
Output:
Task 1
Task 2
Task 1
Task 1
Task 1
Task 1
Task 2
Task 1
…
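The StopIteration handoff also carries a value: whatever the sub-generator returns becomes the result of the yield from expression. A minimal sketch, where timed_sleep is a hypothetical variant of sleep that reports how long it actually waited; it uses time.monotonic instead of time.time so that system clock adjustments cannot skew the measurement.

```python
import time

def timed_sleep(seconds):
    start_time = time.monotonic()
    while time.monotonic() - start_time < seconds:
        yield
    # The return value travels inside StopIteration and becomes the
    # result of `yield from` in the caller.
    return time.monotonic() - start_time

def task(results):
    elapsed = yield from timed_sleep(0.05)
    results.append(elapsed)

results = []
for _ in task(results):   # drive the task to completion, like a one-task event loop
    pass
print(f'slept for {results[0]:.3f} seconds')
```

This is the same mechanism that later lets an awaited coroutine hand a result back to its caller.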
Yield to Await
We can now take the above code and transition from yield to await by using the __await__ dunder method and the async keyword. When a class defines the __await__ method, we can put the await keyword in front of an instance of that class to wait on it. In asyncio, you generally work with Task objects created via a function like asyncio.create_task. The Task class inherits from asyncio's Future class, which defines the __await__ method. We can also use await in front of a coroutine, which is the object you get when calling a function defined with async def. Coroutines are similar to generators in the sense that a coroutine’s execution can also be paused and resumed.
You can think of the await keyword as a synonym for yield from with some extra validation rules. So, when writing the code await object, you are basically saying to yield from the iterator returned by the object's __await__ method, or, if the object is another coroutine, to delegate to it directly (much like a sub-generator).
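To see the similarity to yield from, you can drive a coroutine by hand, without any event loop. In this sketch, YieldOnce and worker are hypothetical names; the string values are only there to make the flow visible and are not something asyncio itself would yield.

```python
class YieldOnce:
    # Hypothetical awaitable: pauses the awaiting coroutine exactly once.
    def __await__(self):
        yield 'paused'
        return 'resumed'

async def worker():
    # Behaves much like: value = yield from YieldOnce().__await__()
    value = await YieldOnce()
    return value

coro = worker()
print(coro.send(None))          # paused  (the value yielded inside __await__)
try:
    coro.send(None)
except StopIteration as stop:
    print(stop.value)           # resumed (the coroutine's return value)
```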
You can actually look in the asyncio source code and see that the __await__ method of the Future class essentially just yields if the future (or task) is not yet done.
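That pattern can be sketched as follows. MiniFuture is a hypothetical, stripped-down stand-in modeled loosely on the pure-Python Future.__await__ in CPython's Lib/asyncio/futures.py; the real method does extra bookkeeping, and its details may differ between Python versions.

```python
class MiniFuture:
    # Hypothetical, simplified stand-in for asyncio.Future.
    def __init__(self):
        self._done = False
        self._result = None

    def done(self):
        return self._done

    def set_result(self, result):
        self._result = result
        self._done = True

    def __await__(self):
        if not self.done():
            yield self            # hand control back to whoever is driving us
        if not self.done():
            raise RuntimeError("await wasn't used with future")
        return self._result

async def waiter(future):
    return await future

future = MiniFuture()
coro = waiter(future)
print(coro.send(None) is future)   # True: not done yet, so the coroutine pauses
future.set_result(42)
try:
    coro.send(None)
except StopIteration as stop:
    print(stop.value)              # 42
```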
To move the code that we wrote in the above section over to async and await, we first need to make our own Task class, since a function can’t have the __await__ dunder method. Below is a simple version that I came up with:
from queue import Queue

event_loop = Queue()

class Task():
    def __init__(self, generator):
        self.iter = generator
        self.finished = False

    def done(self):
        return self.finished

    def __await__(self):
        # Keep handing control back to the event loop until this task is done.
        while not self.finished:
            yield self

def create_task(generator):
    task = Task(generator)
    event_loop.put(task)  # schedule the task to be run
    return task
This time, instead of using a Python list to create the event loop, we’re using a queue, which makes a bit more sense since we want to be able to add and delete tasks from the loop in constant time.
For our Task class, we store the generator object in self.iter and set self.finished to False; this flag tracks whether the generator is done running (it’s done when it raises StopIteration). Our Task object also has an __await__ dunder method, which keeps yielding control back to the event loop until the task is finished. Lastly, the create_task helper function creates the Task object and adds it to the event loop, which schedules it to be run.
Now, let’s build the event loop manager, which will run the tasks:
def run(main):
    event_loop.put(Task(main))
    while not event_loop.empty():
        task = event_loop.get()
        try:
            task.iter.send(None)  # resume the task until it yields again
        except StopIteration:
            task.finished = True
        else:
            event_loop.put(task)  # not done yet: reschedule it
You may notice that this is starting to mimic the actual asyncio API, since to start the event loop we call run with an initial coroutine. The function first wraps the main coroutine in a Task object and adds it to the event loop. The while loop then runs and, on each cycle, gets the next task from the queue. Instead of next(task.iter), we now use task.iter.send(None): coroutine objects don't support next(), and sending None resumes them in the same way (it also works on plain generators, so tasks that wrap generators still run). We wrap this call in a try-except block: if a StopIteration exception is raised, the task has finished and we set task.finished to True; if no exception is raised, the else clause adds the task back to the event loop to be run again.
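The reason for send(None) is easy to check in isolation: a coroutine object is not an iterator, so next() rejects it, while send(None) resumes it. A minimal sketch with a throwaway coroutine function:

```python
async def coroutine():
    return 'finished'

coro = coroutine()
try:
    next(coro)                    # coroutine objects do not implement __next__
except TypeError as error:
    print(type(error).__name__)   # TypeError

try:
    coro.send(None)               # the coroutine equivalent of next()
except StopIteration as stop:
    print(stop.value)             # finished
```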
Next, we need to make the sleep function async-compliant. Before, we used a generator function with a while loop and a single yield to handle the sleeping. I like this approach, but you can’t use the await keyword on a generator object: the thing being awaited needs to be a coroutine or an object with the __await__ dunder method. So, to solve this, I moved the generator code into a helper called _sleep, and the actual sleep function now wraps that generator in a task and awaits it. This await calls the __await__ method of the Task object, which yields, letting the event loop move on to another task. When the event loop gets to the new _sleep task, the task checks the time and, if not enough has passed, also yields to give control back to the event loop. When the event loop later resumes the task that is sleeping, it picks up exactly where it left off, just like a generator: the coroutine is still waiting on sleep, and sleep is still awaiting the _sleep task, so execution resumes inside the task's __await__ method. If the _sleep task still isn't finished, __await__ yields again; once it is finished, the loop in __await__ exits and the sleeping coroutine continues.
import time

def _sleep(seconds):
    start_time = time.time()
    while time.time() - start_time < seconds:
        yield

async def sleep(seconds):
    task = create_task(_sleep(seconds))
    return await task
Here’s all of the code put together:
from queue import Queue
import time

event_loop = Queue()

def _sleep(seconds):
    start_time = time.time()
    # Keep yielding to the event loop until enough time has passed.
    while time.time() - start_time < seconds:
        yield

async def sleep(seconds):
    task = create_task(_sleep(seconds))
    return await task

class Task():
    def __init__(self, generator):
        self.iter = generator
        self.finished = False

    def done(self):
        return self.finished

    def __await__(self):
        # Keep handing control back to the event loop until this task is done.
        while not self.finished:
            yield self

def create_task(generator):
    task = Task(generator)
    event_loop.put(task)  # schedule the task to be run
    return task

def run(main):
    event_loop.put(Task(main))
    while not event_loop.empty():
        task = event_loop.get()
        try:
            task.iter.send(None)  # resume the task until it yields again
        except StopIteration:
            task.finished = True
        else:
            event_loop.put(task)  # not done yet: reschedule it
Now that we’ve built the event loop, a way to create tasks, and a sleep function, we can import the file (called “jacobio.py”) and adapt the code from back when we were using yields: replace the yield from statements with await, add async to the functions that use the await keyword so that they become coroutine functions, and create a main function, like you would in asyncio, to add the tasks to the event loop:
import jacobio

async def task1():
    for _ in range(2):
        print('Task 1')
        await jacobio.sleep(1)

async def task2():
    for _ in range(3):
        print('Task 2')
        await jacobio.sleep(0)

async def main():
    one = jacobio.create_task(task1())
    two = jacobio.create_task(task2())
    await one
    await two
    print('done')

if __name__ == '__main__':
    jacobio.run(main())
Output:
Task 1
Task 2
Task 2
Task 2
Task 1
done
Await with Asyncio
We can now take our code from above and replace all occurrences of “jacobio” with “asyncio” and we’re now fully using the asyncio package!
import asyncio

async def task1():
    for _ in range(2):
        print('Task 1')
        await asyncio.sleep(1)

async def task2():
    for _ in range(3):
        print('Task 2')
        await asyncio.sleep(0)

async def main():
    one = asyncio.create_task(task1())
    two = asyncio.create_task(task2())
    await one
    await two
    print('done')

if __name__ == '__main__':
    asyncio.run(main())
Asyncio does a lot more behind the scenes, but we were able to go from basic generators to recreating the core parts of asyncio from scratch! I tried to make the event loop manager as simple as possible, so while this is the basic idea behind asyncio, my implementation differs from the flow of the actual source code, which is far larger and more complex. Also, now that we have the full power of the real asyncio package, we don’t need to await each task one by one; instead, we can use a function such as asyncio.gather() to wait on multiple tasks at once. If you’re curious about all of the ways to manage tasks in asyncio, check out my article on handling asyncio tasks like a pro!
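For reference, the asyncio.gather() approach might look like the sketch below. The task function, its names, and the short delays are made up for this example; gather() itself runs the awaitables concurrently and returns their results in the order they were passed in.

```python
import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    return f'{name} finished'

async def main():
    # Both coroutines are wrapped in tasks and run concurrently; the result
    # list follows the argument order, not the completion order.
    return await asyncio.gather(task('Task 1', 0.02), task('Task 2', 0.01))

if __name__ == '__main__':
    print(asyncio.run(main()))   # ['Task 1 finished', 'Task 2 finished']
```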