How to Use Asyncio with Existing Blocking Library

How to use asyncio with an existing blocking library?

There are (sort of) two questions here:

  1. how can I run blocking code asynchronously within a coroutine
  2. how can I run multiple async tasks at the "same" time (as an aside: asyncio is single-threaded, so it is concurrent, but not truly parallel).

Concurrent tasks can be created using the high-level asyncio.create_task or the low-level asyncio.ensure_future. Starting with Python 3.11, they can also be created through asyncio task groups, a design pioneered by the Trio library (the creator of Trio has an excellent blog post on the subject).
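
As a quick illustration, here is a minimal sketch of both styles; work is a placeholder coroutine standing in for whatever you actually need to run:

import asyncio

async def work(n):
    await asyncio.sleep(n)
    return n

async def main():
    # Pre-3.11 style: schedule tasks explicitly, then await them together.
    tasks = [asyncio.create_task(work(n)) for n in (1, 2)]
    print(await asyncio.gather(*tasks))

    # 3.11+ style: the task group awaits all of its tasks when the
    # async with block exits.
    async with asyncio.TaskGroup() as tg:
        for n in (1, 2):
            tg.create_task(work(n))

asyncio.run(main())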

To run synchronous code, you will need to run the blocking code in an executor. Example:

import concurrent.futures
import asyncio
import time

def blocking(delay):
    time.sleep(delay)
    print('Completed.')

async def non_blocking(executor):
    loop = asyncio.get_running_loop()
    # Run three of the blocking tasks concurrently. asyncio.wait will
    # automatically wrap these in Tasks. If you want explicit access
    # to the tasks themselves, use asyncio.ensure_future, or add a
    # "done, pending = asyncio.wait..." assignment
    await asyncio.wait(
        fs={
            # Returns after delay=12 seconds
            loop.run_in_executor(executor, blocking, 12),

            # Returns after delay=14 seconds
            loop.run_in_executor(executor, blocking, 14),

            # Returns after delay=16 seconds
            loop.run_in_executor(executor, blocking, 16)
        },
        return_when=asyncio.ALL_COMPLETED
    )

executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
asyncio.run(non_blocking(executor))

If you want to schedule these tasks using a for loop (as in your example), you have several strategies, but the underlying approach is to schedule the tasks with the for loop (or a list comprehension, etc.), await them with asyncio.wait, and then retrieve the results. Example:

done, pending = await asyncio.wait(
    fs=[loop.run_in_executor(executor, blocking_foo, *args) for args in inps],
    return_when=asyncio.ALL_COMPLETED
)

# Note that any errors raised during the above will be raised here; to
# handle errors you will need to call task.exception() and check if it
# is not None before calling task.result()
results = [task.result() for task in done]
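
For completeness, here is that error-handling pattern spelled out, continuing from the done set above:

results = []
for task in done:
    if task.exception() is not None:
        # Log and skip (or re-raise) failed tasks instead of letting
        # task.result() propagate the exception.
        print(f'Task failed: {task.exception()!r}')
    else:
        results.append(task.result())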

How to measure time spent in blocking code while using asyncio in Python?

TL;DR

This decorator does the job:

import asyncio
import functools
import time

def measure_blocking_code(f):
    @functools.wraps(f)
    async def wrapper(*args, **kwargs):
        t = 0
        coro = f(*args, **kwargs)
        try:
            while True:
                t0 = time.perf_counter()
                future = coro.send(None)
                t1 = time.perf_counter()
                t += t1 - t0
                while not future.done():
                    await asyncio.sleep(0)
                future.result()  # raises exceptions if any
        except StopIteration as e:
            print(f'Function took {t:.2e} sec')
            return e.value
    return wrapper
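
A hedged usage sketch, continuing from the imports above; the coroutine below is a made-up example mixing blocking and cooperative calls:

@measure_blocking_code
async def mixed():
    time.sleep(0.2)          # blocking: counted by the decorator
    await asyncio.sleep(1)   # cooperative: not counted
    return 42

print(asyncio.run(mixed()))  # prints roughly "Function took 2.00e-01 sec", then 42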

Explanation

This workaround exploits the conventions used in the asyncio implementation in CPython. These conventions are a superset of PEP 492. In other words:

  1. You can generally use async/await without knowing these details.
  2. This might not work with other async libraries like trio.

An asyncio coroutine object (coro) can be driven by calling its .send() method. This runs the coroutine's blocking code until an await expression yields a Future object back to the caller. By measuring only the time spent inside .send(), the duration of the blocking code can be determined.
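
A minimal illustration of that convention, using a hand-made Future so no running loop is needed:

import asyncio

async def wait_for_it(fut):
    return await fut

loop = asyncio.new_event_loop()
fut = loop.create_future()
coro = wait_for_it(fut)

yielded = coro.send(None)   # runs until the await yields the Future
print(yielded is fut)       # True

fut.set_result(42)
try:
    coro.send(None)         # resumes; the coroutine now finishes
except StopIteration as e:
    print(e.value)          # 42
loop.close()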

asyncio: why isn't it non-blocking by default

asyncio is asynchronous because coroutines cooperate voluntarily. All asyncio code must be written with cooperation in mind; that's the entire point. Otherwise you may as well use threading exclusively to achieve concurrency.

asyncio can't run 'blocking' functions (non-coroutine functions or methods that won't cooperate) in an executor for you, because it can't just assume that the code is safe to run in a separate executor thread, or even that it needs to be run in an executor at all.

The Python standard library is full of really useful code that asyncio projects will want to make use of. The majority of the standard library consists of regular, 'blocking' function and class definitions. They do their work quickly, so even though they 'block', they return in reasonable time.

But most of that code is also not thread-safe, and it usually doesn't need to be. Yet if asyncio ran all such code in an executor automatically, you could no longer use non-thread-safe functions at all. Besides, creating a thread to run synchronous code in is not free: creating the thread object costs time, and your OS won't let you run an infinite number of threads either. Loads of standard library functions and methods are fast, so why would you want to run str.splitlines() or urllib.parse.quote() in a separate thread when it would be much quicker to just execute the code and be done with it?

You may say that those functions are not blocking by your standards. You didn't define 'blocking' here, but 'blocking' just means: won't voluntarily yield. If we narrow this down to won't voluntarily yield when it has to wait for something and the computer could be doing something else instead, then the next question becomes: how would you detect that it should have yielded?

The answer is that you can't. time.sleep() is a blocking function that you'd want to yield to the loop for, but it is a C function call. Python can't know ahead of time that time.sleep() is going to block, because a function that calls time.sleep() only looks up the name time in the global namespace, and then the attribute sleep on the result of that lookup, when actually executing the time.sleep() expression. Because Python's namespaces can be altered at any point during execution, you can't know what time.sleep() will do until you actually execute it.

You could say that the time.sleep() implementation should automatically yield when called, but then you'd have to start identifying all such functions. There is no limit to the number of places you'd have to patch, and you can't ever know all of them. Certainly not for third-party libraries. For example, the python-adb project gives you a synchronous USB connection to an Android device, using the libusb1 library. That's not a standard I/O codepath, so how would Python know that creating and using those connections are good places to yield?

So you can't just assume that code needs to be run in an executor, not all code can be run in an executor because it is not thread-safe, and Python can't detect when code is blocking and should really be yielding.

So how do coroutines under asyncio cooperate? By using task objects per logical piece of code that needs to run concurrently with other tasks, and by using future objects to signal to the task that the current logical piece of code wants to cede control to other tasks. That's what makes asynchronous asyncio code asynchronous: voluntarily ceding control. When the loop gives control to one task out of many, the task executes a single 'step' of the coroutine call chain, until that call chain produces a future object, at which point the task adds a wakeup callback to the future object's 'done' callback list and returns control to the loop. At some point later, when the future is marked done, the wakeup callback is run and the task executes another coroutine call chain step.

Something else is responsible for marking the future objects as done. When you use asyncio.sleep(), a callback to be run at a specific time is given to the loop, where that callback would mark the asyncio.sleep() future as done. When you use a stream object to perform I/O, then (on UNIX), the loop uses select calls to detect when it is time to wake up a future object when the I/O operation is done. And when you use a lock or other synchronisation primitive, then the synchronisation primitive will maintain a pile of futures to mark as 'done' when appropriate (Waiting for a lock? add a future to the pile. Freeing a held lock? Pick the next future from the pile and mark it as done, so the next task that was waiting for the lock can wake up and acquire the lock, etc.).

Putting synchronous code that blocks into an executor is just another form of cooperation here. When using asyncio in a project, it is up to you, the developer, to use the tools asyncio gives you to make sure your coroutines cooperate. You are free to use blocking open() calls on files instead of using streams, and you are free to use an executor when you know the code needs to be run in a separate thread to avoid blocking too long.

Last but not least, the whole point of using asyncio is to avoid using threading as much as possible. Threads have downsides: code needs to be thread-safe (control can switch between threads anywhere, so two threads accessing a shared piece of data must do so with care, and 'taking care' can mean the code is slowed down), and threads execute whether they have anything to do or not. Switching control between a fixed number of threads that are all waiting for I/O is a waste of CPU time, whereas the asyncio loop is free to find a task that is not waiting.

Can asyncio code safely call native libraries that use pthread?

Inside an asyncio coroutine you can call any function, as long as it runs relatively fast (< 0.05 sec); otherwise you risk blocking the event loop and suffering significant performance degradation.

If a function is relatively slow, it can still be used inside an asyncio coroutine without side effects, but only inside an executor.

If a function returns quickly and invokes a callback later, that's a good situation: such a function/callback pair can be turned into a proper asyncio awaitable using asyncio.Future, as sketched below.
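
A minimal sketch of that conversion; start_job here is a hypothetical callback-based API that does its work on its own thread:

import asyncio
import threading

def start_job(data, on_done):
    # Hypothetical callback-based API: runs work on a separate thread
    # and invokes the callback when finished.
    def work():
        result = data.upper()   # stand-in for real work
        on_done(result)
    threading.Thread(target=work).start()

def job_awaitable(data):
    # Wrap the callback API in a Future so it can be awaited.
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # The callback fires on another thread, so the Future must be
    # resolved via call_soon_threadsafe, not set_result directly.
    start_job(data, lambda result: loop.call_soon_threadsafe(fut.set_result, result))
    return fut

async def main():
    print(await job_awaitable('hello'))   # prints 'HELLO'

asyncio.run(main())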

Note also that many of asyncio's objects are not thread-safe by default.


Long story short, I don't see a reason why asyncio can't be used safely with a third-party library that uses threads, as long as everything is implemented correctly.

But before you rewrite anything in a large code base, you should know for certain why you need asyncio and, IMHO, have some experience with it on a smaller code base.

Try taking a single callback-based function and converting it to an awaitable with asyncio.Future, as sketched above. Try executing multiple such coroutines concurrently. See whether you achieve what you want and whether everything goes smoothly, then keep going this way.

Wrapping existing code to use asyncio instead of rewriting it sounds like a good idea: you can do it iteratively and only for the parts you use.

Run slow background blocking task from asyncio loop

I believe that since you are setting your BoundedSemaphore to 1, it only allows one instance of your task to run at a time.

You can use the ratelimiter package to limit the number of concurrent requests within a certain period of time.
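
If you'd rather stay within the standard library, here is a hedged sketch of the same idea using asyncio.Semaphore with a limit greater than 1; slow_call and the numbers are placeholders:

import asyncio
import time

def slow_call(n):
    # placeholder for the real slow, blocking work
    time.sleep(1)
    return n

async def limited(sem, n):
    async with sem:   # at most 5 of these bodies run concurrently
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, slow_call, n)

async def main():
    sem = asyncio.Semaphore(5)   # allow 5 tasks at a time instead of 1
    print(await asyncio.gather(*(limited(sem, n) for n in range(20))))

asyncio.run(main())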


