Handling Tasks in Asyncio Like a Pro
Python’s Asyncio module is a great way to handle IO-bound jobs and has seen many improvements in recent Python updates. However, there are so many ways to handle async tasks that figuring out which method to use for different scenarios can be a bit confusing. In this article, I’ll first go over the basics of what a task object is and then talk about all of the different ways to handle them and the pros and cons of each.
What's a Task
Before talking about tasks, it’s important to understand how Asyncio coroutines work since a task object is just a coroutine wrapper that can run asynchronously.
Coroutines
To create a coroutine function, all you need to do is add the async keyword to the front of a function/method definition. This signifies that the function can be suspended and resumed by the event loop (at the points where the coroutine uses its own await keywords). When calling a coroutine function, instead of the actual function being run, a coroutine object is created. This object then needs to be awaited with the await keyword, which will then run the code in the coroutine.
Below is an example of how to create a coroutine and then run the code inside of the coroutine object using await:
import asyncio

async def my_function():
    print('Hello World')

async def main():
    coro = my_function()
    print(type(coro))
    await coro

asyncio.run(main())
In the example, we first call my_function, which creates the coroutine object, which we can see via the print statement. Then, to print “Hello World,” we use the await keyword to pause the execution of main and run my_function. The full output is:
<class 'coroutine'>
Hello World
Scheduled Coroutines
After making a coroutine, you generally want to wrap it in an asyncio.Task object. Creating a task automatically schedules the coroutine to run on the event loop (which, roughly speaking, keeps a queue of work that is ready to run). To create a task object, you can use the asyncio.create_task(coro, *, name=None, context=None) function. This function takes a coroutine object as well as two optional keyword arguments. The first optional argument is name, which lets you give the task object a name so that you can remember what it does. You can also set the name after creating the task object via Task.set_name(name) and then get the task name via Task.get_name(). The other keyword argument, added in 3.11, is context, which lets you pass a contextvars.Context for the coroutine to run in; context variables give a task its own local storage (sort of similar to the threading.local() object but for async tasks). If no context is given, the task runs in a copy of the current context. The name argument can be pretty useful, but context is pretty rarely used.
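As a small sketch of the naming methods described above (the fetch_report coroutine and the task names are made up for illustration), this names a task at creation time and then renames it:

```python
import asyncio


async def fetch_report():
    # Hypothetical stand-in for real I/O work.
    await asyncio.sleep(0.01)
    return "report"


async def main():
    # Name the task when creating it...
    task = asyncio.create_task(fetch_report(), name="fetch-report")
    first_name = task.get_name()

    # ...or rename it afterwards.
    task.set_name("fetch-report-retry")
    second_name = task.get_name()

    result = await task
    return first_name, second_name, result


print(asyncio.run(main()))
```

Task names also show up in the task's repr, which can make debugging output easier to read.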
Another important thing to note is that the event loop only stores weak references to the task objects. So, if you just write asyncio.create_task(my_function()), the garbage collector may destroy the task at any time, even before it finishes. Instead, you need to keep a strong reference to every task by storing the task object returned from create_task in either a variable or another object.
Here’s a basic example of using a task object:
import asyncio

async def my_function():
    print('Hello World')

async def main():
    task = asyncio.create_task(my_function())
    print(type(task))
    await task

asyncio.run(main())
Output:
<class '_asyncio.Task'>
Hello World
Besides just waiting for the task to finish, you can also cancel it with Task.cancel(), add a callback function to be called when the task finishes with Task.add_done_callback(cb), manually check if the coroutine is done running with Task.done(), or get the result of the wrapped coroutine when the task is done with Task.result(); check out the complete list of Task methods in Python’s documentation.
Here’s the same example from above, but with some of these task methods in use:
import asyncio

async def my_function():
    return 'Hello World!'

async def main():
    task = asyncio.create_task(my_function())
    print(task.done())  # Will print False
    await task
    print(task.done())  # Will print True
    print(task.result())  # Will print Hello World!

asyncio.run(main())
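The two remaining methods, Task.cancel() and Task.add_done_callback(cb), can be sketched the same way. This is only an illustration (slow_job and the events list are made up): it cancels a long-running task and records what the callback and the awaiting code each observe.

```python
import asyncio


async def slow_job():
    # Hypothetical long-running work.
    await asyncio.sleep(60)


async def main():
    events = []

    task = asyncio.create_task(slow_job())
    # The callback is called with the finished task as its only argument.
    task.add_done_callback(
        lambda finished: events.append(f"cancelled={finished.cancelled()}")
    )

    await asyncio.sleep(0)  # give the task a chance to start
    task.cancel()           # request cancellation

    try:
        await task
    except asyncio.CancelledError:
        # Awaiting a cancelled task raises CancelledError in the awaiter.
        events.append("caught CancelledError")

    return events


print(asyncio.run(main()))
```

Note that cancel() only requests cancellation; the task actually ends the next time the event loop runs it, which is why the example still awaits it afterwards.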
While we generally create tasks and then use some method to wait for them to be done, if you want to create a task and then simply forget about it, you can utilize the following pattern. This is straight out of the Asyncio documentation; it creates tasks, adds them to a set so that there is a reference to them, and then when the task is done, it is discarded from the set via a callback.
background_tasks = set()

for _ in range(10):
    task = asyncio.create_task(some_coro())
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
Wait for a Single Task
Now that we've discussed coroutines and task objects, we're ready to talk about ways to manage them more elegantly. While the await keyword is fundamental for pausing the current coroutine until the awaitable being waited upon (such as a coroutine, task, or future) completes, it inherently focuses on one operation at a time. The rest of this article looks at Asyncio's built-in functions for aggregating multiple tasks into a single awaitable object, so that you can then use await on that single object.
While most of Asyncio’s functions are for waiting on multiple tasks, there’s one function meant to wait for a single awaitable called wait_for. Let’s talk about that one first:
asyncio.wait_for
The next step up from a simple await is the wait_for function.
asyncio.wait_for(aw, timeout)
This function takes in a single awaitable object (it automatically wraps coroutines in a task object so that it can be run on the event loop) and waits for it to be done. But unlike a simple await, it also allows you to add a timeout: if the task takes longer than the allotted time to finish, the task inside of wait_for is canceled and a TimeoutError is raised.
import asyncio

async def slow_function():
    await asyncio.sleep(100)

async def main():
    try:
        await asyncio.wait_for(slow_function(), timeout=5.0)
    except asyncio.TimeoutError:  # same class as the built-in TimeoutError on Python 3.11+
        print('Function was too slow :(')

asyncio.run(main())
Because the coroutine function tried to sleep for 100 seconds, a TimeoutError was raised since the timeout in wait_for was only set to 5 seconds:
Function was too slow :(
Wait for Multiple Tasks
Now, let’s get to the interesting stuff - awaiting multiple tasks! There are three main ways to wait for a collection of tasks; each has its pros and cons and can be helpful in different scenarios.
asyncio.wait
Our first option is similar to wait_for but for a collection (such as a list, tuple, or set) of either task or lower-level future objects.
asyncio.wait(collection_of_tasks, *, timeout=None, return_when=ALL_COMPLETED)
This function returns a tuple containing two sets; the first set holds the finished tasks, and the second holds the ones that aren’t done yet. Tasks that are finished before the timeout or the return_when directive go into the finished task set, and ones that aren’t are put in the second set, often called pending, or _ if you don’t plan on using them. However, unlike asyncio.wait_for, asyncio.wait does not raise an exception when a timeout occurs, and the unfinished tasks are not canceled.
The return_when argument lets you tell asyncio.wait to return when one of the following three things happens:
- FIRST_COMPLETED returns when the first task finishes or is canceled.
- FIRST_EXCEPTION returns when one of the tasks raises an exception or they all finish.
- ALL_COMPLETED is the default and will return when all futures are finished or canceled.
Let’s see this in action:
import asyncio
import random

async def job():
    await asyncio.sleep(random.randint(1, 5))

async def main():
    # Task names are strings, so convert the index explicitly
    tasks = [
        asyncio.create_task(job(), name=str(index))
        for index in range(1, 5)
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f'The first task completed was {done.pop().get_name()}')

asyncio.run(main())
Output (the task name varies between runs, since the sleep times are random):
The first task completed was 4
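The timeout behaviour described earlier is easy to miss, so here is a rough sketch of it. The job coroutine, the delays, and the task names are all made up: one task finishes inside the timeout and the other doesn't, and the leftover task has to be canceled by hand.

```python
import asyncio


async def job(delay):
    # Hypothetical work that takes `delay` seconds.
    await asyncio.sleep(delay)
    return delay


async def main():
    tasks = [
        asyncio.create_task(job(0.01), name="fast"),
        asyncio.create_task(job(30), name="slow"),
    ]

    # No exception is raised on timeout; unfinished tasks land in `pending`.
    done, pending = await asyncio.wait(tasks, timeout=0.2)

    done_names = sorted(task.get_name() for task in done)
    pending_names = sorted(task.get_name() for task in pending)

    # wait() leaves the pending tasks running, so cancel them explicitly
    # and give them a chance to finish cancelling.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    return done_names, pending_names


print(asyncio.run(main()))
```

Whether to cancel the pending tasks or keep waiting on them is up to you; wait simply hands them back.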
asyncio.gather
Next, let’s take a look at asyncio.gather(*aws, return_exceptions=False).
Unlike wait, which takes a collection of only tasks or future objects, gather takes any number of tasks, futures, or even coroutine objects as a bunch of positional arguments. Any coroutine objects passed into the function are automatically scheduled as task objects so that they can run on the event loop. Once all the tasks are finished, all return values, obtained via Task.result(), are returned as a single list. One of the nicest things about gather is that the list being returned holds the results in the exact order in which the awaitables were passed into the function!
Another nice thing about gather is that it’s the only one of the three functions that can gracefully return exceptions. If the return_exceptions keyword argument is set to True, the returned list will contain any exception raised by the tasks in place of where the task’s result value would have been.
Let’s look at an example of how this works:
import asyncio
import random

async def job(id):
    print(f'Starting job {id}')
    await asyncio.sleep(random.randint(1, 3))
    print(f'Finished job {id}')
    return id

async def main():
    # create a list of worker coroutines
    coros = [job(i) for i in range(4)]
    # gather the results of all workers
    results = await asyncio.gather(*coros)
    # print the results
    print(f'Results: {results}')

asyncio.run(main())
First, we make a list of coroutine objects and then use the * symbol to unpack them into positional arguments for the gather function to accept. Calling gather schedules the coroutines as tasks, and awaiting the object it returns lets them run until they are eventually all done. Notice how the resulting list of return values matches the order in which the coroutines were put into gather despite the tasks finishing in a random order due to await asyncio.sleep(random.randint(1, 3)).
Starting job 0
Starting job 1
Starting job 2
Starting job 3
Finished job 3
Finished job 0
Finished job 1
Finished job 2
Results: [0, 1, 2, 3]
In the next example, I’m putting two coroutines directly into gather and setting return_exceptions to True, which gracefully returns the exceptions in the same result list:
import asyncio

async def task1():
    raise ValueError()

async def task2():
    raise KeyError()

async def main():
    results = await asyncio.gather(task1(), task2(), return_exceptions=True)
    print(results)  # Will print [ValueError(), KeyError()]

asyncio.run(main())
One last feature of asyncio.gather is that just like how you can cancel an individual task with Task.cancel(), the object that gather returns (to then be awaited) has its own cancel() method which will loop through all of the tasks that it’s managing and cancel all of them.
asyncio.as_completed
The next one is a bit different from the above two; instead of returning a set or list of the results all at once, as_completed returns an iterable object that lets you handle results as they finish! It accepts all types of awaitables (generally coroutines, tasks, or futures) and like many of the other methods, also has a timeout keyword argument, which will raise a TimeoutError if the tasks haven’t finished by the specified time.
asyncio.as_completed(aws, *, timeout=None)
The following is an example of how as_completed works:
import asyncio

async def my_task(id):
    return f'I am number {id}'

async def main():
    tasks = [my_task(id) for id in range(5)]
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(result)

asyncio.run(main())
We can see that the output order is arbitrary, as it just prints out whichever finishes first:
I am number 3
I am number 0
I am number 4
I am number 2
I am number 1
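The timeout argument isn't shown above, so here is a rough sketch of it with made-up delays. The timeout applies to the whole iteration, and the TimeoutError is raised by the await inside the loop once the deadline has passed.

```python
import asyncio


async def job(delay):
    # Hypothetical work that takes `delay` seconds.
    await asyncio.sleep(delay)
    return delay


async def main():
    coros = [job(0.01), job(0.05), job(30)]
    finished = []

    try:
        for next_done in asyncio.as_completed(coros, timeout=0.3):
            finished.append(await next_done)
    except asyncio.TimeoutError:
        # Raised by the await once the overall deadline passes.
        finished.append("timed out")

    return finished


print(asyncio.run(main()))
```

The two short jobs are collected in the order they finish, and the 30-second job triggers the timeout branch instead of holding up the loop.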
Wait for a Group of Tasks
The newest addition to the Asyncio world in Python 3.11 was asyncio.TaskGroup, which simplified the process of managing a group of tasks in the form of a context manager. It ensures that if one task within the group fails, all other tasks are canceled, helping to maintain robust error handling within async code.
Consider a scenario where you have two coroutines, each responsible for calling a different API. Once you get the API responses from both coroutines, you want to enter all the data into a database, but if one API fails, you don’t want to insert anything. Well, this would be a perfect use-case for a TaskGroup since you want both coroutines to finish, but if one fails, you might as well cancel the other one right away.
You can use the tg.create_task() method to add tasks to a task group. The first time any of the task group tasks fail, all of the other tasks in the group will be canceled. The failures then bubble up to the coroutine containing the task group, wrapped in an ExceptionGroup or BaseExceptionGroup.
Here's an example of how to use a task group:
import asyncio

async def do_something():
    return 1

async def do_something_else():
    return 2

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(do_something())
        task2 = tg.create_task(do_something_else())
    print(f'Everything done: {task1.result()}, {task2.result()}')

asyncio.run(main())
Output:
Everything done: 1, 2
Summary
That’s a lot of ways to handle awaitables, so here’s a summary of what we covered:
- await is the most basic form of waiting. You can put this keyword in front of any awaitable object and it will run the code inside of said awaitable; however, await doesn’t let you directly handle multiple tasks at a time.
- asyncio.wait_for handles a single awaitable object like await, but also lets you set a timeout to handle long-running tasks.
- asyncio.wait is like asyncio.wait_for but accepts a collection of either task or future objects. You can specify a timeout and also when you want to return, whether that’s when they are all completed, the first one is completed, or on the first exception.
- asyncio.gather takes any number of awaitable objects in the form of positional arguments. The nice part of gather is that it returns a list of results in the same order as the positional arguments that were passed in and can also return the exceptions raised by tasks in place of their results.
- asyncio.as_completed returns an iterable that takes in any awaitable objects and lets you handle tasks as they finish instead of all at once. It also has a timeout argument.
- asyncio.TaskGroup is the newest addition, introduced in Python 3.11. It lets you handle a group of tasks as a unit: either all of them finish, or, if any task raises an exception, the rest are canceled.
Phew – that was a lot of ways to handle tasks. It took me a while to get comfortable with when exactly to use each of these, but if you’ve read this far, you’re one step closer to mastering Python’s Async features!