Task Sequencing and Re-Entracy

Task sequencing and re-entracy

I almost forgot it's possible to construct a Task manually, without starting or scheduling it. Then, "Task.Factory.StartNew" vs "new Task(...).Start" put me back on track. I think this is one of those few cases when the Task<TResult> constructor may actually be useful, along with nested tasks (Task<Task<T>>) and Task.Unwrap():

// AsyncOp
class AsyncOp<T>
{
Task<T> _pending = Task.FromResult(default(T));

public Task<T> CurrentTask { get { return _pending; } }

public Task<T> RunAsync(Func<Task<T>> handler, bool useSynchronizationContext = false)
{
var pending = _pending;
Func<Task<T>> wrapper = async () =>
{
// await the prev task
var prevResult = await pending;
Console.WriteLine("\nprev task result: " + prevResult);
// start and await the handler
return await handler();
};

var task = new Task<Task<T>>(wrapper);
var inner = task.Unwrap();
_pending = inner;

task.RunSynchronously(useSynchronizationContext ?
TaskScheduler.FromCurrentSynchronizationContext() :
TaskScheduler.Current);

return inner;
}
}

The output:


Test #1...

prev task result: 0
this task arg: 1000

prev task result: 1000
this task arg: 900

prev task result: 900
this task arg: 800

Press any key to continue to test #2...

prev task result: 800
this task arg: 100

prev task result: 100
this task arg: 200

It's now also very easy to make AsyncOp thread-safe by adding a lock to protect _pending, if needed.

Updated, this has been further improved with cancel/restart logic.

Execution of tasks launched asynchronously by button sequentially

Xerillio's proposed solution does work as long as you don't expect the button to be responsive after be pressed:

private async void button1_Click(object sender, EventArgs e)
{
button1.IsEnabled = false;
textBox1.Text = await Task.Run(() => MessageAsync());
button1.IsEnabled = true;
}

If you need to be able to use the button while your task is running, or in other words you expect several things to need access to the ListNumber resource you need to design a system for controlling access to that resource. Only allowing one producer to add values to the list at a time for instance would be a method but it all depends on what behavior you want to see.

Below is a working version which controls access to the LisNumber object using a semaphore.

    public MainWindow()
{
InitializeComponent();
ListNumber = new List<string>();
semaphore = new SemaphoreSlim(1, 1);
}
SemaphoreSlim semaphore;
List<string> ListNumber { get; set; }

private async void button1_Click(object sender, EventArgs e)
{
await NumberAsync();
textBox1.Text = await Message();
}

private async Task<string> Message()
{
await semaphore.WaitAsync();
var concat = "";
foreach (string number in ListNumber)
{
concat += number + ", ";
}
semaphore.Release();
return concat;
}

private async Task NumberAsync()
{
await semaphore.WaitAsync();
for (int i = 0; i < 30; i++)
{
ListNumber.Add(i.ToString());
await Task.Delay(300);
}
semaphore.Release();
}

You could also just wrap the button call in the semaphore if you wanted:

private async void button1_Click(object sender, EventArgs e)
{
await semaphore.WaitAsync();
await NumberAsync();
textBox1.Text = await Message();
semaphore.Release();
}

Running tasks sequentially: how to deal with exceptions?

The reason why your code doesn't work is because you're pretty much asking it to predict the future.

When a Task isn't completed yet, its Exception will always be null. This means that it's quite likely that your await previous will throw.

The simplest solution here is to use a general catch:

async Task<TResult> RunInternal(Func<TResult> func, Task<TResult> previous)
{
try
{
await previous;
}
catch {}

return await Task.Run(func);
}

If you want to hide that empty catch (because that's usually a bad practice), or if you do this often and want to avoid repetition, you can write a helper method for this:

public static async Task IgnoreException(this Task task)
{
try
{
await task;
}
catch {}
}

Usage:

async Task<TResult> RunInternal(Func<TResult> func, Task<TResult> previous)
{
await previous.IgnoreException();
return await Task.Run(func);
}

Cancel execution and reexecute on method re-entry

Is there any way to ensure that if a Task is cancelled via a
cancellation token request and a new Task is started the cancellation
happens before the new task is started/returns execution without
blocking the UI thread?

Any reading to expand my understanding on this would be most
appreciated.

First, some related reading and questions, as requested:

  • "Async re-entrancy, and the patterns to deal with it" by Lucian Wischik
  • How to avoid reentrancy with async void event handlers?
  • Task sequencing and re-entracy
  • Correctly cancel async operation and fire it again
  • Cancelling a pending task synchronously on the UI thread
  • How to cancel async work gracefully?

If I understood your question correctly, your major concern is that the previous instance of the same task may update the UI (or ViewModel) with obsolete results, once it has completed.

To make sure this does not happen, use ThrowIfCancellationRequested with the corresponding token before your're going to update the UI/model, everywhere you do that. Then it would not matter if the most recent instance of the task completes before the previous older one. The older task will reach the ThrowIfCancellationRequested point before it might have a chance to do anything harmful, so you don't have to await the older task:

public async void Execute()
{
if (_cancellationTokenSource != null)
{
_cancellationTokenSource.Cancel();
}

_cancellationTokenSource = new CancellationTokenSource();
var token = _cancellationTokenSource.Token.

try
{
string dataItem = await _dataService.GetDataAsync(
_mainViewModel.Request,
token);

token.ThrowIfCancellationRequested();

_mainViewModel.Data.Add(dataItem);
}
catch (OperationCanceledException)
{
//Tidy up ** area of concern **
}
}

A different concern is what to do in the situation when the previous task fails with anything else than TaskCanceledException. This can too happen after the newer task has completed. It is up to you to decide whether to ignore this exception, re-throw it, or do anything else:

try
{
string dataItem = await _dataService.GetDataAsync(
_mainViewModel.Request,
token);

token.ThrowIfCancellationRequested();

_mainViewModel.Data.Add(dataItem);
}
catch (Exception ex)
{
if (ex is OperationCanceledException)
return

if (!token.IsCancellationRequested)
{
// thrown before the cancellation has been requested,
// report and re-throw
MessageBox.Show(ex.Message);
throw;
}

// otherwise, log and ignore
}

A pattern for self-cancelling and restarting task

I think the general concept is pretty good, though I recommend you not use ContinueWith.

I'd just write it using regular await, and a lot of the "am I already running" logic is not necessary:

Task pendingTask = null; // pending session
CancellationTokenSource cts = null; // CTS for pending session

// SpellcheckAsync is called by the client app on the UI thread
public async Task<bool> SpellcheckAsync(CancellationToken token)
{
// SpellcheckAsync can be re-entered
var previousCts = this.cts;
var newCts = CancellationTokenSource.CreateLinkedTokenSource(token);
this.cts = newCts;

if (previousCts != null)
{
// cancel the previous session and wait for its termination
previousCts.Cancel();
try { await this.pendingTask; } catch { }
}

newCts.Token.ThrowIfCancellationRequested();
this.pendingTask = SpellcheckAsyncHelper(newCts.Token);
return await this.pendingTask;
}

// the actual task logic
async Task<bool> SpellcheckAsyncHelper(CancellationToken token)
{
// do the work (pretty much IO-bound)
using (...)
{
bool doMore = true;
while (doMore)
{
token.ThrowIfCancellationRequested();
await Task.Delay(500); // placeholder to call the provider
}
return doMore;
}
}

Regarding usage of Task.Start() , Task.Run() and Task.Factory.StartNew()

Task.Run is a shorthand for Task.Factory.StartNew with specific safe arguments:

Task.Factory.StartNew(
action,
CancellationToken.None,
TaskCreationOptions.DenyChildAttach,
TaskScheduler.Default);

It was added in .Net 4.5 to help with the increasingly frequent usage of async and offloading work to the ThreadPool.

Task.Factory.StartNew (added with TPL in .Net 4.0) is much more robust. You should only use it if Task.Run isn't enough, for example when you want to use TaskCreationOptions.LongRunning (though it's unnecessary when the delegate is async. More on that on my blog: LongRunning Is Useless For Task.Run With async-await). More on Task.Factory.StartNew in Task.Run vs Task.Factory.StartNew

Don't ever create a Task and call Start() unless you find an extremely good reason to do so. It should only be used if you have some part that needs to create tasks but not schedule them and another part that schedules without creating. That's almost never an appropriate solution and could be dangerous. More in "Task.Factory.StartNew" vs "new Task(...).Start"

In conclusion, mostly use Task.Run, use Task.Factory.StartNew if you must and never use Start.

Async equivalent for Task.RunSynchronously?

You can await the task with task.Start(); await task; but this will still not solve the problem because you'd await the outer task that wraps the actual job.

Once you change the collection to keep Task and pass Func<Task> the inner job cab be awaited:

private BlockingCollection<Task<Task>> _taskQ = new BlockingCollection<Task<Task>>();
public Task Enqueue(Func<Task> action, CancellationToken cancelToken = default(CancellationToken))
{
var task = new Task<Task>(action, cancelToken);
_taskQ.Add(task);
return task;
}(...)

task.Start();
var job = await task;
await job;

This would then for one cosumer execute sequentially.

Going Func<Task

We can simplify the code if we go with Func<Task>:

static void Main()
{
var pcQ = new PCQueue(1); // Maximum concurrency of 1

foreach (int item in Enumerable.Range(1, 5))
{
pcQ.Enqueue(async (CancellationToken token) =>
{
Console.WriteLine($"Starting {item}");
await Task.Delay(100, token); //<-------simulation of work
Console.WriteLine($"Ending {item}");
});
}
Console.ReadLine();

}

public class PCQueue
{
private BlockingCollection<Func<CancellationToken, Task>> _taskQ = new BlockingCollection<Func<CancellationToken, Task>>();
public void Enqueue(Func<CancellationToken, Task> action) => _taskQ.Add(action);

public PCQueue(int workerCount)
{
for (int i = 0; i < workerCount; i++)
Task.Run(Consume);
}

async Task Consume()
{
var cancellationToken = ...
foreach (var f in _taskQ.GetConsumingEnumerable())
await f(cancellationToken);
}

}
}
}

Task.Delay not delaying async method

Remove redundant Task wrapper - this is what causing the issues.

Store simulations as a function returning Task and start simulation explicitly by invoking it.

public class SimulationManager
{
public List<Func<Task>> Simulations = new List<Func<Task>>();

public void AddSimulation(SimulationParameters parameters)
{
Simulations.Add(() => new Simulation().Simulate());
}

public async Task StartSimulations()
{
var tasks = Simulations.Select(simulate => simulate()).ToArray();
await Task.WhenAll(tasks);

Console.WriteLine("All tasks finished");
}
}


Related Topics



Leave a reply



Submit