Awaitable Task Based Queue

I don't know of a lock-free solution, but you can take a look at the TPL Dataflow library (System.Threading.Tasks.Dataflow), which was originally part of the Async CTP. A simple BufferBlock<T> should suffice, e.g.:

BufferBlock<int> buffer = new BufferBlock<int>();

Production and consumption are most easily done via extension methods on the dataflow block types.

Production is as simple as:

buffer.Post(13);

and consumption is async-ready:

int item = await buffer.ReceiveAsync();

I do recommend you use Dataflow if possible; making such a buffer both efficient and correct is more difficult than it first appears.
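
For reference, here is a minimal end-to-end sketch of the producer/consumer pattern described above. It assumes the System.Threading.Tasks.Dataflow NuGet package is referenced; the names BufferBlockDemo and SumAsync are hypothetical and exist only for illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BufferBlockDemo
{
    // Posts the given items, then consumes them asynchronously in FIFO order.
    public static async Task<int> SumAsync(params int[] items)
    {
        var buffer = new BufferBlock<int>();

        // Producer: Post returns true when the block accepts the item.
        foreach (var item in items)
            buffer.Post(item);

        // Signal that no more items will arrive.
        buffer.Complete();

        // Consumer: OutputAvailableAsync yields false once the block is
        // completed and empty, which ends the loop.
        var sum = 0;
        while (await buffer.OutputAvailableAsync())
            sum += await buffer.ReceiveAsync();

        return sum;
    }

    public static async Task Main()
    {
        Console.WriteLine(await SumAsync(13, 14, 15)); // prints 42
    }
}
```

Calling Complete() is what lets the consumer loop terminate; without it, OutputAvailableAsync would wait indefinitely for further items.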

Queuing asynchronous task in C#

Initially, I was using async/await on each of these methods, and each of the calls was executed asynchronously, but we found that if they run out of sequence, there is room for errors.

So, I thought we should queue all these asynchronous tasks and send them on a separate thread, but I want to know what options we have. I came across 'SemaphoreSlim'.

SemaphoreSlim does restrict asynchronous code to running one at a time, and it is a valid form of mutual exclusion. However, since "out of sequence" calls can cause errors, SemaphoreSlim is not an appropriate solution, because it does not guarantee FIFO ordering.

In a more general sense, no synchronization primitive guarantees FIFO because that can cause problems due to side effects like lock convoys. On the other hand, it is natural for data structures to be strictly FIFO.

So, you'll need to use your own FIFO queue, rather than having an implicit execution queue. Channels is a nice, performant, async-compatible queue, but since you're on an older version of C#/.NET, BlockingCollection<T> would work:

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public sealed class ExecutionQueue
{
    private readonly BlockingCollection<Func<Task>> _queue = new BlockingCollection<Func<Task>>();

    public ExecutionQueue() => Completion = Task.Run(() => ProcessQueueAsync());

    public Task Completion { get; }

    public void Complete() => _queue.CompleteAdding();

    private async Task ProcessQueueAsync()
    {
        // Blocks while the queue is empty; ends once Complete() has been called and the queue is drained.
        foreach (var value in _queue.GetConsumingEnumerable())
            await value();
    }
}

The only tricky part with this setup is how to queue work. The code queueing the work wants to know when the lambda has executed, not when it was queued. So the queue method (which I'm calling Run) needs to complete its returned task only after the lambda has executed. You can write the queue method something like this:

public Task Run(Func<Task> lambda)
{
    var tcs = new TaskCompletionSource<object>();
    _queue.Add(async () =>
    {
        // Execute the lambda and propagate the results to the Task returned from Run
        try
        {
            await lambda();
            tcs.TrySetResult(null);
        }
        catch (OperationCanceledException ex)
        {
            tcs.TrySetCanceled(ex.CancellationToken);
        }
        catch (Exception ex)
        {
            tcs.TrySetException(ex);
        }
    });
    return tcs.Task;
}

This queueing method isn't as perfect as it could be. If a task completes with more than one exception (this is normal for parallel code), only the first one is retained (this is normal for async code). There's also an edge case around OperationCanceledException handling. But this code is good enough for most cases.

Now you can use it like this:

public static ExecutionQueue _queue = new ExecutionQueue();

public async Task SendModuleDataToDSAsync(Module parameters)
{
    var tasks1 = new List<Task>();
    var tasks2 = new List<Task>();

    foreach (var setting in Module.param)
    {
        Task job1 = _queue.Run(() => SaveModule(setting));
        tasks1.Add(job1);
        Task job2 = _queue.Run(() => SaveModule(GetAdvancedData(setting)));
        tasks2.Add(job2);
    }

    await Task.WhenAll(tasks1);
    await Task.WhenAll(tasks2);
}
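
If Channels are available to you (System.Threading.Channels is built into .NET Core 3.0 and later, and ships as a NuGet package for older frameworks), the same idea can be sketched without blocking a thread while the queue is empty. This is only an illustrative variant of the ExecutionQueue above, not a drop-in replacement: the names ChannelExecutionQueue, ChannelQueueDemo, and RunInOrderAsync are hypothetical, and the error handling is simplified in the same way as in the Run method above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public sealed class ChannelExecutionQueue
{
    private readonly Channel<Func<Task>> _channel =
        Channel.CreateUnbounded<Func<Task>>(
            new UnboundedChannelOptions { SingleReader = true });

    public ChannelExecutionQueue() => Completion = Task.Run(() => ProcessQueueAsync());

    public Task Completion { get; }

    public void Complete() => _channel.Writer.Complete();

    // The returned task completes only after the lambda has executed.
    public Task Run(Func<Task> lambda)
    {
        var tcs = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        var queued = _channel.Writer.TryWrite(async () =>
        {
            try
            {
                await lambda();
                tcs.TrySetResult(true);
            }
            catch (OperationCanceledException ex)
            {
                tcs.TrySetCanceled(ex.CancellationToken);
            }
            catch (Exception ex)
            {
                tcs.TrySetException(ex);
            }
        });

        // TryWrite on an unbounded channel fails only after Complete().
        if (!queued)
            tcs.TrySetException(new InvalidOperationException("The queue has been completed."));

        return tcs.Task;
    }

    // Awaits work asynchronously and runs the items strictly one at a time, in FIFO order.
    private async Task ProcessQueueAsync()
    {
        while (await _channel.Reader.WaitToReadAsync())
            while (_channel.Reader.TryRead(out var work))
                await work();
    }
}

public static class ChannelQueueDemo
{
    // Queues three jobs whose delays shrink, so they would finish out of order
    // if run concurrently; the queue forces them to run in submission order.
    public static async Task<List<int>> RunInOrderAsync()
    {
        var queue = new ChannelExecutionQueue();
        var order = new List<int>();
        var tasks = new List<Task>();

        for (var i = 0; i < 3; i++)
        {
            var id = i;
            tasks.Add(queue.Run(async () =>
            {
                await Task.Delay(30 - 10 * id);
                order.Add(id);
            }));
        }

        await Task.WhenAll(tasks);
        queue.Complete();
        await queue.Completion;
        return order;
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(",", await RunInOrderAsync())); // prints 0,1,2
    }
}
```

RunContinuationsAsynchronously is used so that code awaiting the task returned from Run does not resume inline on the queue's processing loop.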

JS async / await tasks queue

You could save the previous pending promise and await it before calling the next fetch.

// fake fetch for demo purposes only
const fetch = (url, options) => new Promise(resolve => setTimeout(resolve, 1000, {url, options}))

// task executor
const addTask = (() => {
  let pending = Promise.resolve();

  const run = async (url, options) => {
    try {
      await pending;
    } finally {
      return fetch(url, options);
    }
  }

  // update pending promise so that next task could await for it
  return (url, options) => (pending = run(url, options))
})();

addTask('url1', {options: 1}).then(console.log)
addTask('url2', {options: 2}).then(console.log)
addTask('url3', {options: 3}).then(console.log)

How can I queue the Task result of an async method without running it?

Short answer: you can't. Calling an async method executes that method. It necessarily will start running. If you want to be able to defer the call, you need to wrap it in something that will do that.

One example might be Func<Task<T>>, except that the little tiny bit of code you deigned to share with us suggests you want to be able to return a promise (Task<T>) as well that represents this call you'll make in the future. You can wrap the whole thing in another task, like in your example code, but IMHO that's a pretty heavy-handed approach, since it ties up (and possibly creates new) a thread pool thread just for the purpose of calling the async method.

A better way (IMHO) to accomplish that is to use TaskCompletionSource<T>. You can store a Func<Task> in a queue, which uses the return value to set the TaskCompletionSource<T>, then when you decide you can start the task, invoke the Func<Task>.

Something like:

public Task<T> ExecuteAsync<T>(T transaction) {
    if (mustDelay) {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();

        enqueue(async () =>
        {
            // Complete the promise with the result of the deferred call
            tcs.SetResult(await executeAsync(transaction));
        });
        return tcs.Task;
    }
    return executeAsync(transaction);
}

Note that here, there's no need for ExecuteAsync<T>() to be async. You either return the TaskCompletionSource<T>'s task, or the task returned by the executeAsync<T>() method (by the way, having two methods with names that differ only in letter-casing is IMHO a horrible idea).

Note also that your queue will store Func<Task> objects, or maybe even Action objects (async void methods, such as the anonymous method above would become, are generally frowned upon, but you didn't show any exception handling in the first place, so maybe you'll find it works fine in this case). When you dequeue an item, you'll invoke that delegate. Depending on your needs, this will either be "fire-and-forget" (if you store Action delegates) or the method that dequeues and invokes the delegate may await the return value of the delegate (if you are storing Func<Task> delegates).
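
To make the deferral concrete, here is a small self-contained sketch of the TaskCompletionSource<T> approach with Func<Task> delegates. Everything in it (DeferredExecutor, Enqueue, FlushAsync, DeferredDemo) is a hypothetical name chosen for illustration, it is not thread-safe, and unlike the snippet above it forwards exceptions to the returned task.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public sealed class DeferredExecutor
{
    private readonly Queue<Func<Task>> _pending = new Queue<Func<Task>>();

    // Queues the call without starting it. The returned task acts as a
    // promise that completes only once the call has actually been made.
    public Task<T> Enqueue<T>(Func<Task<T>> operation)
    {
        var tcs = new TaskCompletionSource<T>();
        _pending.Enqueue(async () =>
        {
            try
            {
                tcs.TrySetResult(await operation());
            }
            catch (Exception ex)
            {
                tcs.TrySetException(ex);
            }
        });
        return tcs.Task;
    }

    // Starts the queued calls one at a time, in FIFO order.
    public async Task FlushAsync()
    {
        while (_pending.Count > 0)
            await _pending.Dequeue()();
    }
}

public static class DeferredDemo
{
    public static async Task<string> RunAsync()
    {
        var executor = new DeferredExecutor();
        var started = false;

        Task<int> promise = executor.Enqueue(async () =>
        {
            started = true;
            await Task.Yield();
            return 42;
        });

        // The async lambda has not been invoked yet.
        var startedBeforeFlush = started;

        await executor.FlushAsync();
        return $"{startedBeforeFlush} {started} {await promise}";
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync()); // prints False True 42
    }
}
```

Because the queue stores Func<Task> rather than Action, FlushAsync can await each call, which keeps the calls sequential and avoids async void.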

Unfortunately, your question is fairly vague. So it's not possible to offer much more than this. If you need additional help, please improve the question so it includes a good Minimal, Complete, and Verifiable code example that shows clearly what you're trying to accomplish and what specifically you're having trouble figuring out, along with an appropriate explanation to describe that in detail.

Processing a queue quickly with async/await

Task.Delay simply has fairly low resolution. As the documentation states:

This method depends on the system clock. This means that the time
delay will approximately equal the resolution of the system clock if
the millisecondsDelay argument is less than the resolution of the
system clock, which is approximately 15 milliseconds on Windows
systems.

Your argument (1) is less than 15 milliseconds, so it gets adjusted to that. You can just do:

var watch = Stopwatch.StartNew();
for (int i = 0; i < 1000; i++) {
    await Task.Delay(1);
}
watch.Stop();
Console.WriteLine($"Took {watch.ElapsedMilliseconds}ms");

to reproduce that. 1000 * 15ms = 15 seconds, which is about what you report.

Async Queue implementation .Wait() faster than await

Why is that?

There isn't enough information in the question to provide an answer to this.

As others have noted, there's a CPU-consuming spin issue with the loop as it currently is.

In the meantime, I can at least answer this part:

Is an async Queue that uses BlockingCollection a bad idea?

Yes.

What is a better alternative?

Use an async-compatible queue. E.g., Channels, or BufferBlock/ActionBlock from TPL Dataflow.

Example using Channels:

async Task Loop() {
    await foreach (var func in channelReader.ReadAllAsync()) {
        await func.Invoke();
    }
}

or if you're not on .NET Core yet:

async Task Loop() {
    while (await channelReader.WaitToReadAsync()) {
        while (channelReader.TryRead(out var func)) {
            await func.Invoke();
        }
    }
}

Object not awaitable when returning Task<object> instead of List

Get rid of the .Result call. It's essentially trying to make the asynchronous operation synchronous. (And then .First() is indeed not awaitable.)

The awaitable operation is _db.LoadData<Project, dynamic>(...), for example:

var data = await _db.LoadData<Project, dynamic>(
    "dbo.Sp_Get_Project_Data_By_ProjectId",
    new { projectId },
    ConnectionStringName,
    true
);
return data.First();

Or if you want to do it in one line, still await the call to LoadData and wrap that operation in parentheses:

return (await _db.LoadData<Project, dynamic>(
    "dbo.Sp_Get_Project_Data_By_ProjectId",
    new { projectId },
    ConnectionStringName,
    true
)).First();

