Best Way in .Net to Manage Queue of Tasks on a Separate (Single) Thread

To create an asynchronous queue of work with a degree of parallelism of one, you can simply create a SemaphoreSlim initialized to one and have the enqueuing method await the acquisition of that semaphore before starting the requested work.

public class TaskQueue
{
    private SemaphoreSlim semaphore;
    public TaskQueue()
    {
        semaphore = new SemaphoreSlim(1);
    }

    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}

Of course, to have a fixed degree of parallelism other than one, simply initialize the semaphore to some other number.
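As a rough illustration of that point, the sketch below runs a batch of simulated jobs through a compact copy of the queue and records how many were ever running at once. The constructor parameter, `TaskQueueDemo`, and `MaxObservedConcurrencyAsync` are hypothetical names introduced here for the demonstration, not part of the original answer.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Compact copy of the TaskQueue above so the sketch compiles on its own.
// The degreeOfParallelism parameter is an assumption added for this demo.
public class TaskQueue
{
    private readonly SemaphoreSlim semaphore;

    public TaskQueue(int degreeOfParallelism = 1)
    {
        semaphore = new SemaphoreSlim(degreeOfParallelism);
    }

    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try { return await taskGenerator(); }
        finally { semaphore.Release(); }
    }
}

public static class TaskQueueDemo
{
    // Hypothetical helper: pushes itemCount simulated jobs through a queue and
    // returns the highest number of jobs that were running at the same time.
    public static async Task<int> MaxObservedConcurrencyAsync(int degreeOfParallelism, int itemCount)
    {
        var queue = new TaskQueue(degreeOfParallelism);
        var gate = new object();
        int running = 0;
        int maxRunning = 0;

        var jobs = Enumerable.Range(0, itemCount).Select(i => queue.Enqueue(async () =>
        {
            lock (gate)
            {
                running++;
                maxRunning = Math.Max(maxRunning, running);
            }
            await Task.Delay(20); // Simulate asynchronous work.
            lock (gate)
            {
                running--;
            }
            return i;
        })).ToArray();

        await Task.WhenAll(jobs);
        return maxRunning;
    }
}
```

With a degree of one the observed maximum is exactly one; with a larger degree it never exceeds the number the semaphore was initialized to.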

How to correctly queue up tasks to run in C#

As I always recommend, what you need is TPL Dataflow (to install: Install-Package System.Threading.Tasks.Dataflow).

You create an ActionBlock with an action to perform on each item. Set MaxDegreeOfParallelism for throttling. Start posting into it and await its completion:

var block = new ActionBlock<QueryAvailabilityMultidayRequest>(async service =>
{
    var availabilityResponse = await client.QueryAvailability(service);
    // ...
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

foreach (var service in RunData.Demand)
{
    block.Post(service);
}

block.Complete();
await block.Completion;

Process queue with multithreading or tasks

If you can use .NET 4.5, I'd suggest looking at Dataflow from the Task Parallel Library (TPL).

That page leads to a lot of example walkthroughs such as How to: Implement a Producer-Consumer Dataflow Pattern and Walkthrough: Using Dataflow in a Windows Forms Application.

Have a look at that documentation to see if it would help you. It's quite a lot to take in, but I think it would probably be your best approach.

Alternatively, you could look into using a BlockingCollection along with its GetConsumingEnumerable() method to access items in the queue.

What you do is to split up the work into objects that you want to process in some way, and use a BlockingCollection to manage the queue.

Some sample code using ints rather than objects as the work items will help to demonstrate this:

When a worker thread has finished with its current item, it will remove a new item from the work queue, process that item, then add it to the output queue.

A separate consumer thread removes completed items from the output queue and does something with them.

At the end we must wait for all the workers to finish (Task.WaitAll(workers)) before we can mark the output queue as completed (outputQueue.CompleteAdding()).

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    class Program
    {
        static void Main(string[] args)
        {
            new Program().run();
        }

        void run()
        {
            int threadCount = 4;
            Task[] workers = new Task[threadCount];

            Task.Factory.StartNew(consumer);

            for (int i = 0; i < threadCount; ++i)
            {
                int workerId = i;
                Task task = new Task(() => worker(workerId));
                workers[i] = task;
                task.Start();
            }

            for (int i = 0; i < 100; ++i)
            {
                Console.WriteLine("Queueing work item {0}", i);
                inputQueue.Add(i);
                Thread.Sleep(50);
            }

            Console.WriteLine("Stopping adding.");
            inputQueue.CompleteAdding();
            Task.WaitAll(workers);
            outputQueue.CompleteAdding();
            Console.WriteLine("Done.");

            Console.ReadLine();
        }

        void worker(int workerId)
        {
            Console.WriteLine("Worker {0} is starting.", workerId);

            foreach (var workItem in inputQueue.GetConsumingEnumerable())
            {
                Console.WriteLine("Worker {0} is processing item {1}", workerId, workItem);
                Thread.Sleep(100); // Simulate work.
                outputQueue.Add(workItem); // Output completed item.
            }

            Console.WriteLine("Worker {0} is stopping.", workerId);
        }

        void consumer()
        {
            Console.WriteLine("Consumer is starting.");

            foreach (var workItem in outputQueue.GetConsumingEnumerable())
            {
                Console.WriteLine("Consumer is using item {0}", workItem);
                Thread.Sleep(25);
            }

            Console.WriteLine("Consumer is finished.");
        }

        BlockingCollection<int> inputQueue = new BlockingCollection<int>();
        BlockingCollection<int> outputQueue = new BlockingCollection<int>();
    }
}

Queuing asynchronous task in C#


Initially, I was using async/await on each of these methods and each of the calls was executed asynchronously, but we found that if they run out of sequence there is room for errors.

So I thought we should queue all these asynchronous tasks and send them on a separate thread, but I want to know what options we have. I came across SemaphoreSlim.

SemaphoreSlim does restrict asynchronous code to running one at a time, and is a valid form of mutual exclusion. However, since "out of sequence" calls can cause errors, SemaphoreSlim is not an appropriate solution because it does not guarantee FIFO.

In a more general sense, no synchronization primitive guarantees FIFO because that can cause problems due to side effects like lock convoys. On the other hand, it is natural for data structures to be strictly FIFO.

So, you'll need to use your own FIFO queue, rather than having an implicit execution queue. Channels is a nice, performant, async-compatible queue, but since you're on an older version of C#/.NET, BlockingCollection<T> would work:

public sealed class ExecutionQueue
{
    private readonly BlockingCollection<Func<Task>> _queue = new BlockingCollection<Func<Task>>();

    public ExecutionQueue() => Completion = Task.Run(() => ProcessQueueAsync());

    public Task Completion { get; }

    public void Complete() => _queue.CompleteAdding();

    private async Task ProcessQueueAsync()
    {
        foreach (var value in _queue.GetConsumingEnumerable())
            await value();
    }
}

The only tricky part with this setup is how to queue work. From the perspective of the code queueing the work, they want to know when the lambda is executed, not when the lambda is queued. From the perspective of the queue method (which I'm calling Run), the method needs to complete its returned task only after the lambda is executed. So, you can write the queue method something like this:

public Task Run(Func<Task> lambda)
{
    var tcs = new TaskCompletionSource<object>();
    _queue.Add(async () =>
    {
        // Execute the lambda and propagate the results to the Task returned from Run
        try
        {
            await lambda();
            tcs.TrySetResult(null);
        }
        catch (OperationCanceledException ex)
        {
            tcs.TrySetCanceled(ex.CancellationToken);
        }
        catch (Exception ex)
        {
            tcs.TrySetException(ex);
        }
    });
    return tcs.Task;
}

This queueing method isn't as perfect as it could be. If a task completes with more than one exception (this is normal for parallel code), only the first one is retained (this is normal for async code). There's also an edge case around OperationCanceledException handling. But this code is good enough for most cases.

Now you can use it like this:

public static ExecutionQueue _queue = new ExecutionQueue();

public async Task SendModuleDataToDSAsync(Module parameters)
{
    var tasks1 = new List<Task>();
    var tasks2 = new List<Task>();

    foreach (var setting in Module.param)
    {
        Task job1 = _queue.Run(() => SaveModule(setting));
        tasks1.Add(job1);
        Task job2 = _queue.Run(() => SaveModule(GetAdvancedData(setting)));
        tasks2.Add(job2);
    }

    await Task.WhenAll(tasks1);
    await Task.WhenAll(tasks2);
}
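For readers on a newer .NET, the Channels option mentioned above could look roughly like the following. This is a sketch under assumptions, not the answer's own code: `ChannelExecutionQueue` is a hypothetical name, and it assumes System.Threading.Channels is available (built into modern .NET, a NuGet package on older frameworks). It keeps the same Run/Complete/Completion shape as the BlockingCollection version.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical Channels-based variant of the ExecutionQueue shown above.
public sealed class ChannelExecutionQueue
{
    // Unbounded channel: writes never wait, and items are read in FIFO order.
    private readonly Channel<Func<Task>> _channel =
        Channel.CreateUnbounded<Func<Task>>(new UnboundedChannelOptions { SingleReader = true });

    public ChannelExecutionQueue() => Completion = Task.Run(() => ProcessQueueAsync());

    public Task Completion { get; }

    public void Complete() => _channel.Writer.Complete();

    public Task Run(Func<Task> lambda)
    {
        var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
        bool queued = _channel.Writer.TryWrite(async () =>
        {
            // Execute the lambda and propagate the outcome to the Task returned from Run.
            try
            {
                await lambda();
                tcs.TrySetResult(null);
            }
            catch (OperationCanceledException ex)
            {
                tcs.TrySetCanceled(ex.CancellationToken);
            }
            catch (Exception ex)
            {
                tcs.TrySetException(ex);
            }
        });

        // For an unbounded channel, TryWrite only fails once the writer is completed.
        if (!queued)
            throw new InvalidOperationException("The queue has been completed.");
        return tcs.Task;
    }

    private async Task ProcessQueueAsync()
    {
        var reader = _channel.Reader;
        // The single consumer awaits each item before reading the next, so
        // queued lambdas run one at a time in the order they were written.
        while (await reader.WaitToReadAsync())
        {
            while (reader.TryRead(out var work))
                await work();
        }
    }
}
```

Unlike the BlockingCollection version, the consumer here does not block a thread while the queue is empty; it only resumes when an item is written or the writer is completed.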

Queue of async tasks with throttling which supports multi-threading

So we'll start out with a solution to a simpler problem, that of creating a queue that processes up to N tasks concurrently, rather than throttling to N tasks started per second, and build on that:

public class TaskQueue
{
    private SemaphoreSlim semaphore;
    public TaskQueue()
    {
        semaphore = new SemaphoreSlim(1);
    }
    public TaskQueue(int concurrentRequests)
    {
        semaphore = new SemaphoreSlim(concurrentRequests);
    }

    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}

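We'll also use the following helper methods to match the result of a TaskCompletionSource to a Task:

// Extension methods must live in a static class; the class name here is arbitrary.
public static class TaskCompletionSourceExtensions
{
    public static void Match<T>(this TaskCompletionSource<T> tcs, Task<T> task)
    {
        task.ContinueWith(t =>
        {
            switch (t.Status)
            {
                case TaskStatus.Canceled:
                    tcs.SetCanceled();
                    break;
                case TaskStatus.Faulted:
                    tcs.SetException(t.Exception.InnerExceptions);
                    break;
                case TaskStatus.RanToCompletion:
                    tcs.SetResult(t.Result);
                    break;
            }
        });
    }

    public static void Match<T>(this TaskCompletionSource<T> tcs, Task task)
    {
        // Mirror the status of the non-generic task as well, so that a fault or
        // cancellation is not reported as success; default(T) stands in for the result.
        task.ContinueWith(t =>
        {
            switch (t.Status)
            {
                case TaskStatus.Canceled:
                    tcs.SetCanceled();
                    break;
                case TaskStatus.Faulted:
                    tcs.SetException(t.Exception.InnerExceptions);
                    break;
                case TaskStatus.RanToCompletion:
                    tcs.SetResult(default(T));
                    break;
            }
        });
    }
}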

Now for our actual solution: each time we need to perform a throttled operation, we create a TaskCompletionSource and then add an item to our TaskQueue that starts the task, matches the TCS to its result without awaiting it, and then delays the task queue for 1 second. The task queue will then not allow a task to start until fewer than N tasks have been started in the past second, while the result of the operation itself is the same as that of the created Task:

public class Throttler
{
    private TaskQueue queue;
    public Throttler(int requestsPerSecond)
    {
        queue = new TaskQueue(requestsPerSecond);
    }
    public Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        var unused = queue.Enqueue(() =>
        {
            tcs.Match(taskGenerator());
            return Task.Delay(TimeSpan.FromSeconds(1));
        });
        return tcs.Task;
    }
    // Non-generic overload: no type parameter, since one could never be inferred from Func<Task>.
    public Task Enqueue(Func<Task> taskGenerator)
    {
        TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
        var unused = queue.Enqueue(() =>
        {
            tcs.Match(taskGenerator());
            return Task.Delay(TimeSpan.FromSeconds(1));
        });
        return tcs.Task;
    }
}

How to fire but don't forget in a separate thread using .NET 4.5?

You could implement a queue to dispatch notifications. When you need to send the notification your API would simply add the notification to the queue and forget it.

You can then implement a background task to process the queue. You can configure this background task to use more than one thread if it needs to.
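A minimal sketch of that idea, assuming notifications are plain strings and that the actual sending is supplied as a delegate. `NotificationDispatcher`, `send`, and `workerCount` are hypothetical names introduced here for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical dispatcher: callers enqueue and return immediately, while
// background workers drain the queue and perform the actual send.
public sealed class NotificationDispatcher : IDisposable
{
    private readonly BlockingCollection<string> _pending = new BlockingCollection<string>();
    private readonly Task[] _workers;

    public NotificationDispatcher(Action<string> send, int workerCount = 1)
    {
        // LongRunning hints that each worker should get a dedicated thread,
        // because GetConsumingEnumerable blocks while the queue is empty.
        _workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Factory.StartNew(() => Drain(send), TaskCreationOptions.LongRunning))
            .ToArray();
    }

    // Fire: add the notification and return without waiting for it to be sent.
    public void Enqueue(string notification) => _pending.Add(notification);

    // But don't forget: stop accepting work and wait for queued items to be sent.
    public void Dispose()
    {
        _pending.CompleteAdding();
        Task.WaitAll(_workers);
        _pending.Dispose();
    }

    private void Drain(Action<string> send)
    {
        foreach (var notification in _pending.GetConsumingEnumerable())
        {
            try
            {
                send(notification);
            }
            catch (Exception ex)
            {
                // A failed send must not kill the worker; log it and keep going.
                Console.Error.WriteLine("Sending '{0}' failed: {1}", notification, ex.Message);
            }
        }
    }
}
```

With a single worker, notifications are sent in the order they were queued; with more workers they are sent concurrently and ordering is no longer guaranteed.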

Is calling .ContinueWith on a Task indefinitely a valid way to emulate a queue and loop thread?

The fundamental approach is sound. You can separate the mechanism from the business logic by creating a TaskQueue class that manages the queue of items for you, allowing you to deal with that concept as an abstraction. (You haven't shown your code, but you do need to be careful when appending these continuations to ensure that you don't have race conditions when two items try to append themselves at the same time.) It's not particularly hard to do, but it's quite useful, as it means you don't need to think about difficult low-level threading issues in your business logic.

Here is an example of such a class that you can use for this.
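One way such a class could look, sketched here under assumptions rather than quoted from the answer: each new item is chained onto the previous one with ContinueWith, and a lock guards the append so two callers cannot attach to the same predecessor. `ContinuationTaskQueue` is a hypothetical name.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical continuation-based queue: every enqueued item starts only
// after the previously enqueued item has finished, whatever its outcome.
public class ContinuationTaskQueue
{
    private readonly object key = new object();
    private Task previous = Task.CompletedTask;

    public Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        lock (key)
        {
            // ContinueWith yields Task<Task<T>>; Unwrap flattens it so the
            // returned task completes when the generated task completes.
            Task<T> next = previous
                .ContinueWith(_ => taskGenerator(), TaskScheduler.Default)
                .Unwrap();
            previous = next;
            return next;
        }
    }

    public Task Enqueue(Func<Task> taskGenerator)
    {
        lock (key)
        {
            Task next = previous
                .ContinueWith(_ => taskGenerator(), TaskScheduler.Default)
                .Unwrap();
            previous = next;
            return next;
        }
    }
}
```

Because the continuation runs regardless of how the previous task ended, one faulted item does not stall the items queued behind it; its exception is still observable on the task returned to its own caller.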


