Asyncio: How to Cancel a Future Being Run by an Executor

asyncio: Is it possible to cancel a future being run by an Executor?

In this case, there is no way to cancel the Future once it has actually started running, because you're relying on the behavior of concurrent.futures.Future, and its docs state the following:

cancel()

Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.

So, the only time the cancellation would be successful is if the task is still pending inside the Executor. Now, you're actually using an asyncio.Future wrapped around a concurrent.futures.Future, and in practice the asyncio.Future returned by loop.run_in_executor() will raise a CancelledError if you try to yield from it after you call cancel(), even if the underlying task is already running. But it won't actually cancel the execution of the task inside the Executor.
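The difference between cancelling the asyncio wrapper and stopping the thread can be seen in a small experiment. This is only a sketch in modern async/await syntax; the names worker, started and finished are made up for illustration and do not come from the question.

```python
import asyncio
import threading
import time


def worker(started, finished):
    """Blocking function that cannot be interrupted from outside."""
    started.set()
    time.sleep(0.3)
    finished.set()  # still reached, even after the future was cancelled


async def main():
    loop = asyncio.get_running_loop()
    started = threading.Event()
    finished = threading.Event()
    fut = loop.run_in_executor(None, worker, started, finished)

    # Wait until the worker thread is really running.
    while not started.is_set():
        await asyncio.sleep(0.01)

    cancel_returned = fut.cancel()  # cancels the asyncio.Future only
    try:
        await fut
        raised_cancelled = False
    except asyncio.CancelledError:
        raised_cancelled = True

    # The thread was never interrupted; wait (up to 2 s) for it to finish.
    thread_finished = await loop.run_in_executor(None, finished.wait, 2)
    return cancel_returned, raised_cancelled, thread_finished


cancel_returned, raised_cancelled, thread_finished = asyncio.run(main())
print(cancel_returned, raised_cancelled, thread_finished)
```

The awaiting side sees a CancelledError, yet the worker still runs to its last line.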

If you need to actually cancel the task, you'll need to use a more conventional method of interrupting the task running in the thread. The specifics of how you do that are use-case dependent. For the use-case you presented in the example, you could use a threading.Event:

def blocking_func(seconds_to_block, event):
    for i in range(seconds_to_block):
        if event.is_set():
            return
        print('blocking {}/{}'.format(i, seconds_to_block))
        time.sleep(1)

    print('done blocking {}'.format(seconds_to_block))

...
event = threading.Event()
blocking_future = loop.run_in_executor(None, blocking_func, 5, event)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)

blocking_future.cancel()  # Mark Future as cancelled
event.set()  # Actually interrupt blocking_func
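For readers on current Python versions, the same idea might look roughly like this with async/await. It is a self-contained sketch, not the code from the question: the progress list and the shortened sleep intervals are assumptions added so the effect is observable quickly.

```python
import asyncio
import threading
import time


def blocking_func(steps, stop_event, progress):
    """Blocking worker that checks stop_event before every step."""
    for i in range(steps):
        if stop_event.is_set():
            return "interrupted"
        progress.append(i)
        time.sleep(0.05)
    return "finished"


async def main():
    loop = asyncio.get_running_loop()
    stop_event = threading.Event()
    progress = []
    future = loop.run_in_executor(
        None, blocking_func, 100, stop_event, progress
    )
    await asyncio.sleep(0.12)
    stop_event.set()       # ask the worker thread to stop
    result = await future  # wait until the thread has actually returned
    return result, len(progress)


result, steps_done = asyncio.run(main())
print(result, steps_done)
```

Here the future is awaited instead of cancelled, so the caller learns when the thread has really stopped. The worker only notices the event between steps, so the granularity of the check decides how fast it reacts.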

Cancel run_in_executor coroutine from the main thread not working

The problem is related to the fact that your running method, which executes in the asyncio default thread pool, is calling back into the asyncio event loop thread and waiting for a result. In your example code, you're letting the run_them function exit immediately after you cancel the Task, which immediately shuts down your event loop. When the event loop shuts down, it means any outstanding coroutines do not complete.

This means your event loop shuts down before your running method receives the result from its future.result() call, which is waiting on an asyncio.sleep(5) call that will never complete. That means future.result() never returns, which leaves the running method hanging, which means the ThreadPoolExecutor it is running in can't shut down. This is what prevents your application from exiting. Note how the stack trace you get when you press Ctrl+C starts in the concurrent.futures library: that's where it waits for the ThreadPoolExecutor to shut down.

If you're using Python 3.9+, you should be able to fix this by adding a call to await loop.shutdown_default_executor() at the end of your run_them method. If you're using an earlier version, you have to basically implement that method yourself:

def shutdown(fut, loop):
    try:
        loop._default_executor.shutdown(wait=True)
    finally:
        loop.call_soon_threadsafe(fut.set_result, None)

async def run_them(steps, loop):
    step = steps
    event = threading.Event()
    task = loop.create_task(running_bg(loop, event))
    while steps:
        await asyncio.sleep(2)
        steps -= 1
    event.set()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("task cancelled")
    # Wait for the default thread pool to shut down before exiting
    fut = loop.create_future()
    t = threading.Thread(target=shutdown, args=(fut, loop))
    t.start()
    await fut
    t.join()

Alternatively, you could just not call task.cancel() and rely on the Event() to break out of the running method.
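For the Python 3.9+ route, a minimal sketch of waiting for the default executor could look like the following. It is not the question's code: worker and log are hypothetical names, and the worker simply polls a threading.Event.

```python
import asyncio
import threading
import time


def worker(stop_event, log):
    """Runs in the default thread pool until stop_event is set."""
    while not stop_event.is_set():
        time.sleep(0.05)
    log.append("worker exited")


async def main():
    loop = asyncio.get_running_loop()
    stop_event = threading.Event()
    log = []
    loop.run_in_executor(None, worker, stop_event, log)

    await asyncio.sleep(0.1)
    stop_event.set()
    # Requires Python 3.9+: waits for the default executor's threads to finish.
    await loop.shutdown_default_executor()
    log.append("executor shut down")
    return log


log = asyncio.run(main())
print(log)
```

Because shutdown_default_executor() waits for the pool's threads, the worker's entry is always logged before the coroutine continues.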

Cancelling asyncio task run in executor

You can do it the following way. That said, in my view, if you don't have to use asyncio for this task, use plain threads without an event loop, since asyncio makes the code more complicated here.

import asyncio
from random import randint
import time
from functools import partial

# imagine that this is the array of links
LINKS = list(range(1000))

# how many thread workers you want to have simultaneously
WORKERS_NUM = 10

# stops the app
STOP_EVENT = asyncio.Event()
STOP_EVENT.clear()

def check_link(link: int) -> int:
    """checks link in another thread and returns result"""
    time.sleep(3)
    r = randint(1, 11)
    print(f"{link}____{r}\n")
    return r

async def check_link_wrapper(q: asyncio.Queue):
    """Async wrapper around sync function"""
    loop = asyncio.get_event_loop()

    while not STOP_EVENT.is_set():
        link = await q.get()

        # None is the "poison pill"; compare with "is None" so that
        # a falsy link such as 0 is still processed
        if link is None:
            break

        value = await loop.run_in_executor(None, func=partial(check_link, link))

        if value == 10:
            STOP_EVENT.set()
            print("Hurray! We got TEN !")

async def feeder(q: asyncio.Queue):
    """Send tasks and "poison pill" to all workers"""
    # send tasks to workers
    for link in LINKS:
        await q.put(link)

    # ask workers to stop
    for _ in range(WORKERS_NUM):
        await q.put(None)

async def amain():
    """Main async function of the app"""
    # maxsize is one since we want the app
    # to stop as fast as possible if stop condition is met
    q = asyncio.Queue(maxsize=1)
    # we create separate task, since we do not want to await feeder
    # we are interested only in workers
    asyncio.create_task(feeder(q))
    await asyncio.gather(
        *[check_link_wrapper(q) for _ in range(WORKERS_NUM)],
    )

if __name__ == '__main__':
    asyncio.run(amain())

How to stop loop running in executor?

Is there an easy way to stop the thread or loop?

You cannot forcefully stop a thread. To implement the cancel functionality, your function will need to accept a should_stop argument, for example an instance of threading.Event, and occasionally check if it has been set.

If you really need a forceful stop, and if your function is runnable in a separate process through multiprocessing, you can run it in a separate process and kill the process when it is supposed to stop. See this answer for an elaboration of that approach in the context of asyncio.
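As a rough illustration of the "run it in a separate process and kill it" idea, here is a sketch that uses asyncio's own subprocess support rather than multiprocessing, so it is a swapped-in technique and not what the linked answer describes. The child command (a Python one-liner that sleeps) and the helper name run_killable are placeholders.

```python
import asyncio
import sys


async def run_killable(timeout):
    """Start a child interpreter and kill it if it exceeds the timeout."""
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import time; time.sleep(30)"
    )
    try:
        await asyncio.wait_for(proc.wait(), timeout)
        return "finished", proc.returncode
    except asyncio.TimeoutError:
        proc.kill()        # forceful stop, which is not possible for a thread
        await proc.wait()  # reap the child so it does not linger
        return "killed", proc.returncode


status, returncode = asyncio.run(run_killable(0.5))
print(status, returncode)
```

Killing a process is safe in a way that killing a thread is not, because the operating system reclaims the child's resources, but any partial work the child did (for example, half-written files) is left as it was.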

Cancel long function using exceptions

A future object has been initialized here:

future = executor.submit(self.cancel_task)

Then, we run the long task:

result = some_very_long_task()

This call to some_very_long_task() runs to completion before execution moves on to the next lines. Whether or not the future has raised an exception by that point, the current execution is unaffected. As documented, you have to call Future.result() explicitly to re-raise any exception raised by the submitted callable (here, self.cancel_task).

class concurrent.futures.Future

result(timeout=None)

If the call raised an exception, this method will raise the same exception.

So even if you call it:

future = executor.submit(self.cancel_task)
result = some_very_long_task()
future.result()

It will only re-raise the exception after some_very_long_task() has run to completion, which is pointless because it never cancels or stops the long task.
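The point that an exception stays stored in the future until result() is called can be checked with a short sketch. The function failing_task is a made-up stand-in for self.cancel_task.

```python
import concurrent.futures


def failing_task():
    """Stand-in for a submitted callable that raises."""
    raise ValueError("boom")


with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(failing_task)
# Leaving the with block waits for the task; nothing has been raised here.

stored = future.exception()  # the exception is only stored on the future

try:
    future.result()          # re-raises the stored exception in this thread
except ValueError as exc:
    message = str(exc)

print(type(stored).__name__, message)
```

Nothing in the submitting thread is interrupted when the task fails; the exception only surfaces at the result() call.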

As a side note, a future can't be cancelled once it has started running, as documented:

cancel()

Attempt to cancel the call. If the call is currently being executed or finished running and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.

Even submitting some_very_long_task to the executor and passing a timeout to result() wouldn't help. result(timeout=...) raises a TimeoutError once the timeout elapses, but the task itself keeps running in its worker, and shutting down the executor (for example, on leaving its with block) still waits for the task to finish.
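A small experiment with a single-worker thread pool shows both effects: a timeout on result() does not stop the task, and cancel() only succeeds for a future that is still queued. This is a sketch with a hypothetical slow_task and shortened delays.

```python
import concurrent.futures
import time


def slow_task():
    time.sleep(0.5)
    return "done"


with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    running = executor.submit(slow_task)  # picked up by the only worker
    queued = executor.submit(slow_task)   # stays pending behind the first
    time.sleep(0.1)                       # let the worker start the first task

    try:
        running.result(timeout=0.05)
        timed_out = False
    except concurrent.futures.TimeoutError:
        timed_out = True                  # the task itself keeps running

    cancel_running = running.cancel()     # already running: cannot be cancelled
    cancel_queued = queued.cancel()       # still pending: can be cancelled

# Leaving the with block waited for the running task to finish.
final = running.result()
print(timed_out, cancel_running, cancel_queued, final)
```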

Alternative Solution

Perhaps there is a way, but concurrent.futures doesn't seem to be the right tool for this job. Consider using multiprocessing instead.

  1. Spawn a new process for some_very_long_task
  2. Run the process in the background
  3. While the process is running in the background, check if it must be cancelled already.
  4. If the process finished, then proceed as usual.
  5. But if the process isn't finished yet and we already received a signal to cancel it, terminate the process.

from datetime import datetime, timezone
import multiprocessing
import time

class OperationCancelledException(Exception):
    pass

def some_very_long_task(response):
    print("Start long task")
    time.sleep(10)  # Simulate the long task with a sleep
    print("Finished long task")
    response['response'] = "the response!"

def get_process_status():
    # Let's say the signal to cancel an ongoing process comes from a file.
    with open('status.txt') as status_file:
        return status_file.read()

def execute():
    response = multiprocessing.Manager().dict()

    proc = multiprocessing.Process(target=some_very_long_task, args=(response,), kwargs={})
    proc.start()

    while proc.is_alive():
        status = get_process_status()
        if status == "CANCELLED":
            proc.terminate()
            raise OperationCancelledException()
        time.sleep(1)

    proc.join()
    return response.get('response')

try:
    print(datetime.now(timezone.utc), "Script started")
    result = execute()
except OperationCancelledException:
    print("Operation cancelled")
else:
    print(result)
finally:
    print(datetime.now(timezone.utc), "Script ended")

Output if status.txt contains "PENDING":

$ python3 script.py 
2021-09-08 13:17:32.234437+00:00 Script started
Start long task
Finished long task
the response!
2021-09-08 13:17:42.293814+00:00 Script ended
  • Task will run to completion (10 seconds sleep) if there is no signal to cancel it.

Output if status.txt contains "PENDING" and then changed to "CANCELLED" while script is running:

$ python3 script.py 
2021-09-08 13:19:13.370828+00:00 Script started
Start long task
Operation cancelled
2021-09-08 13:19:16.403367+00:00 Script ended
  • Task was halted just after 3 seconds (the time the file was updated) if there is a signal to cancel it.

Related references:

  • asyncio: Is it possible to cancel a future being run by an Executor?
  • How to use concurrent.futures with timeouts?

How to terminate long-running computation (CPU bound task) in Python using asyncio and concurrent.futures.ProcessPoolExecutor?

How do I terminate such long running CPU-bound computations within a method?

The approach you tried doesn't work because the futures returned by ProcessPoolExecutor are not cancellable. Although asyncio's run_in_executor tries to propagate the cancellation, it is simply ignored by Future.cancel once the task starts executing.

There is no fundamental reason for that. Unlike threads, processes can be safely terminated, so it would be perfectly possible for ProcessPoolExecutor.submit to return a future whose cancel terminated the corresponding process. Asyncio coroutines have well-defined cancellation semantics and could automatically make use of it. Unfortunately, ProcessPoolExecutor.submit returns a regular concurrent.futures.Future, which assumes the lowest common denominator of the underlying executors, and treats a running future as untouchable.

As a result, to cancel tasks executed in subprocesses, one must circumvent the ProcessPoolExecutor altogether and manage one's own processes. The challenge is how to do this without reimplementing half of multiprocessing. One option offered by the standard library is to (ab)use multiprocessing.Pool for this purpose, because it supports reliable shutdown of worker processes. A CancellablePool could work as follows:

  • Instead of spawning a fixed number of processes, spawn a fixed number of 1-worker pools.
  • Assign tasks to pools from an asyncio coroutine. If the coroutine is canceled while waiting for the task to finish in the other process, terminate the single-process pool and create a new one.
  • Since everything is coordinated from the single asyncio thread, don't worry about race conditions such as accidentally killing a process which has already started executing another task. (This would need to be prevented if one were to support cancellation in ProcessPoolExecutor.)

Here is a sample implementation of that idea:

import asyncio
import multiprocessing

class CancellablePool:
    def __init__(self, max_workers=3):
        self._free = {self._new_pool() for _ in range(max_workers)}
        self._working = set()
        self._change = asyncio.Event()

    def _new_pool(self):
        return multiprocessing.Pool(1)

    async def apply(self, fn, *args):
        """
        Like multiprocessing.Pool.apply_async, but:
         * is an asyncio coroutine
         * terminates the process if cancelled
        """
        while not self._free:
            await self._change.wait()
            self._change.clear()
        pool = usable_pool = self._free.pop()
        self._working.add(pool)

        loop = asyncio.get_event_loop()
        fut = loop.create_future()
        def _on_done(obj):
            loop.call_soon_threadsafe(fut.set_result, obj)
        def _on_err(err):
            loop.call_soon_threadsafe(fut.set_exception, err)
        pool.apply_async(fn, args, callback=_on_done, error_callback=_on_err)

        try:
            return await fut
        except asyncio.CancelledError:
            pool.terminate()
            usable_pool = self._new_pool()
        finally:
            self._working.remove(pool)
            self._free.add(usable_pool)
            self._change.set()

    def shutdown(self):
        for p in self._working | self._free:
            p.terminate()
        self._free.clear()

A minimalistic test case showing cancellation:

def really_long_process():
    print("I am a really long computation.....")
    large_val = 9729379273492397293479237492734 ** 344323
    print("I finally computed this large value: {}".format(large_val))

async def main():
    loop = asyncio.get_event_loop()
    pool = CancellablePool()

    tasks = [loop.create_task(pool.apply(really_long_process))
             for _ in range(5)]
    for t in tasks:
        try:
            await asyncio.wait_for(t, 1)
        except asyncio.TimeoutError:
            print('task timed out and cancelled')
    pool.shutdown()

asyncio.get_event_loop().run_until_complete(main())

Note how the CPU usage never exceeds 3 cores, and how it starts dropping near the end of the test, indicating that the processes are being terminated as expected.

To apply it to the code from the question, make self._lmz_executor an instance of CancellablePool and change self._loop.run_in_executor(...) to self._loop.create_task(self._lmz_executor.apply(...)).


