How to Combine Python Asyncio with Threads

How to combine Python asyncio with threads?

It's pretty simple to delegate a function call to a thread or subprocess using loop.run_in_executor:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_operation(x):
    time.sleep(x)  # Stand-in for some operation that is CPU-bound

async def main():
    loop = asyncio.get_running_loop()
    # Create a process pool with 2 worker processes
    with ProcessPoolExecutor(2) as pool:
        # Run cpu_bound_operation in the ProcessPoolExecutor.
        # This suspends this coroutine, but won't block the
        # event loop; other coroutines can run in the meantime.
        await loop.run_in_executor(pool, cpu_bound_operation, 5)

# The guard is needed because worker processes may re-import this module
if __name__ == "__main__":
    asyncio.run(main())

As for whether to use a ProcessPoolExecutor or ThreadPoolExecutor, that's kind of hard to say; pickling a large object will definitely eat some CPU cycles, which initially would make you think ProcessPoolExecutor is the way to go. However, passing your 100MB object to a Process in the pool would require pickling the instance in your main process, sending the bytes to the child process via IPC, unpickling it in the child, and then pickling it again so you can write it to disk. Given that, my guess is the pickling/unpickling overhead will be large enough that you're better off using a ThreadPoolExecutor, even though you're going to take a performance hit because of the GIL.

That said, it's very simple to test both ways and find out for sure, so you might as well do that.
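A minimal sketch of such a test follows. The helper names (pickle_payload, time_executor, compare) and the payload are hypothetical stand-ins, not part of the original answer; substitute your real object and work function before drawing conclusions from the timings.

```python
import asyncio
import pickle
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def pickle_payload(payload):
    # Stand-in for the real work: serialize the object and report its size
    return len(pickle.dumps(payload))

async def time_executor(executor_cls, payload, workers=2):
    # Run pickle_payload in the given executor type and time the round trip
    loop = asyncio.get_running_loop()
    with executor_cls(workers) as pool:
        start = time.perf_counter()
        size = await loop.run_in_executor(pool, pickle_payload, payload)
        elapsed = time.perf_counter() - start
    return size, elapsed

async def compare(payload):
    # Try both executor types on the same payload
    results = {}
    for cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        results[cls.__name__] = await time_executor(cls, payload)
    return results

if __name__ == "__main__":
    payload = list(range(1_000_000))  # hypothetical payload
    for name, (size, elapsed) in asyncio.run(compare(payload)).items():
        print(f"{name}: pickled {size} bytes in {elapsed:.3f}s")
```

The process-pool timing includes pickling the argument in the parent and unpickling it in the child, which is exactly the overhead discussed above.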

Using threads in combination with asyncio

I would recommend creating a single event loop in a background thread and having it service all your async needs. It doesn't matter that your coroutines never end; asyncio is perfectly capable of executing multiple such functions concurrently.

For example:

import asyncio
import threading

def _start_async():
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever).start()
    return loop

_loop = _start_async()

# Submits awaitable to the event loop, but *doesn't* wait for it to
# complete. Returns a concurrent.futures.Future which *may* be used to
# wait for and retrieve the result (or exception, if one was raised)
def submit_async(awaitable):
    return asyncio.run_coroutine_threadsafe(awaitable, _loop)

def stop_async():
    _loop.call_soon_threadsafe(_loop.stop)

With these tools in place (and possibly in a separate module), you can do things like this:

class Scanner:
    def __init__(self):
        submit_async(self.connection())
        # ...

    # ...
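As a self-contained sketch of the same pattern, the following starts a loop in a background thread, submits a coroutine from the main thread, waits for its result, and shuts the loop down. The names start_background_loop and add_later are hypothetical, and the daemon flag is an added assumption so that a forgotten shutdown does not keep the process alive.

```python
import asyncio
import threading

def start_background_loop():
    # One event loop, running forever in its own thread
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread

async def add_later(a, b, delay=0.01):
    # Hypothetical coroutine standing in for real async work
    await asyncio.sleep(delay)
    return a + b

loop, thread = start_background_loop()

# Returns a concurrent.futures.Future; result() blocks only the calling thread
future = asyncio.run_coroutine_threadsafe(add_later(2, 3), loop)
result = future.result(timeout=5)
print(result)

# Stop the loop from outside its thread, then clean up
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
loop.close()
```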
  • What about the advice to use ProcessPoolExecutor?

That advice applies to running CPU-bound code in parallel processes to avoid the GIL. If you are actually running async code, you don't need to care about ProcessPoolExecutor.

  • What about the advice to use ThreadPoolExecutor?

A ThreadPoolExecutor is simply a thread pool, useful for classic multi-threaded applications. In Python it is used primarily to make the program more responsive, not to make it faster. It lets you run CPU-bound or blocking code alongside interactive code without either being starved. Because of the GIL, it won't make CPU-bound code run faster.

Combining objects with asyncio and threads

import asyncio
import threading

class WatchdogManager(threading.Thread):
    def __init__(self):
        # Create a dedicated event loop for this thread before handing it over
        loop = asyncio.new_event_loop()
        super().__init__(target=self.loop_in_thread, args=(loop,))
        self.loop = loop

    async def watchdog_manager(self):
        print("Watchdog manager initiated")
        while True:
            print("Watchdog manager running")
            await asyncio.sleep(5)

    def loop_in_thread(self, loop):
        asyncio.set_event_loop(loop)
        loop.run_until_complete(self.watchdog_manager())

Run it:

watchdog_manager = WatchdogManager()
watchdog_manager.start()

Python multithreading with asyncio

Use asyncio's built-in executor and wrapper via loop.run_in_executor:

async def handle_evt(self, msg):
    self.msg = msg
    subject1 = msg['data']['subjetct']
    subject2 = msg['data']['subjetct2']

    loop = asyncio.get_running_loop()
    future_buy = loop.run_in_executor(None, self.process, subject1)
    future_sell = loop.run_in_executor(None, self.process, subject2)
    # allow other coroutines to run while waiting
    await future_buy
    await future_sell

A concurrent.futures.Future is inherently synchronous – waiting for Future.result() blocks the current thread. Executing it inside an event loop thus blocks the event loop, including all coroutines.

The asyncio wrapper provides an asyncio.Future instead, which allows asynchronous waiting for results.
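To illustrate the difference, here is a small sketch that wraps a concurrent.futures.Future with asyncio.wrap_future so it can be awaited, while a second coroutine keeps running on the same loop. The names blocking_job and ticker are hypothetical; calling cf_future.result() directly in the coroutine would instead freeze the ticker until the job finished.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_job(seconds):
    # Hypothetical blocking call running in a worker thread
    time.sleep(seconds)
    return "done"

async def ticker(ticks, stop):
    # Keeps recording timestamps as long as the event loop is free to run it
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)

async def main():
    ticks = []
    stop = asyncio.Event()
    ticker_task = asyncio.create_task(ticker(ticks, stop))
    with ThreadPoolExecutor(max_workers=1) as pool:
        cf_future = pool.submit(blocking_job, 0.1)      # concurrent.futures.Future
        result = await asyncio.wrap_future(cf_future)   # awaitable asyncio.Future
    stop.set()
    await ticker_task
    return result, len(ticks)

if __name__ == "__main__":
    print(asyncio.run(main()))
```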

Using asyncio and threads

Sure, it may make sense.

Asynchronous code in principle runs a bunch of routines in the same thread.

This means that the moment one routine has to wait for input or output (I/O), the event loop suspends that routine and starts processing another one until that one has to wait as well, and so on.
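The interleaving can be seen in a tiny sketch like the one below, where asyncio.sleep stands in for waiting on I/O. The names routine and main are hypothetical; both coroutines report the same thread id, and the shorter wait finishes first even though it was started second.

```python
import asyncio
import threading

async def routine(name, delay, log):
    log.append(f"{name} start")
    await asyncio.sleep(delay)  # stand-in for waiting on I/O
    log.append(f"{name} end")
    return threading.get_ident()

async def main():
    log = []
    # Both routines run on the event loop's single thread
    thread_ids = await asyncio.gather(
        routine("slow", 0.05, log),
        routine("fast", 0.01, log),
    )
    return log, thread_ids

if __name__ == "__main__":
    log, thread_ids = asyncio.run(main())
    print(log)
    print(thread_ids[0] == thread_ids[1])
```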

Multi-threaded (or "parallelized") code in principle runs at the same time on different cores of your machine. (Note that in CPython, because of the GIL, CPU-bound work is parallelized by using multiple processes, as pointed out by @Yassine Faris.)

It may make perfect sense to use both in the same program. Use asyncio in order to keep processing while waiting for I/O. Use multi-threading (multi processing in Python) to do, for example, heavy calculations in parallel in another part of your program.

How can I synchronize asyncio with other OS threads?

A simple way to synchronize an asyncio coroutine with an event coming from another thread is to await an asyncio.Event in taskB, and set it from taskA using loop.call_soon_threadsafe.
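A minimal sketch of that approach, assuming taskA is an ordinary function running in its own thread (written here as task_a and task_b; the shared results list is a hypothetical way to hand over a value). asyncio.Event is not thread-safe, which is why set() is handed to the loop with call_soon_threadsafe rather than called directly.

```python
import asyncio
import threading
import time

def task_a(loop, event, results):
    # Runs in a plain OS thread
    time.sleep(0.05)  # stand-in for real work
    results.append("produced by task_a")
    # Ask the loop's own thread to set the event
    loop.call_soon_threadsafe(event.set)

async def task_b():
    loop = asyncio.get_running_loop()
    event = asyncio.Event()
    results = []
    worker = threading.Thread(target=task_a, args=(loop, event, results))
    worker.start()
    await event.wait()  # other coroutines can run while this one waits
    worker.join()       # the thread is about to exit, so this returns quickly
    return results.pop()

if __name__ == "__main__":
    print(asyncio.run(task_b()))
```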

To be able to pass values and exceptions between the two, you can use futures; however, you are then reinventing much of run_in_executor. If the only job of taskA is to take tasks off a queue, you might as well make a single-worker "pool" and use it as your worker thread. Then you can use run_in_executor as intended:

import asyncio
import concurrent.futures

# A single-worker "pool" that serves as the worker thread
worker = concurrent.futures.ThreadPoolExecutor(max_workers=1)

async def taskB(lst):
    loop = asyncio.get_running_loop()
    # or result = await ..., if taskA has a useful return value
    # This will also propagate exceptions raised by taskA
    await loop.run_in_executor(worker, taskA, lst)
    print('Retrieved:', lst.pop())

The semantics are the same as in your version with an explicit queue - the queue is still there, it's just inside the ThreadPoolExecutor.
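The queue-like behavior can be checked with a sketch such as this one: three jobs submitted to a single-worker executor run one at a time, in submission order, on the same thread. The function blocking_step and the thread name prefix are hypothetical.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_step(name, log):
    # Hypothetical blocking job; records which thread ran it
    time.sleep(0.02)
    log.append((name, threading.current_thread().name))
    return name

async def main():
    loop = asyncio.get_running_loop()
    log = []
    with ThreadPoolExecutor(max_workers=1, thread_name_prefix="worker") as worker:
        # Each call submits its job right away; the executor queues them
        results = await asyncio.gather(
            loop.run_in_executor(worker, blocking_step, "first", log),
            loop.run_in_executor(worker, blocking_step, "second", log),
            loop.run_in_executor(worker, blocking_step, "third", log),
        )
    return results, log

if __name__ == "__main__":
    print(asyncio.run(main()))
```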

Multi-threaded asyncio in Python

Basically, you want to schedule a coroutine on an event loop that runs in a different thread. You can use run_coroutine_threadsafe:

# Pass the coroutine object (call protocol.stop()), not the function itself
future = asyncio.run_coroutine_threadsafe(protocol.stop(), loop=loop)
future.result()  # wait for the result

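Or schedule the task by hand with call_soon_threadsafe, as in https://stackoverflow.com/a/32084907/681044. That answer used the old generator-based style (@asyncio.coroutine and asyncio.async), which no longer works on current Python versions; the same idea with async/await:

import asyncio
from threading import Thread

loop = asyncio.new_event_loop()

def f(loop):
    # Run the event loop forever in the background thread
    asyncio.set_event_loop(loop)
    loop.run_forever()

t = Thread(target=f, args=(loop,))
t.start()

async def g():
    await asyncio.sleep(1)
    print('Hello, world!')

# Create the task from inside the loop's own thread
loop.call_soon_threadsafe(loop.create_task, g())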

