Asyncio.Gather VS Asyncio.Wait

Although similar in general cases ("run and get results for many tasks"), each function has some specific functionality for other cases:

asyncio.gather()

Returns a Future instance, allowing high level grouping of tasks:

import asyncio
import random
from pprint import pprint


async def coro(tag):
    print(">", tag)
    await asyncio.sleep(random.uniform(1, 3))
    print("<", tag)
    return tag


async def main():
    # Each gather() call returns a Future that groups its awaitables.
    group1 = asyncio.gather(*[coro("group 1.{}".format(i)) for i in range(1, 6)])
    group2 = asyncio.gather(*[coro("group 2.{}".format(i)) for i in range(1, 4)])
    group3 = asyncio.gather(*[coro("group 3.{}".format(i)) for i in range(1, 10)])

    # Groups can themselves be gathered into a higher-level group.
    all_groups = asyncio.gather(group1, group2, group3)
    return await all_groups


results = asyncio.run(main())

pprint(results)

All tasks in a group can be cancelled by calling group2.cancel() or even all_groups.cancel(). See also gather(..., return_exceptions=True), which returns exceptions as results instead of raising the first one.
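As a minimal sketch of both features mentioned above (the work() coroutine and its fail flag are hypothetical helpers, not part of the original answer), this shows return_exceptions=True collecting an exception as a result, and cancel() on the gather future cancelling the pending children:

```python
import asyncio


async def work(tag, fail=False):
    # Hypothetical worker: sleeps briefly, then returns or raises.
    await asyncio.sleep(0.01)
    if fail:
        raise ValueError(tag)
    return tag


async def main():
    # With return_exceptions=True, exceptions appear in the result list
    # (in argument order) instead of being raised by the await.
    results = await asyncio.gather(
        work("a"), work("b", fail=True), work("c"),
        return_exceptions=True,
    )

    # Cancelling the gather() future cancels the children still pending.
    group = asyncio.gather(work("d"), work("e"))
    group.cancel()
    try:
        await group
        cancelled = False
    except asyncio.CancelledError:
        cancelled = True
    return results, cancelled


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Without return_exceptions=True, the first await would raise the ValueError and the other results would not be returned.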

asyncio.wait()

Supports stopping the wait as soon as the first task is done, or after a specified timeout, which gives lower-level control over the operations:

import asyncio
import random


async def coro(tag):
    print(">", tag)
    await asyncio.sleep(random.uniform(0.5, 5))
    print("<", tag)
    return tag


async def main():
    # wait() requires Task/Future objects; bare coroutines are rejected
    # since Python 3.11.
    tasks = [asyncio.create_task(coro(i)) for i in range(1, 11)]

    print("Get first result:")
    finished, unfinished = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)

    for task in finished:
        print(task.result())
    print("unfinished:", len(unfinished))

    print("Get more results in 2 seconds:")
    finished2, unfinished2 = await asyncio.wait(unfinished, timeout=2)

    for task in finished2:
        print(task.result())
    print("unfinished2:", len(unfinished2))

    print("Get all other results:")
    finished3, unfinished3 = await asyncio.wait(unfinished2)

    for task in finished3:
        print(task.result())


asyncio.run(main())


TaskGroup (Python 3.11+)

Update: Python 3.11 introduces TaskGroups, which can "automatically" await more than one task without gather() or wait():

# Python 3.11+ ONLY!
async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(some_coro(...))
        task2 = tg.create_task(another_coro(...))
    print("Both tasks have completed now.")

asyncio wait until complete task children

If you want to wait for something to happen in asyncio, that's generally a clue that you need the await keyword. Calling asyncio.create_task only schedules a task; the task doesn't start running until the current task yields control to the event loop at an await.

Assuming that you want the st() tasks to run concurrently, something like this would work:

import asyncio

async def st(n):
    print("before ", n)
    await asyncio.sleep(3)
    print("after ", n)

async def tt(n):
    tasks = []
    for i in range(n):
        tasks.append(st(i))

    await asyncio.gather(*tasks)

async def main():
    print("start tasks ...")
    await tt(3)
    print("complete all tasks")

asyncio.run(main())

In tt(), we append each new coroutine to the tasks list, and then use asyncio.gather to execute the tasks concurrently.

You'll note that we never need to call asyncio.create_task here: asyncio.gather takes care of executing things for us, and in main we're only running a single coroutine, so we can just await it directly.

Running the above code results in output like this:

start tasks ...
before 0
before 1
before 2
after 0
after 1
after 2
complete all tasks

Python: why use AsyncIO if not with asyncio.gather()?

There are many reasons to use asyncio besides gather.

What you are really asking is: are there more ways to create concurrent executions besides gather?

To that the answer is yes.

gather is one of the simplest and most straightforward ways to create concurrency with asyncio, but it is not the only one.
What gather does is wrap its arguments in awaitables where needed (for example, coroutines are wrapped in tasks), wait for them, and return the results once all the futures are ready (along with other housekeeping such as propagating cancellation).

Let's examine just two more examples of ways to achieve concurrency:

  1. as_completed - similarly to gather, you send in a bunch of awaitables, but instead of waiting for all of them to be ready, this method returns you the futures as they become ready, unordered.
  2. Another example is to create tasks yourself, e.g. with event_loop.create_task(). This will allow you to create a task that will run on the event loop, which you can later await. In the meantime (until you await the task) you can continue running other code, and basically achieve concurrency (note the task will not run straightaway, but only when you yield control back to the event loop, and it handles the task).
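The two approaches in the list above can be sketched roughly as follows. The fetch() coroutine and its delays are hypothetical stand-ins for real I/O, and asyncio.create_task() (Python 3.7+) is used here in place of calling create_task() on an event loop object:

```python
import asyncio


async def fetch(tag, delay):
    # Hypothetical stand-in for an I/O-bound operation.
    await asyncio.sleep(delay)
    return tag


async def main():
    # 1. as_completed yields awaitables in completion order,
    #    not in the order the arguments were passed.
    order = []
    for fut in asyncio.as_completed([fetch("slow", 0.1), fetch("fast", 0.01)]):
        order.append(await fut)

    # 2. create_task schedules the coroutine; it starts running once the
    #    current coroutine yields to the event loop at an await.
    task = asyncio.create_task(fetch("background", 0.01))
    other = await fetch("foreground", 0.01)  # runs while the task is pending
    return order, other, await task


if __name__ == "__main__":
    print(asyncio.run(main()))
```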

There are many more ways to achieve concurrency. You can start with these examples (the 2nd one is actually a general way you can use to create lots of different concurrent "topologies" of executions).

You can start by reading https://docs.python.org/3/library/asyncio-task.html

python asyncio.gather vs asyncio.as_completed when IO task followed by CPU-bound task

"I've tried ThreadPoolExecutor, [...] it is my understanding that doing so for CPU-bound tasks is counter-productive." - it is counterproductive in the sense that you won't have two such tasks running Python code in parallel on multiple CPU cores. Otherwise, it will work to free up your asyncio loop to continue working, even if only one task's code is being crunched at a time.

If you can't pickle things to a subprocess, running the CPU bound tasks in a ThreadPoolExecutor is good enough.
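A minimal sketch of that approach, assuming a hypothetical cpu_heavy() function and a small ticker() coroutine that only makes progress if the event loop stays responsive:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def cpu_heavy(n):
    # Hypothetical CPU-bound function: plain blocking Python code.
    return sum(i * i for i in range(n))


async def ticker(ticks):
    # Makes progress only if the event loop is not blocked.
    for i in range(3):
        ticks.append(i)
        await asyncio.sleep(0)


async def main():
    loop = asyncio.get_running_loop()
    ticks = []
    with ThreadPoolExecutor(max_workers=2) as pool:
        # The blocking call runs in a worker thread; awaiting its future
        # lets the loop keep serving other coroutines in the meantime.
        total, _ = await asyncio.gather(
            loop.run_in_executor(pool, cpu_heavy, 100_000),
            ticker(ticks),
        )
    return total, ticks


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The threads still share one interpreter lock, so this keeps the loop free rather than speeding up the computation itself.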

Otherwise, just sprinkle your CPU code with some await asyncio.sleep(0) calls (inside the loops) and run it normally as coroutines: that is enough to keep a CPU-bound task from locking the asyncio loop.
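As an illustration of the sleep(0) technique, here is a sketch with a hypothetical cpu_task() loop. Yielding every 1000 iterations is an arbitrary choice made for this example, not a recommendation from the original answer:

```python
import asyncio


async def cpu_task(n, log):
    # Hypothetical CPU-bound loop that periodically yields to the event loop.
    total = 0
    for i in range(n):
        total += i * i
        if i % 1000 == 0:
            await asyncio.sleep(0)  # give other coroutines a chance to run
    log.append("cpu done")
    return total


async def heartbeat(log):
    # A second coroutine that should not be starved by cpu_task.
    for _ in range(3):
        log.append("beat")
        await asyncio.sleep(0)


async def main():
    log = []
    total, _ = await asyncio.gather(cpu_task(10_000, log), heartbeat(log))
    return total, log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The heartbeat entries land in the log before "cpu done" because each sleep(0) hands control back to the loop.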

Difference between `asyncio.wait([asyncio.sleep(5)])` and `asyncio.sleep(5)`

TLDR: Do not use blocking calls such as time.sleep in a coroutine. Use asyncio.sleep to asynchronously pause, or use an event loop executor if blocking code must be run.


Using asyncio.wait([thing]) adds a level of indirection, executing thing in a new Future/Task. While a bare await asyncio.sleep(5) executes the sleep during coro1, the wrapped await asyncio.wait([asyncio.sleep(5)]) executes the sleep after all other currently scheduled coroutines.

async def coro1():
    logger.info("coro1 start")
    await asyncio.sleep(5)  # started immediately
    logger.info("coro1 finish")

async def coro1():
    logger.info("coro1 start")
    await asyncio.wait([    # started immediately
        asyncio.sleep(5)    # started in new task
    ])
    logger.info("coro1 finish")

Since coro2 uses the blocking time.sleep(10), it disables the event loop and all other coroutines.

async def coro2():
    logger.info("coro2 start")
    time.sleep(10)  # nothing happens for 10 seconds
    logger.info("coro2 finish")

This prevents further Futures from being started (including the new future created by asyncio.wait) and from being resumed (including the bare asyncio.sleep(5)). In the former case, the asyncio.sleep starts only after the time.sleep is done, so it takes 10 + 5 seconds to complete. In the latter case, the asyncio.sleep has already started but cannot complete before the 10 seconds are up, so it takes max(10, 5) seconds to complete.


Consistently use asyncio.sleep to get the desired durations. If blocking code must be executed, have it run via an executor.

import asyncio
import time

async def coro1w():
    print("coro1w start", time.asctime())
    # wait() needs a Task/Future; bare coroutines are rejected since Python 3.11
    await asyncio.wait([asyncio.create_task(asyncio.sleep(5))])
    print("coro1w finish", time.asctime())

async def coro1b():
    print("coro1b start", time.asctime())
    await asyncio.sleep(5)
    print("coro1b finish", time.asctime())

async def coro2a():
    print("coro2a start", time.asctime())
    await asyncio.sleep(10)            # asynchronous sleep
    print("coro2a finish", time.asctime())

async def coro2t():
    print("coro2t start", time.asctime())
    loop = asyncio.get_running_loop()  # threaded sleep
    await loop.run_in_executor(None, lambda: time.sleep(10))
    print("coro2t finish", time.asctime())

async def main():
    await asyncio.gather(coro1w(), coro1b(), coro2a(), coro2t())

asyncio.run(main())

asyncio.gather(*tasks) fails to await only a subset of all tasks

gather doesn't really "run" the awaitables, it just sleeps while the event loop does its thing, and wakes up once the awaitables it received are done. What your code does is:

  1. use asyncio.create_task() to spawn a bunch of awaitables in the background.
  2. use asyncio.gather() to wait in batches until some of them have finished.

The fact that gather() in #2 receives a subset of tasks created in #1 won't prevent the rest of the tasks created in #1 from happily running.

To fix the problem, you must postpone calling create_task() until the latest point. In fact, since gather() calls ensure_future() on its arguments (and ensure_future called with a coroutine object ends up calling create_task), you needn't call create_task() at all. If you remove the call to create_task() from main and just pass the coroutine objects to the BatchWorker (and subsequently to gather), the tasks will be both scheduled and awaited in batches, just as you want them:

async def main():
    tasks = [task(name) for name in TASK_NAMES]
    worker = BatchWorker(tasks)
    await worker.run()
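The BatchWorker class from the question is not shown here, so the following is only a sketch of the same idea using a hypothetical run_in_batches() helper. The state dictionary is an illustrative device for observing how many jobs run at once:

```python
import asyncio


async def task(name, state):
    # Hypothetical job that tracks how many jobs are active at once.
    state["active"] += 1
    state["peak"] = max(state["peak"], state["active"])
    await asyncio.sleep(0.01)
    state["active"] -= 1
    return name


async def run_in_batches(coros, batch_size):
    # Coroutine objects do nothing until gather() wraps them in tasks,
    # so each batch is scheduled only when its turn comes.
    results = []
    for i in range(0, len(coros), batch_size):
        results.extend(await asyncio.gather(*coros[i:i + batch_size]))
    return results


async def main():
    state = {"active": 0, "peak": 0}
    coros = [task(name, state) for name in ["a", "b", "c", "d", "e"]]
    results = await run_in_batches(coros, batch_size=2)
    return results, state["peak"]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Had the jobs been wrapped with create_task() up front, all five would have been active together regardless of the batch size.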

`asyncio.wait` confusion when passed a coroutine

The example code that is given in the documentation is:

coro = foo()
done, pending = await asyncio.wait({coro})

if coro in done:
    # This branch will never be run!
    pass

The reason that this code gives unexpected results is as follows:

  • coro is a coroutine object.
  • When it is passed to asyncio.wait, it automatically creates a Task object from it (which is different from the coroutine object), like coro_task = create_task(coro) (see create_task).
  • When asyncio.wait is done, it returns two sets:
    • a set of tasks that are already done
    • a set of tasks that are not done yet

So in this case, it will return one set that contains coro_task and one set that is empty.

Note that the original coroutine object coro (which is different from coro_task) is not contained in any of the sets, so checking if it is in the "done" set, is pointless - it will never be contained, even if the corresponding task, coro_task is already done.

The fix is to create the Task object for coro outside of asyncio.wait. You can then test whether that same object is in one or the other of the returned sets to determine whether the task is done.
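A runnable sketch of that fix, wrapped in a main() coroutine so it can be executed as a script (the wrapper and the returned tuple are additions made for illustration):

```python
import asyncio


async def foo():
    return 42


async def main():
    # Create the Task first, so the object we hold is the same
    # object that wait() places in the returned sets.
    task = asyncio.create_task(foo())
    done, pending = await asyncio.wait({task})
    return task in done, len(pending), task.result()


if __name__ == "__main__":
    print(asyncio.run(main()))
```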

In Python versions starting from 3.8 you will get a deprecation warning if you pass a coroutine to asyncio.wait, and since version 3.11 it will be an error, i.e. you are forced to use the "fixed" code.

Alternative to asyncio.wait?

You can call it the way the docs recommend.

Example from the docs:

async def foo():
    return 42

task = asyncio.create_task(foo())
done, pending = await asyncio.wait({task})

So your code would become:

await asyncio.wait([
    asyncio.create_task(object.function_inside_class()),
    asyncio.create_task(object.function_inside_class2())
])

asyncio - Code is executing synchronously

Using await, by definition, waits for the task main to finish. So your code as-is is no different from the synchronous code you posted above. If you want to run them at the same time (asynchronously), while waiting for the results, you should use asyncio.gather or asyncio.wait instead.

async def async_io():
    tasks = []
    for i in range(10):
        tasks += [main(i)]
    await asyncio.gather(*tasks)

If you don't need to wait for all of the main() calls to finish, you can also just use asyncio.create_task(main(i)), which creates a Task object and schedules its execution in the background. In this case, async_io() no longer needs to be declared async, although it must still be called while the event loop is running.
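A sketch of the background-task variant, under the assumption that main_job() stands in for the question's main(i) and that start_jobs() is a hypothetical plain function called from inside a running loop:

```python
import asyncio


async def main_job(i, done):
    # Hypothetical stand-in for the main(i) coroutine in the question.
    await asyncio.sleep(0.01)
    done.append(i)


def start_jobs(done, n):
    # A plain function: create_task only needs a running event loop,
    # so this must be called from code the loop is executing.
    # Returning the tasks keeps references to them alive.
    return [asyncio.create_task(main_job(i, done)) for i in range(n)]


async def main():
    done = []
    tasks = start_jobs(done, 3)
    before = list(done)        # still empty: nothing has run yet
    await asyncio.sleep(0.05)  # other work; the jobs finish meanwhile
    return before, sorted(done), all(t.done() for t in tasks)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Keeping the returned Task objects around is deliberate: the event loop holds only weak references to tasks, so a task with no other reference may be garbage-collected before it finishes.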


