asyncio: Is it possible to cancel a future been run by an Executor?
In this case, there is no way to cancel the Future once it has actually started running, because you're relying on the behavior of concurrent.futures.Future, and its docs state the following:
cancel()
Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.
So the only time cancellation will succeed is when the task is still pending inside the Executor. Now, you're actually using an asyncio.Future wrapped around a concurrent.futures.Future. In practice, the asyncio.Future returned by loop.run_in_executor() will raise a CancelledError if you try to yield from it after you call cancel(), even if the underlying task is already running. But it won't actually cancel the execution of the task inside the Executor.
If you need to actually cancel the task, you'll need to use a more conventional method of interrupting the task running in the thread. The specifics of how you do that are use-case dependent. For the use case you presented in the example, you could use a threading.Event:
```python
import threading
import time

def blocking_func(seconds_to_block, event):
    for i in range(seconds_to_block):
        if event.is_set():  # stop early once the Event has been set
            return
        print('blocking {}/{}'.format(i, seconds_to_block))
        time.sleep(1)
    print('done blocking {}'.format(seconds_to_block))

...

# Inside the (generator-based) coroutine from the question:
event = threading.Event()
blocking_future = loop.run_in_executor(None, blocking_func, 5, event)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)
blocking_future.cancel()  # Mark Future as cancelled
event.set()  # Actually interrupt blocking_func
```
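The same idea in modern async/await syntax is sketched below as a self-contained program, assuming Python 3.7+. The step count and the 0.1 s sleeps are arbitrary values chosen to keep the demo short. The sketch shows that cancel() only marks the asyncio wrapper as cancelled, while the Event is what makes the thread stop:

```python
import asyncio
import threading
import time


def blocking_func(steps, event):
    """Blocking work that checks `event` between short sleeps so it can stop early."""
    for i in range(steps):
        if event.is_set():
            return 'interrupted'
        time.sleep(0.1)  # one unit of blocking work
    return 'done'


async def main():
    loop = asyncio.get_running_loop()
    event = threading.Event()
    blocking_future = loop.run_in_executor(None, blocking_func, 50, event)
    await asyncio.sleep(0.25)
    blocking_future.cancel()  # marks only the asyncio wrapper as cancelled
    event.set()               # this is what actually makes blocking_func return
    try:
        await blocking_future
    except asyncio.CancelledError:
        print('awaiting the cancelled future raised CancelledError')
    return blocking_future.cancelled()


if __name__ == '__main__':
    print(asyncio.run(main()))
```

Because asyncio.run() waits for the default executor on exit (3.9+), the program only finishes once blocking_func has noticed the Event and returned.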
Cancel run_in_executor coroutine from the main thread not working
The problem is that your running() method, which executes in the asyncio default thread pool, calls back into the asyncio event loop thread and waits for a result. In your example code, you let the run_them() function exit immediately after you cancel the Task, which immediately shuts down your event loop. When the event loop shuts down, any outstanding coroutines never complete.
This means your event loop shuts down before your running() method receives the result from its future.result() call, which is waiting on an asyncio.sleep(5) call that will never complete. So future.result() never returns, which leaves the running() method hanging, which means the ThreadPoolExecutor it is running in can't shut down. This is what prevents your application from exiting. Note how the stack trace you get when you press Ctrl+C starts in the concurrent.futures library: that's where it waits for the ThreadPoolExecutor to shut down.
If you're using Python 3.9+, you should be able to fix this by adding a call to await loop.shutdown_default_executor() at the end of your run_them() coroutine. If you're using an earlier version, you basically have to implement that method yourself:
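```python
import asyncio
import threading

def shutdown(fut, loop):
    """Runs in a helper thread: block until the default executor has shut down,
    then resolve fut on the event loop thread."""
    try:
        # _default_executor is private and is None if the default executor was never used
        if loop._default_executor is not None:
            loop._default_executor.shutdown(wait=True)
    finally:
        loop.call_soon_threadsafe(fut.set_result, None)

async def run_them(steps, loop):
    event = threading.Event()
    # running_bg() is the coroutine from the question
    task = loop.create_task(running_bg(loop, event))
    while steps:
        await asyncio.sleep(2)
        steps -= 1
    event.set()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("task cancelled")
    # Wait for the default thread pool to shut down before exiting
    fut = loop.create_future()
    t = threading.Thread(target=shutdown, args=(fut, loop))
    t.start()
    await fut
    t.join()
```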
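On Python 3.9+, loop.shutdown_default_executor() replaces the helper thread. Below is a minimal self-contained sketch of that, assuming running() is a hypothetical stand-in for the question's method that calls back into the loop. In this sketch an Event stops the worker, and the loop stays alive until the worker has finished. Note that asyncio.run() already shuts the default executor down on exit in 3.9+, so the explicit call matters mostly when the loop is driven with run_until_complete():

```python
import asyncio
import threading


def running(loop, event):
    """Hypothetical stand-in for the question's running(): it calls back into the
    event loop and blocks on each result until `event` is set."""
    calls = 0
    while not event.is_set():
        fut = asyncio.run_coroutine_threadsafe(asyncio.sleep(0.05), loop)
        fut.result(timeout=5)  # blocks this worker thread, not the event loop
        calls += 1
    return calls


async def run_them(seconds, loop):
    event = threading.Event()
    bg = loop.run_in_executor(None, running, loop, event)
    await asyncio.sleep(seconds)
    event.set()       # ask running() to leave its loop
    calls = await bg  # keep the loop alive so pending callbacks can complete
    await loop.shutdown_default_executor()  # Python 3.9+
    return calls


async def main():
    return await run_them(0.3, asyncio.get_running_loop())


if __name__ == '__main__':
    print(asyncio.run(main()))
```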
Alternatively, you could just not call task.cancel() and rely on the Event() to break out of the running() method.
Cancelling asyncio task run in executor
You can do it the following way. That said, from my point of view, if you don't have to use asyncio for the task, use only threads without any async loop, since asyncio makes your code more complicated.
```python
import asyncio
from random import randint
import time
from functools import partial

# imagine that this is links array
LINKS = list(range(1000))
# how many thread-workers you want to have simultaneously
WORKERS_NUM = 10
# stops the app (only set/is_set are used, from the event loop thread)
STOP_EVENT = asyncio.Event()

def check_link(link: str) -> int:
    """checks link in another thread and returns result"""
    time.sleep(3)
    r = randint(1, 11)
    print(f"{link}____{r}\n")
    return r

async def check_link_wrapper(q: asyncio.Queue):
    """Async wrapper around sync function"""
    loop = asyncio.get_running_loop()
    while not STOP_EVENT.is_set():
        link = await q.get()
        # compare with None: link 0 is falsy and must not stop the worker
        if link is None:
            break
        value = await loop.run_in_executor(None, partial(check_link, link))
        if value == 10:
            STOP_EVENT.set()
            print("Hurray! We got TEN !")

async def feeder(q: asyncio.Queue):
    """Send tasks and "poison pill" to all workers"""
    # send tasks to workers
    for link in LINKS:
        await q.put(link)
    # ask workers to stop
    for _ in range(WORKERS_NUM):
        await q.put(None)

async def amain():
    """Main async function of the app"""
    # maxsize is one since we want the app
    # to stop as fast as possible if stop condition is met
    q = asyncio.Queue(maxsize=1)
    # we create separate task, since we do not want to await feeder
    # we are interested only in workers
    asyncio.create_task(feeder(q))
    await asyncio.gather(
        *[check_link_wrapper(q) for _ in range(WORKERS_NUM)],
    )

if __name__ == '__main__':
    asyncio.run(amain())
```
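For comparison, below is a minimal sketch of the threads-only alternative suggested above. It uses concurrent.futures.ThreadPoolExecutor and a threading.Event with the same "stop on the first 10" rule. The shortened link list and sleep are only for the demo. Queued calls are cancelled; calls that are already running simply finish:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from random import randint

LINKS = list(range(100))   # shortened list for the demo
WORKERS_NUM = 10
STOP_EVENT = threading.Event()


def check_link(link):
    """Simulated check; skips the work once a stop has been requested."""
    if STOP_EVENT.is_set():
        return None
    time.sleep(0.05)
    return randint(1, 11)


def main():
    with ThreadPoolExecutor(max_workers=WORKERS_NUM) as executor:
        futures = [executor.submit(check_link, link) for link in LINKS]
        for fut in as_completed(futures):
            if fut.result() == 10:
                STOP_EVENT.set()
                # Drop everything still queued; calls already running finish normally
                for f in futures:
                    f.cancel()
                print("Hurray! We got TEN !")
                return True
    return False


if __name__ == '__main__':
    main()
```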
How to stop loop running in executor?
Is there an easy way to stop the thread or loop?
You cannot forcefully stop a thread. To implement the cancel functionality, your function will need to accept a should_stop argument, for example an instance of threading.Event, and occasionally check whether it has been set.
If you really need a forceful stop, and if your function can run in a separate process through multiprocessing, you can run it in a separate process and kill the process when it is supposed to stop. See this answer for an elaboration of that approach in the context of asyncio.
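Below is a minimal sketch of the should_stop approach, connected to asyncio cancellation and assuming Python 3.7+. run_stoppable() and worker_loop() are hypothetical names. asyncio.shield() keeps the executor future alive when the task is cancelled. The coroutine can then set the Event and wait for the thread to actually return before re-raising CancelledError:

```python
import asyncio
import threading
import time


def worker_loop(should_stop):
    """Blocking loop that checks should_stop between iterations."""
    iterations = 0
    while not should_stop.is_set():
        iterations += 1
        time.sleep(0.05)
    return iterations


async def run_stoppable(func):
    """Run func(should_stop) in the default executor; on cancellation, set
    should_stop and wait for the thread to finish before re-raising."""
    loop = asyncio.get_running_loop()
    should_stop = threading.Event()
    fut = loop.run_in_executor(None, func, should_stop)
    try:
        return await asyncio.shield(fut)
    except asyncio.CancelledError:
        should_stop.set()  # ask the thread to leave its loop
        await fut          # wait until it has actually returned
        raise


async def main():
    task = asyncio.create_task(run_stoppable(worker_loop))
    await asyncio.sleep(0.3)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        return 'cancelled'
    return 'finished'


if __name__ == '__main__':
    print(asyncio.run(main()))
```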
Cancel long function using exceptions
A future object has been initialized here:
```python
future = executor.submit(self.cancel_task)
```
Then, we run the long task:
```python
result = some_very_long_task()
```
This call to some_very_long_task() will run to completion before moving on to the next lines. Whether or not the future object has raised an exception by then, it won't affect the current execution. As documented, you have to explicitly call Future.result() to re-raise any exception that happened in the submitted callable (here, self.cancel_task):
class concurrent.futures.Future
result(timeout=None)
If the call raised an exception, this method will raise the same exception.
So even if you call it:
```python
future = executor.submit(self.cancel_task)
result = some_very_long_task()
future.result()
```
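Below is a minimal runnable sketch of that ordering. cancel_task() and some_very_long_task() are hypothetical stand-ins for self.cancel_task and the long task, and the sleep durations are arbitrary:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def cancel_task():
    """Hypothetical stand-in for self.cancel_task: fails almost immediately."""
    time.sleep(0.05)
    raise RuntimeError("cancel requested")


def some_very_long_task():
    """Hypothetical stand-in for the long task."""
    time.sleep(0.5)
    return "long task result"


def run():
    with ThreadPoolExecutor() as executor:
        start = time.monotonic()
        future = executor.submit(cancel_task)
        result = some_very_long_task()  # runs to completion; the early failure changes nothing
        try:
            future.result()  # only here is cancel_task's RuntimeError re-raised
        except RuntimeError as exc:
            return result, str(exc), time.monotonic() - start
    return result, None, time.monotonic() - start


if __name__ == '__main__':
    print(run())
```

The RuntimeError is raised after about 0.05 s, but the caller only sees it once the long task has finished.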
The future.result() call will only re-raise the exception after some_very_long_task() runs to completion. That makes it pointless, because it didn't actually cancel or stop the execution of the long task.
Also, as a side note, futures can't be cancelled once they have already started, as documented:
cancel()
Attempt to cancel the call. If the call is currently being executed or finished running and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.
Even submitting some_very_long_task to the executor and setting the timeout argument of result() wouldn't help. result() raises a TimeoutError once the timeout elapses, but the task itself keeps running in its worker thread. Leaving a with ThreadPoolExecutor() block (or calling shutdown(wait=True)) still waits for it to finish.
Alternative Solution
Perhaps you'll find a way, but it seems that concurrent.futures isn't the right tool for this job. You can consider using multiprocessing instead:
- Spawn a new process for some_very_long_task.
- Run the process in the background.
- While the process is running in the background, periodically check whether it must be cancelled.
- If the process finishes, proceed as usual.
- But if the process hasn't finished yet and we have already received a signal to cancel it, terminate the process.
```python
from datetime import datetime, timezone
import multiprocessing
import time

class OperationCancelledException(Exception):
    pass

def some_very_long_task(response):
    print("Start long task")
    time.sleep(10)  # Simulate the long task with a sleep
    print("Finished long task")
    response['response'] = "the response!"

def get_process_status():
    # Let's say the signal to cancel an ongoing process comes from a file.
    with open('status.txt') as status_file:
        return status_file.read().strip()  # ignore a trailing newline

def execute():
    with multiprocessing.Manager() as manager:
        response = manager.dict()  # shared dict the child writes its result into
        proc = multiprocessing.Process(target=some_very_long_task, args=(response,))
        proc.start()
        while proc.is_alive():
            if get_process_status() == "CANCELLED":
                proc.terminate()  # kill the child process
                proc.join()
                raise OperationCancelledException()
            time.sleep(1)  # poll the cancel signal once per second
        proc.join()
        return response.get('response')

if __name__ == '__main__':  # required where processes are spawned (Windows, macOS)
    try:
        print(datetime.now(timezone.utc), "Script started")
        result = execute()
    except OperationCancelledException:
        print("Operation cancelled")
    else:
        print(result)
    finally:
        print(datetime.now(timezone.utc), "Script ended")
```
Output if status.txt contains "PENDING":
```
$ python3 script.py
2021-09-08 13:17:32.234437+00:00 Script started
Start long task
Finished long task
the response!
2021-09-08 13:17:42.293814+00:00 Script ended
```
- Task will run to completion (10 seconds sleep) if there is no signal to cancel it.
Output if status.txt contains "PENDING" and is then changed to "CANCELLED" while the script is running:
```
$ python3 script.py
2021-09-08 13:19:13.370828+00:00 Script started
Start long task
Operation cancelled
2021-09-08 13:19:16.403367+00:00 Script ended
```
- Task was halted just after 3 seconds (the time the file was updated) if there is a signal to cancel it.
- asyncio: Is it possible to cancel a future been run by an Executor?
- How to use concurrent.futures with timeouts?
How to terminate long-running computation (CPU bound task) in Python using asyncio and concurrent.futures.ProcessPoolExecutor?
How do I terminate such long running CPU-bound computations within a method?
The approach you tried doesn't work because the futures returned by ProcessPoolExecutor are not cancellable. Although asyncio's run_in_executor tries to propagate the cancellation, Future.cancel simply ignores it once the task starts executing.
There is no fundamental reason for that. Unlike threads, processes can be safely terminated, so it would be perfectly possible for ProcessPoolExecutor.submit to return a future whose cancel() terminated the corresponding process. Asyncio coroutines have well-defined cancellation semantics and could automatically make use of it. Unfortunately, ProcessPoolExecutor.submit returns a regular concurrent.futures.Future, which assumes the lowest common denominator of the underlying executors and treats a running future as untouchable.
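The concurrent.futures.Future cancellation rule is easy to observe without processes. Below is a minimal sketch that uses a ThreadPoolExecutor purely because it is simpler to demonstrate. ProcessPoolExecutor returns the same Future type. slow() is a hypothetical function held open by an Event so the timing is deterministic:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def slow(started, release):
    started.set()   # signal that the call is now running
    release.wait()  # block until the caller lets it finish
    return 'finished'


def demo():
    started, release = threading.Event(), threading.Event()
    with ThreadPoolExecutor(max_workers=1) as executor:
        running = executor.submit(slow, started, release)
        pending = executor.submit(slow, threading.Event(), threading.Event())
        started.wait()
        cancelled_running = running.cancel()  # False: already executing
        cancelled_pending = pending.cancel()  # True: still queued behind it
        release.set()
        return cancelled_running, cancelled_pending, running.result()


if __name__ == '__main__':
    print(demo())  # (False, True, 'finished')
```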
As a result, to cancel tasks executed in subprocesses, one must circumvent ProcessPoolExecutor altogether and manage one's own processes. The challenge is how to do this without reimplementing half of multiprocessing. One option offered by the standard library is to (ab)use multiprocessing.Pool for this purpose, because it supports reliable shutdown of worker processes. A CancellablePool could work as follows:
- Instead of spawning a fixed number of processes, spawn a fixed number of 1-worker pools.
- Assign tasks to pools from an asyncio coroutine. If the coroutine is cancelled while waiting for the task to finish in the other process, terminate the single-process pool and create a new one.
- Since everything is coordinated from the single asyncio thread, there's no need to worry about race conditions such as accidentally killing a process that has already started executing another task. (This would need to be prevented if one were to support cancellation in ProcessPoolExecutor.)
```python
import asyncio
import multiprocessing

class CancellablePool:
    def __init__(self, max_workers=3):
        self._free = {self._new_pool() for _ in range(max_workers)}
        self._working = set()
        self._change = asyncio.Event()

    def _new_pool(self):
        return multiprocessing.Pool(1)

    async def apply(self, fn, *args):
        """
        Like multiprocessing.Pool.apply_async, but:
         * is an asyncio coroutine
         * terminates the process if cancelled
        """
        while not self._free:
            await self._change.wait()
            self._change.clear()
        pool = usable_pool = self._free.pop()
        self._working.add(pool)

        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        # Pool callbacks run in the pool's result-handler thread,
        # so hand the outcome back to the event loop thread safely
        def _on_done(obj):
            loop.call_soon_threadsafe(fut.set_result, obj)
        def _on_err(err):
            loop.call_soon_threadsafe(fut.set_exception, err)
        pool.apply_async(fn, args, callback=_on_done, error_callback=_on_err)

        try:
            return await fut
        except asyncio.CancelledError:
            # The only way to stop the running call: kill its 1-process pool
            pool.terminate()
            usable_pool = self._new_pool()
            raise  # keep the coroutine marked as cancelled
        finally:
            self._working.remove(pool)
            self._free.add(usable_pool)
            self._change.set()

    def shutdown(self):
        for p in self._working | self._free:
            p.terminate()
        self._free.clear()
```
A minimalistic test case showing cancellation:
```python
def really_long_process():
    print("I am a really long computation.....")
    large_val = 9729379273492397293479237492734 ** 344323
    # Report the size instead of printing ~10 million digits
    # (Python 3.11+ also limits int-to-str conversion by default)
    print("I finally computed a large value with {} bits".format(large_val.bit_length()))

async def main():
    pool = CancellablePool()
    tasks = [asyncio.create_task(pool.apply(really_long_process))
             for _ in range(5)]
    for t in tasks:
        try:
            await asyncio.wait_for(t, 1)
        except asyncio.TimeoutError:
            print('task timed out and cancelled')
    pool.shutdown()

if __name__ == '__main__':  # required where processes are spawned (Windows, macOS)
    asyncio.run(main())
```
Note how the CPU usage never exceeds 3 cores, and how it starts dropping near the end of the test, indicating that the processes are being terminated as expected.
To apply it to the code from the question, make self._lmz_executor an instance of CancellablePool and change self._loop.run_in_executor(...) to self._loop.create_task(self._lmz_executor.apply(...)).