Throttling Asynchronous Tasks

Throttling asynchronous tasks

As suggested, use TPL Dataflow.

A TransformBlock<TInput, TOutput> may be what you're looking for.

You define a MaxDegreeOfParallelism to limit how many strings can be transformed (i.e., how many URLs can be downloaded) in parallel. You then post URLs to the block, and when you're done you tell the block that no more items are coming and fetch the responses.

var downloader = new TransformBlock<string, HttpResponse>(
    url => Download(url),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
);

var buffer = new BufferBlock<HttpResponse>();
downloader.LinkTo(buffer);

foreach (var url in urls)
    downloader.Post(url);
    // or: await downloader.SendAsync(url);

downloader.Complete();
await downloader.Completion;

IList<HttpResponse> responses;
if (buffer.TryReceiveAll(out responses))
{
    // process responses
}

Note: The TransformBlock buffers both its input and output. Why, then, do we need to link it to a BufferBlock?

Because the TransformBlock won't complete until all of its output items (HttpResponse) have been consumed, so await downloader.Completion would hang. Instead, we let the downloader forward all its output to a dedicated buffer block, then we wait for the downloader to complete and inspect the buffer block.
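
If you would rather not add the extra BufferBlock, another option is to drain the TransformBlock's output yourself, so that its Completion task can finish on its own. The following is only a minimal sketch of that idea, not the approach from the answer above. FakeDownloadAsync is a hypothetical stand-in for Download(url), and the class and method names are made up for illustration.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowDrainSketch
{
    // Hypothetical stand-in for Download(url): waits briefly, then returns the URL's length.
    private static async Task<int> FakeDownloadAsync(string url)
    {
        await Task.Delay(10);
        return url.Length;
    }

    public static async Task<List<int>> DownloadAllAsync(IEnumerable<string> urls, int maxParallel)
    {
        var downloader = new TransformBlock<string, int>(
            url => FakeDownloadAsync(url),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallel });

        foreach (var url in urls)
            downloader.Post(url);
        downloader.Complete();

        var results = new List<int>();

        // OutputAvailableAsync yields false once the block is finished and its output is empty.
        while (await downloader.OutputAvailableAsync())
        {
            while (downloader.TryReceive(out var item))
                results.Add(item);
        }

        // The output has been consumed, so this no longer hangs; it also surfaces any exception.
        await downloader.Completion;
        return results;
    }
}
```

Calling DataflowDrainSketch.DownloadAllAsync(urls, 50) would mirror the limit used above. The BufferBlock version is arguably simpler to read; this variant just makes the "output must be consumed" rule explicit.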

How to throttle multiple asynchronous tasks?

First, abstract away from threads. Especially since your operation is asynchronous, you shouldn't be thinking about "threads" at all. In the asynchronous world, you have tasks, and you can have a huge number of tasks compared to threads.

Throttling asynchronous code can be done using SemaphoreSlim:

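// The asynchronous operation to throttle (body omitted).
static async Task DoSomething(int n) { /* ... */ }

static void RunConcurrently(int total, int throttle)
{
    // Despite the name, this is a counting semaphore: up to `throttle` callers may hold it at once.
    var mutex = new SemaphoreSlim(throttle);
    var tasks = Enumerable.Range(0, total).Select(async item =>
    {
        await mutex.WaitAsync();
        try { await DoSomething(item); }
        finally { mutex.Release(); }
    });
    // Blocks the calling thread until every operation has finished.
    Task.WhenAll(tasks).Wait();
}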
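
If the caller can itself be asynchronous, the same pattern can await Task.WhenAll instead of blocking on Wait(). Here is a hedged sketch of that variant which also records the peak number of operations in flight, so the limit can be checked. Task.Delay stands in for DoSomething, and the names SemaphoreThrottleSketch and RunConcurrentlyAsync are illustrative only.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreThrottleSketch
{
    // Runs `total` operations with at most `throttle` in flight.
    // Returns the highest number of operations observed running at the same time.
    public static async Task<int> RunConcurrentlyAsync(int total, int throttle)
    {
        var semaphore = new SemaphoreSlim(throttle);
        var gate = new object();
        int inFlight = 0, peak = 0;

        var tasks = Enumerable.Range(0, total).Select(async item =>
        {
            await semaphore.WaitAsync();
            try
            {
                lock (gate)
                {
                    inFlight++;
                    if (inFlight > peak) peak = inFlight;
                }
                await Task.Delay(20); // stand-in for DoSomething(item)
            }
            finally
            {
                lock (gate) { inFlight--; }
                semaphore.Release();
            }
        });

        // Awaiting (rather than calling Wait()) keeps the calling thread free.
        await Task.WhenAll(tasks);
        return peak;
    }
}
```

Note that no thread is blocked while an operation waits for the semaphore; WaitAsync simply returns a task that completes when a slot frees up.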

Throttle async tasks?

In most cases you do want to impose some limit. Whenever multiple operations run concurrently, their state has to be kept somewhere: CPU-bound tasks sit in the ThreadPool queue waiting for a thread, and asynchronous operations each have a state machine sitting on the heap.

Even async operations usually use up some limited resource, be it bandwidth, ports, a remote DB server's CPU, etc.

You don't have to limit yourself to a single batch at a time, though (with batches, you need to wait for the last operation in the batch to complete before starting any others). You can throttle using a SemaphoreSlim or, even better, a TPL Dataflow block:

var block = new ActionBlock<string>(
    url => downloadAsync(url),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

urlList.ForEach(url => block.Post(url));

block.Complete();
await block.Completion;
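
To make the difference between batching and throttling concrete, here is a small sketch that puts the two side by side. It uses SemaphoreSlim rather than Dataflow to stay short, and all names in it are illustrative assumptions rather than anything from the answer above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BatchVersusThrottleSketch
{
    // Batching: start `size` operations, wait for ALL of them, then start the next group.
    // One slow operation holds up the whole next batch.
    public static async Task RunInBatchesAsync<T>(IReadOnlyList<T> items, int size, Func<T, Task> work)
    {
        for (int i = 0; i < items.Count; i += size)
            await Task.WhenAll(items.Skip(i).Take(size).Select(work));
    }

    // Throttling: a new operation starts as soon as ANY running operation finishes.
    public static async Task RunThrottledAsync<T>(IReadOnlyList<T> items, int limit, Func<T, Task> work)
    {
        var semaphore = new SemaphoreSlim(limit);
        await Task.WhenAll(items.Select(async item =>
        {
            await semaphore.WaitAsync();
            try { await work(item); }
            finally { semaphore.Release(); }
        }));
    }
}
```

With a limit of 2 and a first item that is much slower than the rest, the batched version does not start the third item until the slow one has finished, while the throttled version starts it as soon as the second item is done.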

Queue of async tasks with throttling which supports multi-threading

So we'll start out with a solution to a simpler problem, that of creating a queue that processes up to N tasks concurrently, rather than throttling to N tasks started per second, and build on that:

public class TaskQueue
{
    private SemaphoreSlim semaphore;
    public TaskQueue()
    {
        semaphore = new SemaphoreSlim(1);
    }
    public TaskQueue(int concurrentRequests)
    {
        semaphore = new SemaphoreSlim(concurrentRequests);
    }

    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}

We'll also use the following helper methods to match the result of a TaskCompletionSource to a Task:

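// Extension methods; these must be declared inside a static class.
public static void Match<T>(this TaskCompletionSource<T> tcs, Task<T> task)
{
    task.ContinueWith(t =>
    {
        switch (t.Status)
        {
            case TaskStatus.Canceled:
                tcs.SetCanceled();
                break;
            case TaskStatus.Faulted:
                tcs.SetException(t.Exception.InnerExceptions);
                break;
            case TaskStatus.RanToCompletion:
                tcs.SetResult(t.Result);
                break;
        }
    });
}

public static void Match<T>(this TaskCompletionSource<T> tcs, Task task)
{
    // Mirror the outcome directly: wrapping the task in ContinueWith(t => default(T))
    // would report success even when the original task faulted or was canceled.
    task.ContinueWith(t =>
    {
        switch (t.Status)
        {
            case TaskStatus.Canceled:
                tcs.SetCanceled();
                break;
            case TaskStatus.Faulted:
                tcs.SetException(t.Exception.InnerExceptions);
                break;
            case TaskStatus.RanToCompletion:
                tcs.SetResult(default(T));
                break;
        }
    });
}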

Now for the actual solution. Each time we need to perform a throttled operation, we create a TaskCompletionSource and then add an item to our TaskQueue that starts the task, matches the TCS to its result without awaiting it, and then holds its slot in the task queue for 1 second. The task queue will then not allow a task to start until fewer than N tasks have been started in the past second, while the result of the operation itself is the same as that of the task that was created:

public class Throttler
{
    private TaskQueue queue;
    public Throttler(int requestsPerSecond)
    {
        queue = new TaskQueue(requestsPerSecond);
    }
    public Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        var unused = queue.Enqueue(() =>
        {
            tcs.Match(taskGenerator());
            return Task.Delay(TimeSpan.FromSeconds(1));
        });
        return tcs.Task;
    }
    // Non-generic overload: a type parameter here would be unused and could not be inferred.
    public Task Enqueue(Func<Task> taskGenerator)
    {
        TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
        var unused = queue.Enqueue(() =>
        {
            tcs.Match(taskGenerator());
            return Task.Delay(TimeSpan.FromSeconds(1));
        });
        return tcs.Task;
    }
}
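
The same "hold the slot for a fixed window after each start" idea can be condensed into a single class. This is a simplified sketch, not the TaskQueue/Throttler pair above: it drops the TaskCompletionSource plumbing and takes the window length as a parameter. The name StartRateLimiterSketch and its members are assumptions made for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class StartRateLimiterSketch
{
    private readonly SemaphoreSlim slots;
    private readonly TimeSpan window;

    public StartRateLimiterSketch(int startsPerWindow, TimeSpan window)
    {
        slots = new SemaphoreSlim(startsPerWindow);
        this.window = window;
    }

    public async Task<T> Run<T>(Func<Task<T>> operation)
    {
        await slots.WaitAsync();

        // Give the slot back one full window after the start,
        // no matter how long the operation itself takes.
        _ = Task.Delay(window).ContinueWith(t => slots.Release());

        return await operation();
    }
}
```

For example, new StartRateLimiterSketch(10, TimeSpan.FromSeconds(1)) would allow at most 10 operations to start in any one-second span, while each caller still gets back the result of its own operation.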

Throttling asynchronous tasks in ASP.NET, with a limit on N successful tasks

Use SemaphoreSlim.

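var semaphore = new SemaphoreSlim(5);
// ToList() materializes the query so that each task is created only once.
var tasks = urlCollection.Select(async url =>
{
    await semaphore.WaitAsync();
    try
    {
        return await CheckUrlAsync(url);
    }
    finally
    {
        semaphore.Release();
    }
}).ToList();

// Wait until 100 tasks have completed (or none are left to wait for).
var pending = tasks.ToList();
var completed = 0;
while (completed < 100 && pending.Count > 0)
{
    var finished = await Task.WhenAny(pending);
    pending.Remove(finished);
    completed++;
}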

Although I would prefer to use Rx.Net to produce some better code.

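// The semaphore is deliberately not disposed: tasks still queued when Take(100)
// completes would otherwise hit an ObjectDisposedException.
var semaphore = new SemaphoreSlim(5);
// SelectMany flattens each Task<T> into its result; awaiting the observable runs the query.
var results = await urlCollection.ToObservable()
    .SelectMany(async url =>
    {
        await semaphore.WaitAsync();
        try
        {
            return await CheckUrlAsync(url);
        }
        finally
        {
            semaphore.Release();
        }
    })
    .Take(100)
    .ToList();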

Okay...this is going to be fun.

public static class SemaphoreHelper
{
    public static Task<T> ContinueWith<T>(
        this SemaphoreSlim semaphore,
        Func<Task<T>> action)
    {
        // Unwrap() turns the Task<Task<T>> produced by ContinueWith into a Task<T>.
        var ret = semaphore.WaitAsync()
            .ContinueWith(_ => action())
            .Unwrap();
        ret.ContinueWith(_ => semaphore.Release(), TaskContinuationOptions.None);
        return ret;
    }
}

var semaphore = new SemaphoreSlim(5);
var results = urlCollection.Select(
    url => semaphore.ContinueWith(() => CheckUrlAsync(url))).ToList();

I do need to add that the code as it stands will still run all 300 URLs; it will just return sooner, that's all. You would need to pass a cancellation token to semaphore.WaitAsync(token) to cancel the queued work. Again, I suggest using Rx.Net for that. It's just easier to use Rx.Net to get the cancellation token to work with .Take(100).
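
For readers who want to stay with plain tasks, the cancellation idea mentioned above could look roughly like the sketch below. It collects the first `wanted` results and then cancels the token, so items still queued on the semaphore never start. Error handling is left out, and the names TakeFirstSketch and FirstCompletedAsync are illustrative assumptions; `check` plays the role of CheckUrlAsync.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TakeFirstSketch
{
    public static async Task<List<TResult>> FirstCompletedAsync<TSource, TResult>(
        IEnumerable<TSource> items,
        Func<TSource, Task<TResult>> check,
        int maxConcurrency,
        int wanted)
    {
        using (var cts = new CancellationTokenSource())
        {
            var semaphore = new SemaphoreSlim(maxConcurrency);
            var token = cts.Token;

            // ToList() creates every task exactly once; most of them park in WaitAsync.
            var pending = items.Select(async item =>
            {
                await semaphore.WaitAsync(token); // throws once the token is cancelled
                try { return await check(item); }
                finally { semaphore.Release(); }
            }).ToList();

            var results = new List<TResult>();
            while (results.Count < wanted && pending.Count > 0)
            {
                var finished = await Task.WhenAny(pending);
                pending.Remove(finished);
                results.Add(await finished);
            }

            cts.Cancel(); // items still waiting on the semaphore never start

            // Let in-flight operations finish; the queued ones end up cancelled.
            try { await Task.WhenAll(pending); }
            catch (OperationCanceledException) { }

            return results;
        }
    }
}
```

Operations that are already running when the limit is reached are allowed to finish here; passing the token on to `check` as well would be the next step if those should be cancelled too.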

System.Reactive Throttling an async method

There are two key ways of converting a Task to an Observable, with an important difference between them.

Observable.FromAsync(()=>GetProductAsync("test"));

and

GetProductAsync("test").ToObservable();

The first will not start the Task until you subscribe to it.
The second will create (and start) the task and the result will either immediately or sometime later appear in the observable, depending on how fast the Task is.

Looking at your question in general though, it seems that you want to stop the flow of calls. You do not want to throttle the flow of results, which would result in unnecessary computation and loss.

If this is your aim, your GetProductAsync could be seen as an observer of call events, and the GetProductAsync should throttle those calls. One way of achieving that would be to declare a

public event Action<string> GetProduct;

and use

var callStream = Observable.FromEvent<string>(
    handler => GetProduct += handler,
    handler => GetProduct -= handler);

The problem then becomes how to return the result and what should happen when your 'caller's' call is throttled out and discarded.

One approach there could be to declare a type "GetProductCall" which would have the input string and output result as properties.

You could then have a setup like:

// Assumes the event is now declared as: public event Action<GetProductCall> GetProduct;
var callStream = Observable.FromEvent<GetProductCall>(
        handler => GetProduct += handler,
        handler => GetProduct -= handler)
    .Throttle(...)
    .SelectMany(async r =>
    {
        r.Result = await GetProductAsync(r.Input);
        return r;
    });

(code not tested, just illustrative)

Another approach might include the Merge(N) overload that limits the max number of concurrent observables.

How to limit number of async IO tasks to database?


"Task is an abstraction on threads, and doesn’t necessarily create a new thread. Semaphore limits the number of threads that can access that for loop. Execute returns a Task which aren’t threads. If there’s only 1 request, there will be only 1 thread inside that for loop, even if it is asking for 500 ids. The 1 thread sends off all the async IO tasks itself."

Sort of. I would not say that tasks are related to threads at all. There are actually two kinds of tasks: a delegate task (which is kind of an abstraction of a thread), and a promise task (which has nothing to do with threads).

Regarding the SemaphoreSlim, it does limit the concurrency of a block of code (not threads).
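
The two kinds of task can be shown in a few lines. This is just an illustrative sketch (the class and method names are made up): Task.Run creates a delegate task, while TaskCompletionSource creates a promise task that has no code and no thread behind it.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskKindsSketch
{
    // Delegate task: wraps code, which the thread pool runs on one of its threads.
    public static Task<int> DelegateTask() => Task.Run(() => 6 * 7);

    // Promise task: represents a future event. Nothing runs anywhere;
    // the task simply completes when `complete` is invoked.
    public static Task<int> PromiseTask(out Action<int> complete)
    {
        var tcs = new TaskCompletionSource<int>();
        complete = value => tcs.SetResult(value);
        return tcs.Task;
    }
}
```

Tasks returned by async methods and by most asynchronous I/O APIs are of the promise kind, which is why awaiting hundreds of them does not tie up hundreds of threads.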

"I recently started playing with C# so my understanding is not right looks like w.r.t Threads and Tasks."

I recommend reading my async intro and best practices. Follow up with There Is No Thread if you want to know more about how threads aren't really involved.

"I modified ExecuteAsync to use Semaphore as shown below but it doesn't look like it does what I want it to do"

The current code is only throttling the adding of the tasks to the list, which is only done one at a time anyway. What you want to do is throttle the execution itself:

private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy, Func<CancellationToken, int, Task<T>> mapper) where T : class
{
    var throttler = new SemaphoreSlim(250);
    var tasks = new List<Task<T>>(ids.Count);

    // invoking multiple id in parallel to get data for each id from database
    for (int i = 0; i < ids.Count; i++)
        tasks.Add(ThrottledExecute(ids[i]));

    // wait for all id response to come back
    var responses = await Task.WhenAll(tasks);

    // same excludeNull code check here
    return excludeNull;

    async Task<T> ThrottledExecute(int id)
    {
        await throttler.WaitAsync().ConfigureAwait(false);
        try {
            return await Execute(policy, ct => mapper(ct, id)).ConfigureAwait(false);
        } finally {
            throttler.Release();
        }
    }
}

