How to Limit Concurrency with Python Asyncio

How to limit concurrency with Python asyncio?

Before reading the rest of this answer, please note that the idiomatic way of limiting the number of parallel tasks with asyncio is asyncio.Semaphore, as shown in Mikhail's answer and elegantly abstracted in Andrei's answer. This answer contains working, but somewhat more complicated, ways of achieving the same thing. I am leaving the answer because in some cases this approach can have advantages over a semaphore, specifically when the work to be done is very large or unbounded and you cannot create all the coroutines in advance. In that case the second (queue-based) solution in this answer is what you want. But in most regular situations, such as parallel downloads through aiohttp, you should use a semaphore instead.
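
For reference, here is a minimal sketch of that semaphore-based idiom (the download coroutine and the limit of 3 are placeholders):

import asyncio

async def download(code):
    await asyncio.sleep(1)  # stand-in for the real I/O work

async def safe_download(code, sem):
    async with sem:  # at most 3 downloads run at the same time
        return await download(code)

async def main():
    sem = asyncio.Semaphore(3)
    await asyncio.gather(*(safe_download(i, sem) for i in range(9)))

asyncio.run(main())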


You basically need a fixed-size pool of download tasks. asyncio doesn't come with a pre-made task pool, but it is easy to create one: simply keep a set of tasks and don't allow it to grow past the limit. Although the question states your reluctance to go down that route, the code ends up much more elegant:

import asyncio, random

async def download(code):
    wait_time = random.randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))

async def main(loop):
    no_concurrent = 3
    dltasks = set()
    i = 0
    while i < 9:
        if len(dltasks) >= no_concurrent:
            # Wait for some download to finish before adding a new one
            _done, dltasks = await asyncio.wait(
                dltasks, return_when=asyncio.FIRST_COMPLETED)
        dltasks.add(loop.create_task(download(i)))
        i += 1
    # Wait for the remaining downloads to finish
    await asyncio.wait(dltasks)
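
Since main() takes the event loop explicitly, one way to drive it (in the pre-asyncio.run style the code above assumes) is:

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))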

An alternative is to create a fixed number of coroutines doing the downloading, much like a fixed-size thread pool, and feed them work using an asyncio.Queue. This removes the need to limit the number of downloads manually; it is limited automatically by the number of coroutines invoking download():

# download() defined as above

async def download_worker(q):
    while True:
        code = await q.get()
        await download(code)
        q.task_done()

async def main(loop):
    q = asyncio.Queue()
    workers = [loop.create_task(download_worker(q)) for _ in range(3)]
    i = 0
    while i < 9:
        await q.put(i)
        i += 1
    await q.join()  # wait for all tasks to be processed
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

As for your other question, the obvious choice would be aiohttp.
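
If you go with aiohttp, the same semaphore pattern applies. A rough sketch (the URL list and the limit of 3 are illustrative):

import asyncio
import aiohttp

async def fetch(session, url, sem):
    async with sem, session.get(url) as resp:
        return await resp.text()

async def main(urls):
    sem = asyncio.Semaphore(3)
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(fetch(session, url, sem) for url in urls))

asyncio.run(main(["https://example.com"] * 9))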

Limiting number of concurrent AsyncIO tasks using Semaphore not working

What is wrong is this line:

current_task = asyncio.create_task(get_product_information(product_specific_url))

When you create a "task" it is immediately scheduled for execution. As soon as your code yields execution to the asyncio loop (at any "await" expression), asyncio will loop, executing all your tasks.

The semaphore, in the original snippet you pointed to, guarded the creation of the tasks themselves, ensuring only "n" tasks would be active at a time. What is passed in to gather_with_concurrency in that snippet are co-routines.

Co-routines, unlike tasks, are objects that are ready to be awaited but are not yet scheduled. They can be passed around freely, just like any other object - they will only be executed when they are either awaited or wrapped by a task (and then when the code passes control to the asyncio loop).
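
To make the distinction concrete, here is a toy example (not taken from the question's code):

import asyncio

async def download(code):
    await asyncio.sleep(1)

async def main():
    coro = download(1)                # a coroutine object; nothing is running yet
    task = asyncio.create_task(coro)  # wrapping it in a task schedules it immediately
    await task

asyncio.run(main())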

In your code, you create the co-routine with the get_product_information call and immediately wrap it in a task. By the time the line that calls gather_with_concurrency is awaited, they have all already been scheduled to run at once.

The fix is simple: do not create a task at this point; the task should be created only inside the code guarded by your semaphore. Add just the raw co-routines to your list:

...
all_coroutines = []
# check all products in the current page
all_products_in_current_page = open_list(os.path.join(base_folder, gender, category, current_page))
for product_specific_url in all_products_in_current_page:
    current_coroutine = get_product_information(product_specific_url)
    all_coroutines.append(current_coroutine)

await gather_with_concurrency(random.randrange(8, 15), *all_coroutines)
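
The gather_with_concurrency helper referenced above is not reproduced in the question, but it is typically implemented along these lines (a sketch of the commonly cited snippet):

async def gather_with_concurrency(n, *coros):
    semaphore = asyncio.Semaphore(n)

    async def sem_coro(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(sem_coro(c) for c in coros))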

There is still an unrelated problem in this code that will break concurrency: you are making a synchronous call to time.sleep inside get_product_information. This will stall the asyncio loop at that point until the sleep is over. The correct thing to do is to use await asyncio.sleep(...).
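
In other words, inside get_product_information the blocking call should be replaced roughly like this (the 2-second delay is just illustrative):

# before: blocks the whole event loop
# time.sleep(2)

# after: suspends only this coroutine, letting other tasks run
await asyncio.sleep(2)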

What is the default concurrency level with asyncio in Python?

There is no built-in limit in asyncio, but there is one in aiohttp. The TCPConnector limits the number of connections to 100 by default. You can override it by creating a TCPConnector with a different limit and passing it to the session.
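
For example, a sketch of overriding that limit (the value of 10 and the URL are illustrative):

import asyncio
import aiohttp

async def main():
    connector = aiohttp.TCPConnector(limit=10)  # default limit is 100
    async with aiohttp.ClientSession(connector=connector) as session:
        async with session.get("https://example.com") as resp:
            print(resp.status)

asyncio.run(main())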

Set max concurrency with asyncio.create_subprocess_exec

As suggested by @AndrewSvetlov, you can use an asyncio.Semaphore to enforce the limit:

async def run_program(input):
    p = await asyncio.create_subprocess_exec(...)
    # ... communicate with the process ...
    p.terminate()
    return something_useful

async def run_throttled(input, sem):
    async with sem:
        result = await run_program(input)
    return result

LIMIT = 10

async def many_programs(inputs):
    sem = asyncio.Semaphore(LIMIT)
    results = await asyncio.gather(
        *[run_throttled(input, sem) for input in inputs])
    # ...

Best way to limit concurrent http requests in python (no threads)?

Here I've implemented a pool using basic asyncio functions.

WORKING:

  • the pool starts with maxsize tasks
  • when the first task completes, the next task is added and the finished task's result is printed
  • similarly, every time a task completes, another one is added, keeping at most maxsize tasks pending

Code:

import asyncio

async def pool(tasks, maxsize=3):
    # asyncio.wait() expects tasks/futures, so wrap the coroutines first
    pending = {asyncio.ensure_future(tasks.pop(0)) for _ in range(maxsize) if tasks}
    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        # top up the pool until it is full again or we run out of work
        while tasks and len(pending) < maxsize:
            pending.add(asyncio.ensure_future(tasks.pop(0)))
        for task in done:
            print(task.result())
    print("POOL COMPLETED")

For example, you can create the tasks and run the pool like this:

async def work(index, sleep_time):
    await asyncio.sleep(sleep_time)
    return f"task {index} done"

tasks = [work(i, 1) for i in range(10)]

Now, to run the tasks, call asyncio.run:

asyncio.run(pool(tasks, 3))

This will run only 3 tasks in parallel at any given time.

Controlling the concurrency of HTTP requests using Python's asyncio.Semaphore

The problem is that the Semaphore created at top-level caches the event loop active during its creation (an event loop automatically created by asyncio and returned by get_event_loop() at startup). asyncio.run() on the other hand creates a fresh event loop on each run. As a result you're trying to await a semaphore from a different event loop, which fails. As always, hiding the exception without understanding its cause only leads to further issues down the line.

To fix the issue properly, you should create the semaphore while inside asyncio.run(). For example, the simplest fix can look like this:

# ...
sem = None

async def main():
    global sem
    sem = asyncio.Semaphore(3)
    # ...

A more elegant approach is to completely remove sem from top-level and explicitly pass it to safe_download:

async def safe_download(i, limit):
    async with limit:
        return await download(i)

async def main():
    # limit parallel downloads to 3 at most
    limit = asyncio.Semaphore(3)
    # you don't need to explicitly call create_task() if you call
    # `gather()` because `gather()` will do it for you
    await asyncio.gather(*[safe_download(i, limit) for i in range(9)])

