A Pattern for Self-Cancelling and Restarting Task

A pattern for self-cancelling and restarting task

I think the general concept is pretty good, though I recommend you not use ContinueWith.

I'd just write it using regular await, and a lot of the "am I already running" logic is not necessary:

Task pendingTask = null; // pending session
CancellationTokenSource cts = null; // CTS for pending session

// SpellcheckAsync is called by the client app on the UI thread
public async Task<bool> SpellcheckAsync(CancellationToken token)
{
// SpellcheckAsync can be re-entered
var previousCts = this.cts;
var newCts = CancellationTokenSource.CreateLinkedTokenSource(token);
this.cts = newCts;

if (previousCts != null)
{
// cancel the previous session and wait for its termination
previousCts.Cancel();
try { await this.pendingTask; } catch { }
}

newCts.Token.ThrowIfCancellationRequested();
this.pendingTask = SpellcheckAsyncHelper(newCts.Token);
return await this.pendingTask;
}

// the actual task logic
async Task<bool> SpellcheckAsyncHelper(CancellationToken token)
{
// do the work (pretty much IO-bound)
using (...)
{
bool doMore = true;
while (doMore)
{
token.ThrowIfCancellationRequested();
await Task.Delay(500); // placeholder to call the provider
}
return doMore;
}
}

How can I create a Task that can cancel itself and another Task if needed?

After a lot of searching, I came across "A pattern for self-cancelling and restarting task". This was exactly what I needed, and after some tweaks, I can safely say I got what I wanted. My implementation goes as follows:

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// The task that is currently pending.
/// </summary>
private Task _pendingTask = null;

/// <summary>
/// A linked token source to control Task execution.
/// </summary>
private CancellationTokenSource _tokenSource = null;

/// <summary>
/// Does some serious work.
/// </summary>
/// <exception cref="OperationCanceledException">Thrown when the
/// operation is cancelled.</exception>
public async Task SeriousWorkAsync(CancellationToken token)
{
await CompletePendingAsync(token);
this._pendingTask = SeriousImpl(this._tokenSource.Token);
await this._pendingTask;
}

/// <summary>
/// Does some fun work.
/// </summary>
/// <exception cref="OperationCanceledException">Thrown when the
/// operation is cancelled.</exception>
public async Task FunWorkAsync(CancellationToken token)
{
await CompletePendingAsync(token);
this._pendingTask = FunImpl(this._tokenSource.Token);
await this._pendingTask;
}

/// <summary>
/// Cancels the pending Task and waits for it to complete.
/// </summary>
/// <exception cref="OperationCanceledException">If the new token has
/// been canceled before the Task, an exception is thrown.</exception>
private async Task CompletePendingAsync(CancellationToken token)
{
// Generate a new linked token
var previousCts = this._tokenSource;
var newCts = CancellationTokenSource.CreateLinkedTokenSource(token);
this._tokenSource = newCts;

if (previousCts != null)
{
// Cancel the previous session and wait for its termination
previousCts.Cancel();
try { await this._pendingTask; } catch { }
}

// We need to check if we've been canceled
newCts.Token.ThrowIfCancellationRequested();
}

Ideally, calling the methods would look like this:

try
{
await SeriousWorkAsync(new CancellationToken());
}
catch (OperationCanceledException) { }

If you prefer, you can wrap your methods inside a try catch and always generate a new token, so consumers wouldn't need to apply special handling for cancellation:

var token = new CancellationToken();
try
{
await CompletePendingAsync(token);
this._pendingTask = FunImpl(this._tokenSource.Token);
await this._pendingTask;
}
catch { }

Lastly, I tested using the following implementations for SeriousWorkAsync and FunWorkAsync:

private async Task SeriousImpl(CancellationToken token)
{
Debug.WriteLine("--- Doing serious stuff ---");
for (int i = 1000; i <= 4000; i += 1000)
{
token.ThrowIfCancellationRequested();
Debug.WriteLine("Sending mails for " + i + "ms...");
await Task.Delay(i);
}
Debug.WriteLine("--- Done! ---");
}

private async Task FunImpl(CancellationToken token)
{
Debug.WriteLine("--- Having fun! ---");
for (int i = 1000; i <= 4000; i += 1000)
{
token.ThrowIfCancellationRequested();
Debug.WriteLine("Laughing for " + i + "ms...");
await Task.Delay(i);
}
Debug.WriteLine("--- Done! ---");
}

Self continuing Task using ContinueWith

I still wonder if there are drawbacks for a task to ContinueWith
itself?

Frankly, i find your code less readable with the continuation attached (but that is only flavor based). The only drawback i see is the fact that you use a WaitHandle on the token which forces you now to dispose your CancellationToken object:

Accessing this property causes a WaitHandle to be instantiated. It is
preferable to only use this property when necessary, and to then
dispose the associated CancellationTokenSource instance at the
earliest opportunity (disposing the source will dispose of this
allocated handle). The handle should not be closed or disposed
directly.

Instead, I find the pattern with a Task.Delay more clean and readable:

public static async Task CheckTask(CancellationToken token)
{
do
{
// Do some processing
Console.WriteLine("Processing");

await Task.Delay(1500, token);
} while (!token.IsCancellationRequested);

Console.WriteLine("Bye bye");
}

And then when you want to stop your Task, cancel its via CancellationTokenSource.

Cancelling a pending task synchronously on the UI thread

So we don't want to be doing a synchronous wait as that would be blocking the UI thread, and also possibly deadlocking.

The problem with handling it asynchronously is simply that the form will be closed before you're "ready". That can be fixed; simply cancel the form closing if the asynchronous task isn't done yet, and then close it again "for real" when the task does finish.

The method can look something like this (error handling omitted):

void MainForm_FormClosing(object sender, FormClosingEventArgs e)
{
if (!_task.IsCompleted)
{
e.Cancel = true;
_cts.Cancel();
_task.ContinueWith(t => Close(),
TaskScheduler.FromCurrentSynchronizationContext());
}
}

Note that, to make the error handling easier, you could at this point make the method async as well, instead of using explicit continuations.

Is it possible to suspend and restart tasks in async Python?

What you're asking for is possible, but not trivial. First, note that you can never have suspends on every await, but only on those that result in suspension of the coroutine, such as asyncio.sleep(), or a stream.read() that doesn't have data ready to return. Awaiting a coroutine immediately starts executing it, and if the coroutine can return immediately, it does so without dropping to the event loop. await only suspends to the event loop if the awaitee (or its awaitee, etc.) requests it. More details in these questions: [1], [2], [3], [4].

With that in mind, you can use the technique from this answer to intercept each resumption of the coroutine with additional code that checks whether the task is paused and, if so, waits for the resume event before proceeding.

import asyncio

class Suspendable:
def __init__(self, target):
self._target = target
self._can_run = asyncio.Event()
self._can_run.set()
self._task = asyncio.ensure_future(self)

def __await__(self):
target_iter = self._target.__await__()
iter_send, iter_throw = target_iter.send, target_iter.throw
send, message = iter_send, None
# This "while" emulates yield from.
while True:
# wait for can_run before resuming execution of self._target
try:
while not self._can_run.is_set():
yield from self._can_run.wait().__await__()
except BaseException as err:
send, message = iter_throw, err

# continue with our regular program
try:
signal = send(message)
except StopIteration as err:
return err.value
else:
send = iter_send
try:
message = yield signal
except BaseException as err:
send, message = iter_throw, err

def suspend(self):
self._can_run.clear()

def is_suspended(self):
return not self._can_run.is_set()

def resume(self):
self._can_run.set()

def get_task(self):
return self._task

Test:

import time

async def heartbeat():
while True:
print(time.time())
await asyncio.sleep(.2)

async def main():
task = Suspendable(heartbeat())
for i in range(5):
print('suspending')
task.suspend()
await asyncio.sleep(1)
print('resuming')
task.resume()
await asyncio.sleep(1)

asyncio.run(main())

Correctly cancel async operation and fire it again

I'd like to take a chance to refine some related code. In your case, it can be used like below.

Note, if the previous instance of the pending operation has failed (thrown anything other than OperationCanceledException), you'll still see an error message for it. This behavior can be easily changed.

It only hides the progress UI if by the end of the operation if it's still the most recent instance of the task: if (thisTask == _draw.PendingTask) _progressWindow.Hide();

This code is not thread-safe as is (_draw.RunAsync can't be called concurrently), and is designed to be called from a UI thread.

Window _progressWindow = new Window();

AsyncOp _draw = new AsyncOp();

async void Button_Click(object s, EventArgs args)
{
try
{
Task thisTask = null;
thisTask = _draw.RunAsync(async (token) =>
{
var progress = new Progress<int>(
(i) => { /* update the progress inside progressWindow */ });

// show and reset the progress
_progressWindow.Show();
try
{
// do the long-running task
await this.DrawContent(this.TimePeriod, progress, token);
}
finally
{
// if we're still the current task,
// hide the progress
if (thisTask == _draw.PendingTask)
_progressWindow.Hide();
}
}, CancellationToken.None);
await thisTask;
}
catch (Exception ex)
{
while (ex is AggregateException)
ex = ex.InnerException;
if (!(ex is OperationCanceledException))
MessageBox.Show(ex.Message);
}
}

class AsyncOp
{
Task _pendingTask = null;
CancellationTokenSource _pendingCts = null;

public Task PendingTask { get { return _pendingTask; } }

public void Cancel()
{
if (_pendingTask != null && !_pendingTask.IsCompleted)
_pendingCts.Cancel();
}

public Task RunAsync(Func<CancellationToken, Task> routine, CancellationToken token)
{
var oldTask = _pendingTask;
var oldCts = _pendingCts;

var thisCts = CancellationTokenSource.CreateLinkedTokenSource(token);

Func<Task> startAsync = async () =>
{
// await the old task
if (oldTask != null && !oldTask.IsCompleted)
{
oldCts.Cancel();
try
{
await oldTask;
}
catch (Exception ex)
{
while (ex is AggregateException)
ex = ex.InnerException;
if (!(ex is OperationCanceledException))
throw;
}
}
// run and await this task
await routine(thisCts.Token);
};

_pendingCts = thisCts;

_pendingTask = Task.Factory.StartNew(
startAsync,
_pendingCts.Token,
TaskCreationOptions.None,
TaskScheduler.FromCurrentSynchronizationContext()).Unwrap();

return _pendingTask;
}
}

Cancel execution and reexecute on method re-entry

Is there any way to ensure that if a Task is cancelled via a
cancellation token request and a new Task is started the cancellation
happens before the new task is started/returns execution without
blocking the UI thread?

Any reading to expand my understanding on this would be most
appreciated.

First, some related reading and questions, as requested:

  • "Async re-entrancy, and the patterns to deal with it" by Lucian Wischik
  • How to avoid reentrancy with async void event handlers?
  • Task sequencing and re-entracy
  • Correctly cancel async operation and fire it again
  • Cancelling a pending task synchronously on the UI thread
  • How to cancel async work gracefully?

If I understood your question correctly, your major concern is that the previous instance of the same task may update the UI (or ViewModel) with obsolete results, once it has completed.

To make sure this does not happen, use ThrowIfCancellationRequested with the corresponding token before your're going to update the UI/model, everywhere you do that. Then it would not matter if the most recent instance of the task completes before the previous older one. The older task will reach the ThrowIfCancellationRequested point before it might have a chance to do anything harmful, so you don't have to await the older task:

public async void Execute()
{
if (_cancellationTokenSource != null)
{
_cancellationTokenSource.Cancel();
}

_cancellationTokenSource = new CancellationTokenSource();
var token = _cancellationTokenSource.Token.

try
{
string dataItem = await _dataService.GetDataAsync(
_mainViewModel.Request,
token);

token.ThrowIfCancellationRequested();

_mainViewModel.Data.Add(dataItem);
}
catch (OperationCanceledException)
{
//Tidy up ** area of concern **
}
}

A different concern is what to do in the situation when the previous task fails with anything else than TaskCanceledException. This can too happen after the newer task has completed. It is up to you to decide whether to ignore this exception, re-throw it, or do anything else:

try
{
string dataItem = await _dataService.GetDataAsync(
_mainViewModel.Request,
token);

token.ThrowIfCancellationRequested();

_mainViewModel.Data.Add(dataItem);
}
catch (Exception ex)
{
if (ex is OperationCanceledException)
return

if (!token.IsCancellationRequested)
{
// thrown before the cancellation has been requested,
// report and re-throw
MessageBox.Show(ex.Message);
throw;
}

// otherwise, log and ignore
}


Related Topics



Leave a reply



Submit