asyncio CancelledError and KeyboardInterrupt

task.cancel() by itself doesn't finish the task: it only requests that CancelledError be raised inside the task, and returns immediately. You should call it and then await the task until it is actually cancelled (that is, until awaiting it raises CancelledError).

You also shouldn't suppress CancelledError inside the task.

Read this answer, where I tried to show different ways of working with tasks. For example, to cancel a task and wait until it is actually cancelled, you can do:

import asyncio
from contextlib import suppress

task = ...  # remember, the task itself must not suppress CancelledError

task.cancel()  # returns immediately; we still have to await the task until it raises CancelledError

with suppress(asyncio.CancelledError):
    await task  # or loop.run_until_complete(task) if this happens after the event loop stopped

# Now that we have awaited the CancelledError and handled it,
# the task is finally over and we can close the event loop without warnings.
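A self-contained version of the pattern above, as a sketch assuming Python 3.7+ (asyncio.run, asyncio.create_task); worker() and the log list are hypothetical names added for illustration. It records that the task is not done immediately after cancel(), and only reports as cancelled once it has been awaited.

```python
import asyncio
from contextlib import suppress


async def worker(log):
    # Hypothetical long-running task: it cleans up, then re-raises.
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        log.append("cleanup")
        raise  # never swallow CancelledError inside the task


async def main():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0)  # let the task start and reach its first await
    task.cancel()  # only *requests* cancellation
    log.append(f"done right after cancel(): {task.done()}")
    with suppress(asyncio.CancelledError):
        await task  # wait until the CancelledError has actually been raised
    log.append(f"cancelled after await: {task.cancelled()}")
    return log


log = asyncio.run(main())
print(log)
```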

How to shut down gracefully on keyboard interrupt when an asyncio task is performing blocking work?

Summary: You need to handle your exceptions, or asyncio will complain.

For background tasks (i.e. tasks that you don't explicitly wait for using gather())

You might think that catching cancellation with except asyncio.CancelledError (and re-raising it) within your task would handle all types of cancellation. That's not the case. If your task is performing blocking work while being cancelled, you won't be able to catch the exception (e.g. KeyboardInterrupt) within the task itself. The safe bet here is to register a done callback using add_done_callback on your asyncio.Task. In this callback, check whether there was an exception (see the updated example code in the question). If your task was stuck on blocking work while being cancelled, the done callback will tell you that the task finished (rather than being cancelled).
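A minimal sketch of the done-callback approach, assuming Python 3.8+ (for the name= argument of create_task). A real CTRL+C can't be scripted here, so boom() raises an ordinary RuntimeError as a stand-in; ok(), boom(), slow() and report() are hypothetical names. The callback distinguishes a task that finished with a result, one that finished with an exception, and one that was cancelled.

```python
import asyncio

outcomes = {}


def report(task: asyncio.Task) -> None:
    # Done callback: runs once the task has finished, however it finished.
    name = task.get_name()
    if task.cancelled():
        outcomes[name] = "cancelled"
    elif task.exception() is not None:
        # exception() also marks the exception as retrieved, so asyncio
        # does not log "Task exception was never retrieved" later.
        outcomes[name] = f"failed: {task.exception()!r}"
    else:
        outcomes[name] = f"result: {task.result()!r}"


async def ok():
    return 42


async def boom():
    # Stand-in for work that dies with an exception.
    raise RuntimeError("blocking work failed")


async def slow():
    await asyncio.sleep(3600)


async def main():
    tasks = [
        asyncio.create_task(ok(), name="ok"),
        asyncio.create_task(boom(), name="boom"),
        asyncio.create_task(slow(), name="slow"),
    ]
    for t in tasks:
        t.add_done_callback(report)
    await asyncio.sleep(0)  # let ok() and boom() run to completion
    tasks[2].cancel()
    # Wait for everything to settle; exceptions are returned, not raised.
    await asyncio.gather(*tasks, return_exceptions=True)


asyncio.run(main())
print(outcomes)
```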

For a bunch of tasks that you await using gather()

If you use gather, you don't need to add done callbacks. Instead, ask it to return exceptions (return_exceptions=True) and it will handle KeyboardInterrupt just fine. If you don't, the first exception raised within any of its awaitables is immediately propagated to the task that awaits gather(). In the case of a KeyboardInterrupt inside a task that's stuck doing blocking work, the KeyboardInterrupt will be re-raised and you'll need to handle it. Alternatively, use try/except to handle any exceptions raised. Try this yourself by setting the collect_exceptions_when_gathering variable in the example code.
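The two gather() modes can be compared with a small sketch (Python 3.7+). fine() and fails() are hypothetical coroutines, and a ValueError stands in for the interrupt, which can't be triggered from a script.

```python
import asyncio


async def fine(n):
    await asyncio.sleep(0.01)
    return n


async def fails():
    await asyncio.sleep(0)
    raise ValueError("bad input")


async def collect():
    # return_exceptions=True: exceptions come back as items in the result
    # list, in argument order, and nothing is raised at the await.
    return await asyncio.gather(fine(1), fails(), fine(2), return_exceptions=True)


async def propagate():
    # Default behaviour: the first exception is raised at the await on gather(),
    # so the caller has to handle it with try/except.
    try:
        await asyncio.gather(fine(1), fails(), fine(2))
    except ValueError as exc:
        return f"caught {exc!r}"
    return "no exception"


collected = asyncio.run(collect())
propagated = asyncio.run(propagate())
print(collected)
print(propagated)
```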

Finally, the one thing I still don't understand: I don't see any exception being raised if gather() is called with a single task and not asked to return exceptions. Modify the example code so that its range is range(1, 2), and you won't get a messy stack trace on CTRL-C...?

What's the correct way to clean up after an interrupted event loop?

When you press CTRL+C, the event loop gets stopped, so your calls to t.cancel() don't actually take effect. For the tasks to be cancelled, you need to start the loop back up again.

Here's how you can handle it:

import asyncio

@asyncio.coroutine
def shleepy_time(seconds):
    print("Shleeping for {s} seconds...".format(s=seconds))
    yield from asyncio.sleep(seconds)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # Side note: async() was deprecated in 3.4.4 in favour of ensure_future().
    # See: https://docs.python.org/3.4/library/asyncio-task.html#asyncio.async
    tasks = asyncio.gather(
        asyncio.async(shleepy_time(seconds=5)),
        asyncio.async(shleepy_time(seconds=10))
    )

    try:
        loop.run_until_complete(tasks)
    except KeyboardInterrupt as e:
        print("Caught keyboard interrupt. Canceling tasks...")
        tasks.cancel()
        loop.run_forever()
        tasks.exception()
    finally:
        loop.close()

Once we catch KeyboardInterrupt, we call tasks.cancel() and then start the loop up again. run_forever will actually exit as soon as tasks gets cancelled (note that cancelling the Future returned by asyncio.gather also cancels all the Futures inside of it), because the interrupted loop.run_until_complete call added a done_callback to tasks that stops the loop. So, when we cancel tasks, that callback fires, and the loop stops. At that point we call tasks.exception, just to avoid getting a warning about not fetching the exception from the _GatheringFuture.
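The code above targets Python 3.4: asyncio.async() is a syntax error since Python 3.7 (where async became a keyword), and @asyncio.coroutine was removed in 3.11. Also, as far as I can tell, newer versions of run_until_complete() remove their done callback when they exit, so a bare run_forever() after the interrupt may never return. A minimal sketch of the same idea for Python 3.8+, under those assumptions, restarts the loop with a second run_until_complete() instead. interrupt() is a hypothetical stand-in for CTRL+C, scheduled with call_later() so the example runs unattended.

```python
import asyncio


def interrupt():
    # Hypothetical stand-in for the user pressing CTRL+C.
    raise KeyboardInterrupt


async def shleepy_time(seconds, log):
    log.append(f"sleeping {seconds}s")
    try:
        await asyncio.sleep(seconds)
    except asyncio.CancelledError:
        log.append(f"cancelled {seconds}s sleeper")
        raise


def main():
    log = []
    loop = asyncio.new_event_loop()
    try:
        tasks = asyncio.gather(
            loop.create_task(shleepy_time(5, log)),
            loop.create_task(shleepy_time(10, log)),
        )
        loop.call_later(0.05, interrupt)
        try:
            loop.run_until_complete(tasks)
        except KeyboardInterrupt:
            log.append("caught KeyboardInterrupt")
            tasks.cancel()  # cancels every child of the gather
            # Restart the loop so the cancellations are actually delivered.
            try:
                loop.run_until_complete(tasks)
            except asyncio.CancelledError:
                pass  # expected: the gathered future ends with CancelledError
    finally:
        loop.close()
    return log


log = main()
print(log)
```

The design is the same as in the answer: cancel, then give the loop another run so each task actually receives its CancelledError; only the way the loop is restarted differs.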

Send Ctrl + C to asyncio Task

Use asyncio.run() instead. When it exits, including when it exits because of an exception, asyncio.run() cancels all remaining tasks.

asyncio.gather() then cancels its child tasks when it is itself cancelled. You can catch asyncio.CancelledError around asyncio.sleep() to see this. Discarding it (not re-raising) would keep the task running.

import asyncio

async def task_1():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError as ex:
        print('task1', type(ex))
        raise

async def task_2():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError as ex:
        print('task2', type(ex))
        raise

async def main():
    await asyncio.gather(task_1(), task_2())

if __name__ == '__main__':
    asyncio.run(main())

Otherwise, you need to keep references to your tasks, cancel them, and re-run the event loop so that CancelledError is actually propagated, which is what asyncio.run() does under the hood.
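The earlier point about discarding CancelledError can be checked directly. In this sketch (Python 3.7+), the hypothetical stubborn() coroutine catches the cancellation without re-raising it: cancel() is requested, yet the task keeps running and returns a normal result.

```python
import asyncio

log = []


async def stubborn():
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        # The mistake: the cancellation is caught and not re-raised.
        log.append("swallowed CancelledError")
    await asyncio.sleep(0.01)
    log.append("kept running after cancel()")
    return "finished"


async def main():
    task = asyncio.create_task(stubborn())
    await asyncio.sleep(0)  # let the task start
    task.cancel()
    result = await task  # no CancelledError here: the task ended normally
    return result, task.cancelled()


outcome = asyncio.run(main())
print(outcome, log)
```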

Handling asyncio CancelledError simultaneous

The bug is, of course, in subtask: it should be cancelled when you call .cancel() on it.

Since you already know that, have said the CancelledError is incorrectly caught, and know that handling1 is the correct solution but does not account for the bug, here is a fix for handling2:

import asyncio

async def handling2():
    # subtask() is the (buggy) coroutine from the question.
    task = asyncio.create_task(subtask())
    await asyncio.sleep(1)
    task.cancel()
    try:
        # Shield makes sure the 2nd cancellation from wait_for will not block.
        await asyncio.wait_for(asyncio.shield(task), 1)
    except asyncio.TimeoutError:
        if task.cancelled():  # Slight chance of happening
            return  # Task was cancelled correctly after exactly 1 second.
        print("Task was not cancelled after 1 second! bug bug bug")
    except asyncio.CancelledError:
        if task.cancelled():
            return  # Task was cancelled correctly
        print("handling 2 was cancelled!")
        raise
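A runnable sketch of this fix, assuming Python 3.7+. good_subtask() and buggy_subtask() are hypothetical (the buggy one swallows the first cancellation), the delays are shortened to 0.1 s and 0.5 s, and this handling2() takes the subtask as a parameter and returns a status string instead of printing; those changes are mine, for testing.

```python
import asyncio


async def good_subtask():
    await asyncio.sleep(10)  # cancellation propagates normally


async def buggy_subtask():
    # Hypothetical buggy subtask: it swallows the first cancellation.
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(10)


async def handling2(subtask):
    task = asyncio.create_task(subtask())
    await asyncio.sleep(0.1)
    task.cancel()
    try:
        # shield() stops wait_for's timeout from cancelling `task` a second time.
        await asyncio.wait_for(asyncio.shield(task), 0.5)
    except asyncio.TimeoutError:
        if task.cancelled():
            return "cancelled"
        task.cancel()  # the second sleep is not guarded, so this one sticks
        return "not cancelled in time"
    except asyncio.CancelledError:
        if task.cancelled():
            return "cancelled"  # the subtask was cancelled correctly
        raise  # handling2 itself was cancelled
    return "finished normally"


async def main():
    return [await handling2(good_subtask), await handling2(buggy_subtask)]


results = asyncio.run(main())
print(results)
```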

How to prevent asyncio.Task from being cancelled

Note: asyncio.Task.all_tasks() is deprecated (and removed in Python 3.9), so I'll refer to it as asyncio.all_tasks() instead.



TL;DR Demo code

Different solutions per OS type.

  • *nix: terminated by sending SIGINT
  • Windows: terminated by Ctrl+C

Task duration is set to 10 seconds, so terminate the program before the tasks complete.

Pure asyncio (*nix only)

Complex, long, reinventing the wheel. Adds a custom signal handler to prevent error propagation.

Demonstrates spawning 3 shielded and 3 unshielded tasks: the former run until completion, the latter get cancelled.

"""
Task shielding demonstration with pure asyncio, nix only
"""
import asyncio
import signal
import os

# Sets of tasks we shouldn't cancel
REQUIRE_SHIELDING = set()

async def work(n):
"""Some random io intensive work to test shielding"""
print(f"[{n}] Task start!")
try:
await asyncio.sleep(10)

except asyncio.CancelledError:
# we shouldn't see following output
print(f"[{n}] Canceled!")
return

print(f"[{n}] Task done!")

def install_handler():

def handler(sig_name):
print(f"Received {sig_name}")

# distinguish what to await and what to cancel. We'll have to await all,
# but we only have to manually cancel subset of it.
to_await = asyncio.all_tasks()
to_cancel = to_await - REQUIRE_SHIELDING

# cancel tasks that don't require shielding
for task in to_cancel:
task.cancel()

print(f"Cancelling {len(to_cancel)} out of {len(to_await)}")

loop = asyncio.get_running_loop()

# install for SIGINT and SIGTERM
for signal_name in ("SIGINT", "SIGTERM"):
loop.add_signal_handler(getattr(signal, signal_name), handler, signal_name)

async def main():
print(f"PID: {os.getpid()}")

# If main task is done - errored or not - all other tasks are canceled.
# So we need to shield main task.
REQUIRE_SHIELDING.add(asyncio.current_task())

# install handler
install_handler()

# spawn tasks that will be shielded
for n in range(3):
REQUIRE_SHIELDING.add(asyncio.create_task(work(n)))

# spawn tasks that won't be shielded, for comparison
for n in range(3, 6):
asyncio.create_task(work(n))

# we'll need to keep main task alive until all other task excluding self is done.
await asyncio.gather(*(REQUIRE_SHIELDING - {asyncio.current_task()}))

asyncio.run(main())
PID: 10778
[0] Task start!
[1] Task start!
[2] Task start!
[3] Task start!
[4] Task start!
[5] Task start!
Received SIGINT
Cancelling 3 out of 7
[3] Canceled!
[5] Canceled!
[4] Canceled!
[0] Task done!
[1] Task done!
[2] Task done!

asyncio + aiorun (All OS)

Demonstrates the same thing as above.

"""
Task shielding demonstration with asyncio + aiorun, all OS
"""
import asyncio
import os

from aiorun import run, shutdown_waits_for

async def work(n):
"""Some random io intensive work to test shielding"""
print(f"[{n}] Task start!")
try:
await asyncio.sleep(10)

except asyncio.CancelledError:
print(f"[{n}] Canceled!")
return

print(f"[{n}] Task done!")

async def main():
print(f"PID: {os.getpid()}")
child_tasks = []

# spawn tasks that will be shielded
child_tasks.extend(
asyncio.create_task(shutdown_waits_for(work(n))) for n in range(3)
)

# spawn tasks without shielding for comparison
child_tasks.extend(asyncio.create_task(work(n)) for n in range(3))

# aiorun runs forever by default, even without any coroutines left to run.
# We'll have to manually stop the loop, but can't use asyncio.all_tasks()
# check as aiorun's internal tasks included in it run forever.
# instead, keep child task spawned by main task and await those.
await asyncio.gather(*child_tasks)
asyncio.get_running_loop().stop()

run(main())
PID: 26548
[0] Task start!
[1] Task start!
[2] Task start!
[3] Task start!
[4] Task start!
[5] Task start!
Stopping the loop
[4] Canceled!
[5] Canceled!
[3] Canceled!
[1] Task done!
[0] Task done!
[2] Task done!

Switching to trio (All OS)

Ground-up, pure-Python asynchronous event loop without callback soup

"""
Task shielding demonstration with trio, all OS
"""
import os

import trio

async def work(n):
"""Some random io intensive work to test shielding"""
print(f"[{n}] Task start!")
try:
await trio.sleep(10)

except trio.Cancelled:
print(f"[{n}] Canceled!")
raise

print(f"[{n}] Task done!")

async def shielded():
# opening explicit concurrency context.
# Every concurrency in trio is explicit, via Nursery that takes care of tasks.
async with trio.open_nursery() as nursery:

# shield nursery from cancellation. Now all tasks in this scope is shielded.
nursery.cancel_scope.shield = True

# spawn tasks
for n in range(3):
nursery.start_soon(work, n)

async def main():
print(f"PID: {os.getpid()}")

try:
async with trio.open_nursery() as nursery:
nursery.start_soon(shielded)

for n in range(3, 6):
nursery.start_soon(work, n)

except (trio.Cancelled, KeyboardInterrupt):
# Nursery always make sure all child tasks are done - either canceled or not.
# This try-except is just here to suppress traceback. Not quite required.
print("Nursery Cancelled!")

trio.run(main)
PID: 23684
[3] Task start!
[4] Task start!
[5] Task start!
[0] Task start!
[1] Task start!
[2] Task start!
[3] Canceled!
[4] Canceled!
[5] Canceled!
[0] Task done!
[1] Task done!
[2] Task done!
Nursery Cancelled!

Below is a slightly more in-depth ramble on asyncio's signal-handling flow.



Pure asyncio's signal handling

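I spent a full day digging into this issue (tracing, searching, reading source code) and still can't reconstruct the complete flow. The following flow is my best guess, based on pre-3.11 sources like the ones quoted below. Since Python 3.11, asyncio.run() is built on asyncio.Runner, which installs its own SIGINT handler that cancels the main task instead.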

Without custom signal handlers

  1. Receives SIGINT
  2. signal.default_int_handler, the SIGINT handler Python installs by default, is called in the main thread and raises KeyboardInterrupt
# signal.default_int_handler: implemented in C (_signal module); IDE-generated stub
def default_int_handler(*args, **kwargs):  # real signature unknown
    """
    The default handler for SIGINT installed by Python.

    It raises KeyboardInterrupt.
    """

  3. The exception propagates, and the finally block in asyncio.run() calls asyncio.runners._cancel_all_tasks()
# asyncio/runners.py
def run(main, *, debug=None):
    ...
    loop = events.new_event_loop()
    try:
        events.set_event_loop(loop)
        if debug is not None:
            loop.set_debug(debug)
        return loop.run_until_complete(main)
    finally:
        try:
            _cancel_all_tasks(loop)  # <---- this is called
            loop.run_until_complete(loop.shutdown_asyncgens())
            loop.run_until_complete(loop.shutdown_default_executor())
        finally:
            events.set_event_loop(None)
            loop.close()

  4. asyncio.runners._cancel_all_tasks() cancels every task returned by asyncio.all_tasks()
# asyncio/runners.py
def _cancel_all_tasks(loop):
    to_cancel = tasks.all_tasks(loop)  # <---- gets all running tasks
    if not to_cancel:                  # (internally a weakref.WeakSet named _all_tasks)
        return

    for task in to_cancel:  # <---- cancels all of them
        task.cancel()

    loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
    ...

At the end of execution, successful or not, any remaining tasks eventually get cancelled in step 4.

Since asyncio.shield() leaves the shielded task registered in _all_tasks like any other task, it won't help either.
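A small sketch illustrating this, assuming Python 3.7+ (important() is a hypothetical coroutine): the shielded task is still listed by asyncio.all_tasks(), so when main() returns, asyncio.run() cancels it like any other task.

```python
import asyncio

log = []


async def important():
    try:
        await asyncio.sleep(3600)
        log.append("important finished")
    except asyncio.CancelledError:
        log.append("important cancelled")
        raise


async def main():
    inner = asyncio.create_task(important())
    # shield() only protects `inner` from cancellation of the returned outer
    # future; `inner` itself stays an ordinary, registered task.
    outer = asyncio.shield(inner)  # kept only to show it is created
    await asyncio.sleep(0)  # let `inner` start
    # Return without awaiting: asyncio.run() now cancels every task in
    # asyncio.all_tasks(), and `inner` is one of them.
    return inner in asyncio.all_tasks()


registered = asyncio.run(main())
print(registered, log)
```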

However, if we add custom handlers - things get a bit different.

With custom signal handlers


  1. We add our custom signal handler via loop.add_signal_handler()
# asyncio/unix_events.py
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    ...
    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise ValueError if the signal number is invalid or uncatchable.
        Raise RuntimeError if there is a problem setting up the handler.
        """
        ...
        handle = events.Handle(callback, args, self, None)
        self._signal_handlers[sig] = handle  # <---- added to sig handler dict
        ...
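  2. Receives SIGINT
  3. The event loop's _handle_signal is called (add_signal_handler also registered the loop's wakeup fd with signal.set_wakeup_fd(), which is how the loop learns about the signal); it gets the matching signal handler from the dictionary and adds it as a callback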
# asyncio/unix_events.py
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    ...
    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)  # <---- fetches added handler
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)
        else:
            self._add_callback_signalsafe(handle)  # <---- adds as callback
    ...

  4. Our custom callback is called

Now the default signal handler is not called, so KeyboardInterrupt is never raised, and asyncio.run()'s try/finally block doesn't proceed to finally. Therefore asyncio.runners._cancel_all_tasks() is not called.

All tasks survive! Cancel the non-essential tasks manually in the handler and we're good to go.


