How to Run a Task on a Custom TaskScheduler Using Await

Problems scheduling/starting task on a custom TaskScheduler, which will complete execution on said Scheduler

First, be aware that the TaskScheduler only applies to executing tasks. So when using an async delegate, the TaskScheduler will only be used in-between await points; when the async code is (asynchronously) waiting in an await, then there is no code "in" that scheduler. For some schedulers this isn't a problem, but it is a problem with schedulers like ConcurrentExclusiveSchedulerPair because the code is not "in" the scheduler during an await.
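To make this concrete, here is a minimal sketch (the names ExclusiveSchedulerDemo, WorkAsync and MeasureOverlapAsync are made up for illustration) that starts two async delegates on the exclusive scheduler of a ConcurrentExclusiveSchedulerPair and records how many are in flight at once. Assuming a console app with no SynchronizationContext, the first delegate leaves the scheduler at its await, so the second one should be able to start before the first has finished:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ExclusiveSchedulerDemo
{
    private static readonly object Gate = new object();
    private static int active;     // delegates that have started but not yet finished
    private static int maxActive;  // highest overlap observed

    private static async Task WorkAsync()
    {
        lock (Gate) { active++; maxActive = Math.Max(maxActive, active); }
        await Task.Delay(200);  // while waiting here, no code is "in" the scheduler
        lock (Gate) { active--; }
    }

    public static async Task<int> MeasureOverlapAsync()
    {
        lock (Gate) { active = 0; maxActive = 0; }
        TaskScheduler exclusive = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
        var factory = new TaskFactory(CancellationToken.None, TaskCreationOptions.DenyChildAttach,
            TaskContinuationOptions.None, exclusive);

        // Both delegates are queued to the exclusive scheduler.
        Task first = factory.StartNew(() => WorkAsync()).Unwrap();
        Task second = factory.StartNew(() => WorkAsync()).Unwrap();
        await Task.WhenAll(first, second);

        lock (Gate) { return maxActive; }
    }

    public static async Task Main()
    {
        Console.WriteLine("Max overlap: " + await MeasureOverlapAsync());
    }
}
```

An overlap greater than 1 means the "exclusive" scheduler only serialized the synchronous pieces between awaits, not the async operations as a whole.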

If your TaskScheduler will work as expected with async code, then you can create a task running on it by calling Task.Factory.StartNew and passing your TaskScheduler. This will return a Task<Task> or Task<Task<T>> since StartNew doesn't understand async code; you can call Unwrap() on that value to get a "normal" asynchronous Task/Task<T>.

Personally, I prefer creating your own TaskFactory instance with your own TaskScheduler (and other options), and then calling StartNew on it with Unwrap. E.g.:

var factory = new TaskFactory(CancellationToken.None, TaskCreationOptions.DenyChildAttach,
    TaskContinuationOptions.DenyChildAttach, myTaskScheduler);
var task = factory.StartNew(() => MyCodeAsync()).Unwrap();

I have also written some Run overloads for TaskFactory (extension methods, not part of the built-in TaskFactory API) that make the TaskFactory usage look more like Task.Run:

// Equivalent to the above code
var factory = new TaskFactory(CancellationToken.None, TaskCreationOptions.DenyChildAttach,
    TaskContinuationOptions.DenyChildAttach, myTaskScheduler);
var task = factory.Run(() => MyCodeAsync());
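The author's actual Run overloads are not shown here. As a rough sketch of what such helpers could look like (TaskFactoryRunExtensions and RunDemo are hypothetical names, and this is only an assumption about the shape, not the author's implementation), a pair of extension methods can simply combine StartNew and Unwrap:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TaskFactoryRunExtensions
{
    // Hypothetical helper: start an async delegate with the factory's settings and unwrap it.
    public static Task Run(this TaskFactory factory, Func<Task> function) =>
        factory.StartNew(function).Unwrap();

    // Hypothetical helper: same idea for async delegates that return a value.
    public static Task<TResult> Run<TResult>(this TaskFactory factory, Func<Task<TResult>> function) =>
        factory.StartNew(function).Unwrap();
}

public static class RunDemo
{
    public static async Task<bool> StartedOnCustomSchedulerAsync()
    {
        TaskScheduler scheduler = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
        var factory = new TaskFactory(CancellationToken.None, TaskCreationOptions.DenyChildAttach,
            TaskContinuationOptions.DenyChildAttach, scheduler);

        return await factory.Run(async () =>
        {
            // Checked before the first await, while the delegate is running on the scheduler.
            bool onScheduler = TaskScheduler.Current == scheduler;
            await Task.Delay(10);
            return onScheduler;
        });
    }

    public static async Task Main()
    {
        Console.WriteLine(await StartedOnCustomSchedulerAsync());
    }
}
```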

Running Task<T> on a custom scheduler

Stephen Cleary's answer explains well why you can't use TaskScheduler for this purpose and how you can use ActionBlock to limit the degree of parallelism. But if you want to add priorities to that, I think you'll have to do that manually. Your approach of using a Dictionary of queues is reasonable. A simple implementation of that (with no support for cancellation or completion) could look something like this:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Scheduler
{
    // Priority is assumed to be an enum in which larger values mean higher priority.
    private static readonly Priority[] Priorities =
        (Priority[])Enum.GetValues(typeof(Priority));

    private readonly IReadOnlyDictionary<Priority, ConcurrentQueue<Func<Task>>> queues;
    private readonly ActionBlock<Func<Task>> executor;
    private readonly SemaphoreSlim semaphore;

    public Scheduler(int degreeOfParallelism)
    {
        queues = Priorities.ToDictionary(
            priority => priority, _ => new ConcurrentQueue<Func<Task>>());

        executor = new ActionBlock<Func<Task>>(
            invocation => invocation(),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = degreeOfParallelism,
                BoundedCapacity = degreeOfParallelism
            });

        // counts the items waiting in the queues
        semaphore = new SemaphoreSlim(0);

        Task.Run(Watch);
    }

    private async Task Watch()
    {
        while (true)
        {
            await semaphore.WaitAsync();

            // find item with highest priority and send it for execution
            foreach (var priority in Priorities.Reverse())
            {
                Func<Task> invocation;
                if (queues[priority].TryDequeue(out invocation))
                {
                    await executor.SendAsync(invocation);
                    // one semaphore release corresponds to one queued item,
                    // so stop here instead of also draining lower priorities
                    break;
                }
            }
        }
    }

    public void Invoke(Func<Task> invocation, Priority priority)
    {
        queues[priority].Enqueue(invocation);
        semaphore.Release(1);
    }
}
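For the plain parallelism limit mentioned at the start of this answer (without priorities), a minimal ActionBlock sketch might look like the following. ActionBlockThrottleDemo and MaxConcurrencyAsync are illustrative names, and the code assumes the System.Threading.Tasks.Dataflow library is available. It posts several async jobs and records how many run at the same time:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ActionBlockThrottleDemo
{
    public static async Task<int> MaxConcurrencyAsync(int degreeOfParallelism, int jobCount)
    {
        int running = 0, maxRunning = 0;
        var gate = new object();

        // The block invokes at most degreeOfParallelism async delegates at a time.
        var block = new ActionBlock<Func<Task>>(
            job => job(),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = degreeOfParallelism });

        for (int i = 0; i < jobCount; i++)
        {
            block.Post(async () =>
            {
                lock (gate) { running++; maxRunning = Math.Max(maxRunning, running); }
                await Task.Delay(50);
                lock (gate) { running--; }
            });
        }

        block.Complete();        // no more jobs will be posted
        await block.Completion;  // wait until all posted jobs have finished
        return maxRunning;
    }

    public static async Task Main()
    {
        Console.WriteLine("Max concurrent jobs: " + await MaxConcurrencyAsync(2, 6));
    }
}
```

Unlike a TaskScheduler, the block treats each async delegate as busy until its returned Task completes, which is why the limit also holds across awaits.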

How to make ConcurrentExclusiveSchedulerPair work with async and await

The concept of the TaskScheduler was devised before the advent of async/await, and it ended up not being compatible with it. You can see an experiment that demonstrates this incompatibility here: How to run a Task on a custom TaskScheduler using await?

The abstraction that is available for controlling the behavior of async/await is the SynchronizationContext. It is quite similar to a TaskScheduler; so much so that some people have wondered why we need both: What is the conceptual difference between SynchronizationContext and TaskScheduler.

If you are interested in something like a SingleThreadSynchronizationContext, you can find an implementation here: Await, SynchronizationContext, and Console Apps
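For orientation, a minimal sketch in the spirit of that article (not a copy of its implementation; the member names below are my own choices) could queue posted callbacks in a BlockingCollection and drain them on the calling thread, so that every continuation of the async method resumes on that one thread:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical minimal context: posted callbacks all run on the thread that calls RunOnCurrentThread.
public sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    private readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>> queue =
        new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();

    public override void Post(SendOrPostCallback d, object state) =>
        queue.Add(new KeyValuePair<SendOrPostCallback, object>(d, state));

    // Blocks the calling thread and runs callbacks until Complete is called.
    public void RunOnCurrentThread()
    {
        foreach (var work in queue.GetConsumingEnumerable())
            work.Key(work.Value);
    }

    public void Complete() => queue.CompleteAdding();
}

public static class AsyncPump
{
    public static void Run(Func<Task> asyncMethod)
    {
        SynchronizationContext previous = SynchronizationContext.Current;
        var context = new SingleThreadSynchronizationContext();
        SynchronizationContext.SetSynchronizationContext(context);
        try
        {
            Task task = asyncMethod();
            // Stop the pump once the async method has finished.
            task.ContinueWith(t => context.Complete(), TaskScheduler.Default);
            context.RunOnCurrentThread();
            task.GetAwaiter().GetResult();  // rethrow any exception from the async method
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
    }

    public static void Main()
    {
        Run(async () =>
        {
            int before = Environment.CurrentManagedThreadId;
            await Task.Delay(100);
            int after = Environment.CurrentManagedThreadId;
            Console.WriteLine("Same thread after await: " + (before == after));
        });
    }
}
```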

Create multiple tasks, don't await them, but have them run sequentially

You'll frequently hear that using the Task constructor is not recommended, and that's wise advice. It's easier and safer to create a Func<Task> and invoke it when you want to start the task. The Task constructor has the same hidden gotchas as the ContinueWith method: it doesn't understand async delegates, and it requires you to specify the scheduler explicitly when you start the task. But if you are positive that the Task constructor is the best tool for your problem, here is how you can use it:

Task<Task> taskTask1 = new Task<Task>(async () =>
{
    Console.WriteLine("Task1 Start");
    await Task.Delay(5000);
    Console.WriteLine("Task1 STOP");
});

Task<Task> taskTask2 = new Task<Task>(async () =>
{
    Console.WriteLine("Task2 Start");
    await Task.Delay(5000);
    Console.WriteLine("Task2 STOP");
});

Task taskParent = Task.Run(async () =>
{
    Console.WriteLine("starting 1");
    taskTask1.Start(TaskScheduler.Default);
    await taskTask1.Unwrap();
    Console.WriteLine("starting 2");
    taskTask2.Start(TaskScheduler.Default);
    await taskTask2.Unwrap();
});

Console.WriteLine("BEGIN await parent");
await taskParent;
Console.WriteLine("END await parent");

Output:

BEGIN await parent
starting 1
Task1 Start
Task1 STOP
starting 2
Task2 Start
Task2 STOP
END await parent

Notice that you don't need to Unwrap the Task.Run, because this method understands async delegates, and does the unwrapping automatically for you.

Notice also the TaskScheduler.Default passed as argument to the Start method. Not specifying the scheduler makes your code dependent on the ambient TaskScheduler.Current, and might generate warnings in the presence of the CA2008 analyzer.
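For comparison, the Func<Task> alternative recommended at the start of this answer could be sketched as follows. FuncTaskDemo and RunSequentiallyAsync are illustrative names, and the delays are shortened. Nothing runs until a delegate is invoked, and no Unwrap or scheduler argument is needed:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class FuncTaskDemo
{
    public static async Task<List<string>> RunSequentiallyAsync()
    {
        var log = new List<string>();

        // These are only delegates; no task exists until one of them is invoked.
        Func<Task> job1 = async () =>
        {
            log.Add("Task1 Start");
            await Task.Delay(100);
            log.Add("Task1 STOP");
        };

        Func<Task> job2 = async () =>
        {
            log.Add("Task2 Start");
            await Task.Delay(100);
            log.Add("Task2 STOP");
        };

        await job1();  // start the first job and wait for it to finish
        await job2();  // only then start the second one
        return log;
    }

    public static async Task Main()
    {
        foreach (string line in await RunSequentiallyAsync())
            Console.WriteLine(line);
    }
}
```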

Calling async method and TaskScheduler in async/await

In general, though, "option 1" will create a new Task that wraps the call to foo(), effectively making a Task<Task<int>>. When you call .Wait() on it, it will not wait for the inner task to complete, since the outer task completes as soon as foo() returns the inner task, which happens nearly immediately (as soon as the Task.Delay is hit).

As to your question about using a non-default TaskScheduler, in general, it won't change the behavior, except for the fact that it may block until the custom scheduler starts the task. Without more information about the scheduler in question, it's impossible to know exactly what would happen.

The second option, however, will block until the delay is completed, as it will start the task, and block until after the delay is completed.
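The question's code is not reproduced here, so the following is only a sketch under the assumption that "option 1" resembles Task.Factory.StartNew(() => foo()); FooAsync is a hypothetical stand-in for foo(). It shows that waiting on the outer task returns while the inner task is still running, and that Unwrap gives a task that covers the whole operation:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class NestedTaskDemo
{
    // Hypothetical stand-in for the question's foo().
    private static async Task<int> FooAsync()
    {
        await Task.Delay(500);
        return 42;
    }

    public static (bool InnerDoneAfterOuterWait, int Value) Run()
    {
        Task<Task<int>> outer = Task.Factory.StartNew(
            () => FooAsync(),
            CancellationToken.None,
            TaskCreationOptions.DenyChildAttach,
            TaskScheduler.Default);

        // Returns as soon as FooAsync has handed back its inner task.
        outer.Wait();
        bool innerDone = outer.Result.IsCompleted;

        // Unwrap represents the inner task, so this blocks until the delay has elapsed.
        int value = outer.Unwrap().GetAwaiter().GetResult();
        return (innerDone, value);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine("Inner done after outer Wait: " + result.InnerDoneAfterOuterWait);
        Console.WriteLine("Value: " + result.Value);
    }
}
```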

Scheduling Task vs. Task Creation

You can specify the TaskScheduler to use in certain overloads of ContinueWith. You decide where to run that code. It is not true that the scheduler cannot be specified here.
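As a small illustration (ContinueWithSchedulerDemo is a made-up name, and the exclusive scheduler of a ConcurrentExclusiveSchedulerPair is just a convenient example of a non-default scheduler), the overload that takes a TaskScheduler can be used like this:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContinueWithSchedulerDemo
{
    public static async Task<bool> ContinuationRanOnAsync(TaskScheduler scheduler)
    {
        Task<int> antecedent = Task.Run(() => 21);

        // The last argument decides where the continuation delegate runs.
        Task<bool> continuation = antecedent.ContinueWith(
            t => TaskScheduler.Current == scheduler,
            CancellationToken.None,
            TaskContinuationOptions.None,
            scheduler);

        return await continuation;
    }

    public static async Task Main()
    {
        TaskScheduler exclusive = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
        Console.WriteLine(await ContinuationRanOnAsync(exclusive));
    }
}
```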

An async method runs on the captured SynchronizationContext or on the current TaskScheduler after the first await point. So indeed an async method does schedule continuations. (I'm leaving out the fact that you can have custom awaiters.)

Your async example synchronously runs on the main thread to completion.

Is it correct that ContinueWith not only creates the task but also schedules it?

Yes, on the scheduler you specify.

Is it correct that await not only creates the task for the continuation, as it appears in the article, but also schedules it if possible (calling Post on the SynchronizationContext or TaskScheduler)?

Yes, after the first await (that does not immediately complete) a scheduling operation happens.

Why this design (scheduling and creation mixed) was adopted by async/await language designers?

The internals of async/await are quite involved. There is a lot of machinery and non-trivial behavior under the hood. In particular, which thread the code of an async method will actually run on can be surprising and inconsistent. The designers of this feature apparently had a hard time making this work out of the box in all important scenarios. This leads to numerous questions on Stack Overflow every day about edge cases and very surprising behavior.

You can untangle creation and scheduling with the rarely-used Task.Start(TaskScheduler) method.

For continuations this model doesn't work out. When the antecedent completes, the continuation must be activated. Without a TaskScheduler to do that, the continuation cannot be run. That's why the scheduler must be specified at the time the continuation is being set up.

For async methods you can untangle creation and scheduling as well by using a custom awaiter. Or by using simpler models such as await Task.Factory.StartNew(..., myScheduler).

bar will be executed without specifically scheduling the Task object created by foo.

This task is not a CPU-based task. It is never scheduled. This is a task backed by a TaskCompletionSource. Specifying a scheduler doesn't make sense here.
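To illustrate what a task "backed by a TaskCompletionSource" means, here is a minimal sketch (TcsDemo is an illustrative name). The task returned from tcs.Task has no delegate of its own for a scheduler to execute; it simply completes when some other code calls SetResult:

```csharp
using System;
using System.Threading.Tasks;

public static class TcsDemo
{
    public static async Task<int> RunAsync()
    {
        var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);

        // tcs.Task contains no code to run; it is only a handle that completes
        // when SetResult is called, here from a continuation of a timer-based delay.
        Task completer = Task.Delay(50).ContinueWith(t => tcs.SetResult(7), TaskScheduler.Default);

        int result = await tcs.Task;
        await completer;
        return result;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync());
    }
}
```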

Why is my custom current scheduler replaced by the default one when I use ConfigureAwait(false)?

The parameter continueOnCapturedContext in ConfigureAwait(bool continueOnCapturedContext) has the following meaning: If true is specified, this means that the continuation should be marshaled back to the original context captured. If false is specified, the continuation may run on an arbitrary context.

The synchronization context is an abstraction for scheduling. A TaskScheduler is a concrete implementation. So by specifying ConfigureAwait(false), you state that any TaskScheduler can be used. If you want to use your special TaskScheduler, then use ConfigureAwait(true).

For more information on this topic, take a look at this post.
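The effect can be observed with a small experiment. This is a sketch with made-up names, assuming a console app without a SynchronizationContext and using a ConcurrentExclusiveSchedulerPair as the custom scheduler. It compares TaskScheduler.Current before an await, after a plain await, and after an await with ConfigureAwait(false):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ConfigureAwaitDemo
{
    public static async Task<(bool Before, bool AfterPlainAwait, bool AfterConfigureAwaitFalse)> RunAsync()
    {
        TaskScheduler scheduler = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;

        return await Task.Factory.StartNew(async () =>
        {
            bool before = TaskScheduler.Current == scheduler;

            await Task.Delay(50);  // same as ConfigureAwait(true)
            bool afterPlain = TaskScheduler.Current == scheduler;

            await Task.Delay(50).ConfigureAwait(false);
            bool afterFalse = TaskScheduler.Current == scheduler;

            return (before, afterPlain, afterFalse);
        }, CancellationToken.None, TaskCreationOptions.DenyChildAttach, scheduler).Unwrap();
    }

    public static async Task Main()
    {
        var r = await RunAsync();
        Console.WriteLine("On custom scheduler before await: " + r.Before);
        Console.WriteLine("On custom scheduler after plain await: " + r.AfterPlainAwait);
        Console.WriteLine("On custom scheduler after ConfigureAwait(false): " + r.AfterConfigureAwaitFalse);
    }
}
```

Under those assumptions, the first two checks should report the custom scheduler and the last one should not, because the continuation after ConfigureAwait(false) no longer returns to the captured scheduler.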

Keep running specific number of task async

It was brought to my attention that the struck-through solution will drop any exceptions that occur during execution. That's bad.

Here is a solution that will not drop exceptions:


Task.Run is a factory method for creating a Task. You can check this yourself by looking at the return type in IntelliSense. You can assign the returned Task anywhere you like.

"await" is an operator that will wait until the task it operates on completes. You are able to use any Task with the await operator.

public static async Task RunTasksConcurrently()
{
    IList<Task> tasks = new List<Task>();

    for (int i = 1; i < 4; i++)
    {
        tasks.Add(RunNextTask());
    }

    foreach (var task in tasks)
    {
        await task;
    }
}

public static async Task RunNextTask()
{
    while (true)
    {
        await Task.Delay(500);
    }
}

By adding the values of the Task we create to a list, we can await them later on in execution.


Previous Answer below

Edit: With the clarification I think I understand better.

Instead of running every task at once, you want to start 3 tasks, and as soon as a task is finished, run the next one.

I believe this can happen using the .ContinueWith(Action<Task>) method.

See if this gets closer to your intended solution.

public void SpawnInitialTasks()
{
    for (int i = 0; i < 3; i++)
    {
        RunNextTask();
    }
}

public void RunNextTask()
{
    // Recurse here to keep running tasks whenever we finish one.
    Task.Run(async () => await Task.Delay(500))
        .ContinueWith(t => RunNextTask());
}

The idea is that we spawn 3 tasks right away, then whenever one finishes we spawn the next. If you need to keep data flowing between the tasks, you can use parameters:

RunNextTask(DataObject data)


