How to combine python asyncio with threads?
It's pretty simple to delegate a method to a thread or sub-process using BaseEventLoop.run_in_executor:
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
def cpu_bound_operation(x):
    time.sleep(x)  # Placeholder for some operation that is CPU-bound
async def main(pool):
    loop = asyncio.get_running_loop()
    # Run cpu_bound_operation in the ProcessPoolExecutor.
    # This suspends your coroutine, but won't block
    # the event loop; other coroutines can run in the meantime.
    await loop.run_in_executor(pool, cpu_bound_operation, 5)
if __name__ == "__main__":
    # Create a ProcessPool with 2 processes; the __main__ guard is required
    # where child processes are spawned rather than forked (e.g. Windows, macOS)
    with ProcessPoolExecutor(2) as p:
        asyncio.run(main(p))
As for whether to use a ProcessPoolExecutor or ThreadPoolExecutor, that's kind of hard to say; pickling a large object will definitely eat some CPU cycles, which initially would make you think ProcessPoolExecutor is the way to go. However, passing your 100MB object to a Process in the pool would require pickling the instance in your main process, sending the bytes to the child process via IPC, unpickling it in the child, and then pickling it again so you can write it to disk. Given that, my guess is the pickling/unpickling overhead will be large enough that you're better off using a ThreadPoolExecutor, even though you're going to take a performance hit because of the GIL.
That said, it's very simple to test both ways and find out for sure, so you might as well do that.
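Timing both is straightforward. A minimal sketch, assuming that pickling the object is a fair stand-in for the real write-to-disk work; write_object, time_executor and compare are hypothetical helper names, not part of any library:

```python
import asyncio
import pickle
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def write_object(obj):
    # Hypothetical stand-in for "pickle the object and write it to disk":
    # it only serializes the object and returns the number of bytes.
    return len(pickle.dumps(obj))


async def time_executor(executor, obj):
    # Runs write_object in the given executor and measures wall-clock time,
    # including any pickling/IPC the executor needs to ship `obj` over.
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    size = await loop.run_in_executor(executor, write_object, obj)
    return size, time.perf_counter() - start


def compare(obj):
    # Returns {executor class name: (bytes, seconds)} for both pool types.
    results = {}
    for executor_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        with executor_cls(max_workers=1) as executor:
            results[executor_cls.__name__] = asyncio.run(time_executor(executor, obj))
    return results
```

Call compare(your_object) from under an `if __name__ == "__main__":` guard, since ProcessPoolExecutor needs it on platforms that spawn workers. For the process pool, the measured time also includes worker startup and shipping the object to the child via IPC, which is exactly the overhead being weighed above.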
Using threads in combination with asyncio
I would recommend creating a single event loop in a background thread and having it service all your async needs. It doesn't matter that your coroutines never end; asyncio is perfectly capable of running many such coroutines concurrently on that one thread.
For example:
import asyncio
import threading
def _start_async():
    loop = asyncio.new_event_loop()
    # Non-daemon thread: the process won't exit until stop_async() is called
    threading.Thread(target=loop.run_forever).start()
    return loop
_loop = _start_async()
# Submits a coroutine to the event loop, but *doesn't* wait for it to
# complete. Returns a concurrent.futures.Future which *may* be used to
# wait for and retrieve the result (or exception, if one was raised)
def submit_async(coro):
    return asyncio.run_coroutine_threadsafe(coro, _loop)
def stop_async():
    _loop.call_soon_threadsafe(_loop.stop)
With these tools in place (and possibly in a separate module), you can do things like this:
class Scanner:
    def __init__(self):
        submit_async(self.connection())
        # ...
    # ...
- What about the advice to use ProcessPoolExecutor?
That advice applies to running CPU-bound code in parallel processes to avoid the GIL. If you are actually running async code, you shouldn't care about ProcessPoolExecutor.
- What about the advice to use ThreadPoolExecutor?
A ThreadPoolExecutor is simply a thread pool useful for classic multi-threaded applications. In Python it is used primarily to make the program more responsive, not to make it faster. It allows you to run CPU-bound or blocking code concurrently with interactive code, with neither getting starved. It won't make CPU-bound Python code faster, because of the GIL.
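To see the responsiveness argument in practice, here is a sketch in which a heartbeat coroutine keeps ticking while a blocking call runs in a ThreadPoolExecutor. blocking_work and heartbeat are hypothetical names, and time.sleep stands in for a blocking library call:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_work():
    # Stand-in for blocking code, e.g. a synchronous library call.
    time.sleep(0.5)
    return "done"


async def heartbeat(counter):
    # Interactive part of the program: must keep running meanwhile.
    while True:
        counter["ticks"] += 1
        await asyncio.sleep(0.05)


async def main():
    counter = {"ticks": 0}
    hb = asyncio.create_task(heartbeat(counter))
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as pool:
        # The blocking call runs in the pool thread; the loop stays free.
        result = await loop.run_in_executor(pool, blocking_work)
    hb.cancel()
    return result, counter["ticks"]


result, ticks = asyncio.run(main())
print(result, ticks)
```

Calling blocking_work() directly inside main() instead would leave ticks at 0, because the heartbeat task would never get a chance to run.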
Combining objects with asyncio and threads
import threading
import asyncio
class WatchdogManager(threading.Thread):
    def __init__(self):
        # Each thread needs its own event loop; asyncio.get_event_loop() at
        # class level would hand this thread the main thread's loop instead
        self.loop = asyncio.new_event_loop()
        super().__init__(target=self.loop_in_thread, args=(self.loop,))
    async def watchdog_manager(self):
        print("Watchdog manager initiated")
        while True:
            print("Watchdog manager running")
            await asyncio.sleep(5)
    def loop_in_thread(self, loop):
        asyncio.set_event_loop(loop)
        loop.run_until_complete(self.watchdog_manager())
Run it:
watchdog_manager = WatchdogManager()
watchdog_manager.start()
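WatchdogManager never returns, so its thread can only end with the process. One possible stoppable variant, as a sketch: StoppableWatchdog, its stop() method and the checks counter are hypothetical, and it overrides run() instead of passing target=. The key point is that the task is cancelled through call_soon_threadsafe, so the cancellation happens in the loop's own thread:

```python
import asyncio
import threading
import time


class StoppableWatchdog(threading.Thread):
    def __init__(self, interval=5.0):
        super().__init__(daemon=True)
        self.interval = interval
        self.checks = 0
        # A fresh loop owned by this thread (not the main thread's loop).
        self.loop = asyncio.new_event_loop()
        # Creating the task before the loop runs is allowed; it starts
        # executing once run_until_complete() begins in the worker thread.
        self.task = self.loop.create_task(self.watchdog_manager())

    async def watchdog_manager(self):
        while True:
            self.checks += 1  # stand-in for the real health check
            await asyncio.sleep(self.interval)

    def run(self):
        asyncio.set_event_loop(self.loop)
        try:
            self.loop.run_until_complete(self.task)
        except asyncio.CancelledError:
            pass  # stop() was called: normal shutdown
        finally:
            self.loop.close()

    def stop(self):
        # Called from another thread: schedule the cancel on the loop thread.
        self.loop.call_soon_threadsafe(self.task.cancel)
        self.join()


wd = StoppableWatchdog(interval=0.01)
wd.start()
time.sleep(0.1)
wd.stop()
print(wd.checks > 0, wd.is_alive())  # True False
```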
Python multithreading with asyncio
Use asyncio's built-in executor and wrapper via loop.run_in_executor:
async def handle_evt(self, msg):
    self.msg = msg
    subject1 = msg['data']['subjetct']
    subject2 = msg['data']['subjetct2']
    loop = asyncio.get_running_loop()
    # Both calls start running in the default executor (None) right away
    future_buy = loop.run_in_executor(None, self.process, subject1)
    future_sell = loop.run_in_executor(None, self.process, subject2)
    # allow other coroutines to run while waiting
    await future_buy
    await future_sell
A concurrent.futures.Future is inherently synchronous: waiting for Future.result() blocks the current thread. Calling it inside an event loop thus blocks the event loop, including all coroutines.
The asyncio wrapper provides an asyncio.Future instead, which allows asynchronous waiting for results.
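To make the difference concrete, here is a sketch that submits a blocking function straight to a ThreadPoolExecutor, which returns a concurrent.futures.Future, and then wraps it with asyncio.wrap_future so it can be awaited. This is roughly what run_in_executor does internally. blocking_io is a hypothetical stand-in:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_io(x):
    # Hypothetical blocking call.
    time.sleep(0.1)
    return x * 2


async def main():
    with ThreadPoolExecutor(max_workers=2) as pool:
        cf_future = pool.submit(blocking_io, 21)  # concurrent.futures.Future
        # Calling cf_future.result() here would block the whole event loop.
        aio_future = asyncio.wrap_future(cf_future)  # asyncio.Future
        return await aio_future  # suspends only this coroutine


result = asyncio.run(main())
print(result)  # 42
```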
Using asyncio and threads
Sure, it can make sense.
Asynchronous code in principle runs a bunch of routines in the same thread.
This means that the moment one routine has to wait for input or output (I/O), the event loop suspends that routine temporarily and simply starts processing another routine until that one has to wait too, and so on.
Multi-threaded (or "parallelized") code in principle runs at the same time on different cores of your machine. (Note that in Python, because of the GIL, CPU-bound parallel processing is achieved by using multiple processes, as pointed out by @Yassine Faris below.)
It may make perfect sense to use both in the same program. Use asyncio in order to keep processing while waiting for I/O. Use multi-threading (multiprocessing in Python) to do, for example, heavy calculations in parallel in another part of your program.
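The single-thread interleaving described above can be observed directly. A small sketch, assuming asyncio.sleep(0) as a stand-in for an I/O wait; routine is a hypothetical name. Each routine records its step and the thread it ran on:

```python
import asyncio
import threading


async def routine(name, steps, log):
    for i in range(steps):
        log.append((name, i, threading.get_ident()))
        # Simulated I/O wait: suspends this routine so another one can run.
        await asyncio.sleep(0)


async def main():
    log = []
    await asyncio.gather(routine("a", 2, log), routine("b", 2, log))
    return log


log = asyncio.run(main())
print([(name, i) for name, i, _ in log])
# [('a', 0), ('b', 0), ('a', 1), ('b', 1)]
```

Every entry carries the same thread id: the routines take turns on one thread rather than running in parallel.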
How can I synchronize asyncio with other OS threads?
A simple way to synchronize an asyncio coroutine with an event coming from another thread is to await an asyncio.Event in taskB, and set it from taskA using loop.call_soon_threadsafe.
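A minimal sketch of that pattern, where task_a and task_b are hypothetical stand-ins for taskA and taskB. asyncio.Event is not thread-safe, which is why the thread hands event.set to the loop instead of calling it directly:

```python
import asyncio
import threading
import time


def task_a(loop, event, results):
    # Runs in a plain OS thread: blocking work, then signal the coroutine.
    time.sleep(0.1)
    results.append("from thread")
    # asyncio.Event is not thread-safe, so set() is scheduled on the loop.
    loop.call_soon_threadsafe(event.set)


async def task_b():
    loop = asyncio.get_running_loop()
    event = asyncio.Event()
    results = []
    threading.Thread(target=task_a, args=(loop, event, results)).start()
    await event.wait()  # suspends task_b only; the loop keeps running
    return results.pop()


result = asyncio.run(task_b())
print(result)  # from thread
```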
To be able to pass values and exceptions between the two, you can use futures; however, then you are reinventing much of run_in_executor. If the only job of taskA is to take tasks off a queue, you might as well make a single-worker "pool" and use it as your worker thread. Then you can use run_in_executor as intended:
import asyncio
import concurrent.futures
worker = concurrent.futures.ThreadPoolExecutor(max_workers=1)
async def taskB(lst):
    loop = asyncio.get_running_loop()
    # or result = await ..., if taskA has a useful return value
    # This will also propagate exceptions raised by taskA
    await loop.run_in_executor(worker, taskA, lst)
    print('Retrieved:', lst.pop())
The semantics are the same as in your version with an explicit queue: the queue is still there, it's just inside the ThreadPoolExecutor.
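Here is a runnable version of the single-worker approach. It assumes a hypothetical taskA that just sleeps and records which thread ran it. Two concurrent taskB calls show that every job goes through the same worker thread, one at a time:

```python
import asyncio
import concurrent.futures
import threading
import time

# The single-worker "pool" acts as the dedicated worker thread and its queue.
worker = concurrent.futures.ThreadPoolExecutor(max_workers=1)


def taskA(lst):
    # Hypothetical blocking job: record which thread ran it.
    time.sleep(0.05)
    lst.append(threading.current_thread().name)


async def taskB(lst):
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(worker, taskA, lst)
    return lst.pop()


async def main():
    # Both jobs are queued on the same worker, so they run one after another.
    return await asyncio.gather(taskB([]), taskB([]))


names = asyncio.run(main())
worker.shutdown()
print(names)
```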
Multi-threaded asyncio in Python
Basically, you want to schedule a coroutine on the loop of a different thread. You could use run_coroutine_threadsafe:
future = asyncio.run_coroutine_threadsafe(protocol.stop(), loop=loop)
future.result()  # wait for the result; blocks the calling thread, so never call it on the loop's own thread
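A self-contained version of that call, as a sketch. FakeProtocol is a hypothetical class with an async stop() method, and the loop runs with run_forever in a daemon thread. Note that stop() is called, since run_coroutine_threadsafe needs a coroutine object rather than a function:

```python
import asyncio
import threading


class FakeProtocol:
    # Hypothetical object whose coroutine method must run on the loop thread.
    def __init__(self):
        self.stopped = False

    async def stop(self):
        await asyncio.sleep(0.01)  # e.g. flushing buffers, closing transports
        self.stopped = True
        return "stopped"


loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

protocol = FakeProtocol()
future = asyncio.run_coroutine_threadsafe(protocol.stop(), loop)
# Blocks this (main) thread until the coroutine finishes on the loop thread.
print(future.result(timeout=5))  # stopped

loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```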
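Or use the older approach from https://stackoverflow.com/a/32084907/681044, which hands the coroutine to the loop with call_soon_threadsafe. That answer used asyncio.async, the old name of asyncio.ensure_future; since async became a reserved keyword in Python 3.7, it has to be written as ensure_future today:
import asyncio
from threading import Thread
loop = asyncio.new_event_loop()
def f(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
t = Thread(target=f, args=(loop,))
t.start()
async def g():
    await asyncio.sleep(1)
    print('Hello, world!')
# ensure_future runs in the loop's thread and wraps g() in a Task there
loop.call_soon_threadsafe(asyncio.ensure_future, g())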