How to Limit the Amount of Concurrent Async I/O Operations

How to limit the amount of concurrent async I/O operations?

You can definitely do this in .NET 4.5 (which was in Beta at the time of writing). The previous post from 'usr' points to a good article written by Stephen Toub, but the less-announced news is that the async semaphore actually made it into the Beta release of .NET 4.5.

If you look at our beloved SemaphoreSlim class (which you should be using, since it's more performant than the original Semaphore), it now boasts the WaitAsync(...) series of overloads, with all of the expected arguments: timeout intervals, cancellation tokens, all of your usual scheduling friends :)
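As a quick illustration of those overloads, here is a minimal sketch exercising the plain, timeout, and cancellation variants of WaitAsync (the semaphore count and timeouts are arbitrary):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class WaitAsyncOverloads
{
    static async Task Main()
    {
        var sem = new SemaphoreSlim(initialCount: 1);

        // Plain asynchronous wait.
        await sem.WaitAsync();
        sem.Release();

        // Wait with a timeout: returns false if no slot was acquired in time.
        bool entered = await sem.WaitAsync(TimeSpan.FromMilliseconds(500));
        if (entered) sem.Release();

        // Wait with a cancellation token: throws OperationCanceledException
        // if the token fires before a slot becomes available.
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        await sem.WaitAsync(cts.Token);
        sem.Release();

        Console.WriteLine("All overloads demonstrated.");
    }
}
```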

Stephen has also written a more recent blog post about the new .NET 4.5 goodies that came out with the Beta; see What’s New for Parallelism in .NET 4.5 Beta.

Last, here's some sample code showing how to use SemaphoreSlim for async method throttling:

public async Task MyOuterMethod()
{
    // let's say there is a list of 1000+ URLs
    var urls = new[] { "http://google.com", "http://yahoo.com", /* ... */ };

    // now let's send HTTP requests to each of these URLs in parallel
    var allTasks = new List<Task>();
    var throttler = new SemaphoreSlim(initialCount: 20);
    foreach (var url in urls)
    {
        // do an async wait until we can schedule again
        await throttler.WaitAsync();

        // using Task.Run(...) to run the lambda in its own parallel
        // flow on the thread pool
        allTasks.Add(
            Task.Run(async () =>
            {
                try
                {
                    var client = new HttpClient();
                    var html = await client.GetStringAsync(url);
                }
                finally
                {
                    throttler.Release();
                }
            }));
    }

    // won't get here until all urls have been put into tasks
    await Task.WhenAll(allTasks);

    // won't get here until all tasks have completed in some way
    // (either success or exception)
}

Finally, a worthy mention is a solution that uses TPL-based scheduling. You can create delegate-bound tasks on the TPL that have not yet been started, and allow a custom task scheduler to limit the concurrency. In fact, there's an MSDN sample for it.

See also TaskScheduler.
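If you'd rather not write a custom scheduler yourself, a built-in alternative (available since .NET 4.5) is ConcurrentExclusiveSchedulerPair, whose ConcurrentScheduler caps how many tasks run at once. A minimal sketch:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class SchedulerThrottle
{
    static async Task Main()
    {
        // The ConcurrentScheduler of this pair runs at most 3 tasks at a time.
        var pair = new ConcurrentExclusiveSchedulerPair(
            TaskScheduler.Default, maxConcurrencyLevel: 3);
        var factory = new TaskFactory(pair.ConcurrentScheduler);

        var tasks = new Task[10];
        for (int i = 0; i < tasks.Length; i++)
        {
            int n = i; // prevent modified closure
            tasks[n] = factory.StartNew(() =>
            {
                Console.WriteLine($"Task {n} running on throttled scheduler");
                Thread.Sleep(100); // simulate work
            });
        }

        await Task.WhenAll(tasks);
    }
}
```

Everything queued through `factory` is limited to three concurrent tasks, without any explicit semaphore in the calling code.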

Limiting concurrent async tasks

from what I know, Task.Run is for CPU-bound work

Correct.

and async/await for I/O-bound work.

No. await is a tool for adding continuations to an asynchronous operation. It doesn't care about the nature of what that asynchronous operation is. It simply makes it easier to compose asynchronous operations of any kind together.

If you want to compose several asynchronous operations together, you do that by making an async method, using the various asynchronous operations, awaiting them when you need their results (or when you need them to be completed), and then using the Task from that method as its own new asynchronous operation.

In your case your new asynchronous operation simply needs to be awaiting the semaphore, uploading your file, then releasing the semaphore.

async Task UploadFile()
{
    await semaphore.WaitAsync();
    try
    {
        await sftp.UploadAsync(
            File.OpenRead(file),
            Path.GetFileName(file),
            true);
    }
    finally
    {
        semaphore.Release();
    }
}

Now you can simply call that method for each file.

Additionally, because this is such a common operation, you may find it worthwhile to create a new class to handle this logic, so that you can simply create a queue, add items to it, and have it handle the throttling internally, rather than replicating that mechanic everywhere you use it.
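A minimal sketch of such a class (the name and shape here are illustrative, not from the original answer): callers hand it async work items, and it guarantees that at most a fixed number of them run at any one time.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

class ThrottledTaskQueue
{
    private readonly SemaphoreSlim _semaphore;
    private readonly List<Task> _tasks = new List<Task>();

    public ThrottledTaskQueue(int maxConcurrency)
    {
        _semaphore = new SemaphoreSlim(maxConcurrency);
    }

    // Queue an async work item; it starts only when a slot is free.
    public void Enqueue(Func<Task> work)
    {
        _tasks.Add(RunThrottled(work));
    }

    private async Task RunThrottled(Func<Task> work)
    {
        await _semaphore.WaitAsync();
        try { await work(); }
        finally { _semaphore.Release(); }
    }

    // Completes when every queued item has finished.
    public Task WhenAll() => Task.WhenAll(_tasks);
}
```

Usage would then be `queue.Enqueue(() => UploadFile());` per file, followed by a single `await queue.WhenAll();`.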

How to limit the Maximum number of parallel tasks in c#

SemaphoreSlim is a very good solution in this case, and I highly recommend the OP try it, but @Manoj's answer has a flaw, as mentioned in the comments. The semaphore should be waited on before spawning the task, like this.

Updated Answer: As @Vasyl pointed out, the semaphore may be disposed before the tasks complete, which will raise an exception when the Release() method is called; so before exiting the using block, you must wait for the completion of all created tasks.

int maxConcurrency = 10;
var messages = new List<string>();
using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
{
    List<Task> tasks = new List<Task>();
    foreach (var msg in messages)
    {
        concurrencySemaphore.Wait();

        var t = Task.Factory.StartNew(() =>
        {
            try
            {
                Process(msg);
            }
            finally
            {
                concurrencySemaphore.Release();
            }
        });

        tasks.Add(t);
    }

    Task.WaitAll(tasks.ToArray());
}

Answer to Comments
For those who want to see how the semaphore can be disposed without Task.WaitAll: run the code below in a console app, and this exception will be raised.

System.ObjectDisposedException: 'The semaphore has been disposed.'

static void Main(string[] args)
{
    int maxConcurrency = 5;
    List<string> messages = Enumerable.Range(1, 15).Select(e => e.ToString()).ToList();

    using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
    {
        List<Task> tasks = new List<Task>();
        foreach (var msg in messages)
        {
            concurrencySemaphore.Wait();

            var t = Task.Factory.StartNew(() =>
            {
                try
                {
                    Process(msg);
                }
                finally
                {
                    concurrencySemaphore.Release();
                }
            });

            tasks.Add(t);
        }

        // Task.WaitAll(tasks.ToArray());
    }
    Console.WriteLine("Exited using block");
    Console.ReadKey();
}

private static void Process(string msg)
{
    Thread.Sleep(2000);
    Console.WriteLine(msg);
}


C#: limit maximum of concurrent operation with Parallel.ForEach and async Action

Update

As I just noticed you mentioned in a comment, the problem is caused by the math calculations.

It will be better to separate the calculation part from the DB-update part.

For the calculation part, use Parallel.ForEach() to optimize your work; it lets you control the number of threads.

Only after all those tasks have finished, use async-await to update your data in the DB, without the SemaphoreSlim I mentioned.

public static async Task<int> Work()
{
    var id = await CreateIdInDB(); // async create record in DB

    // run background task, don't wait for it to finish
    Task.Run(async () => {

        // Calculation part
        ConcurrentBag<int> data = new ConcurrentBag<int>();
        Parallel.ForEach(
            listOfData,
            new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },
            x => { data.Add(calculationPart(x)); });

        // Update-DB part
        int[] data_arr = data.ToArray();
        List<Task> worker = new List<Task>();
        foreach (var i in data_arr)
        {
            worker.Add(DBPart(i));
        }
        await Task.WhenAll(worker);
    });

    // return created id immediately
    return id;
}

For sure they all start together if you use async-await inside Parallel.ForEach.

First, read this question, both the 1st and 2nd answers: combining these two is meaningless.

Actually, async-await will maximize the usage of the available threads, so simply use it.

public static async Task<int> Work()
{
    var id = await CreateIdInDB(); // async create record in DB

    // run background task, don't wait for it to finish
    Task.Run(async () => {
        List<Task> worker = new List<Task>();
        foreach (var i in listOfData)
        {
            worker.Add(ProcessSingle(i));
        }
        await Task.WhenAll(worker);
    });

    // return created id immediately
    return id;
}

But then there is another problem: in this case those tasks still all start together, eating your CPU usage.

To avoid this, use SemaphoreSlim:

public static async Task<int> Work()
{
    var id = await CreateIdInDB(); // async create record in DB

    // run background task, don't wait for it to finish
    Task.Run(async () => {
        List<Task> worker = new List<Task>();
        // to limit the number of tasks started at once
        var throttler = new SemaphoreSlim(initialCount: 20);
        foreach (var i in listOfData)
        {
            await throttler.WaitAsync();
            worker.Add(Task.Run(async () =>
            {
                try
                {
                    await ProcessSingle(i);
                }
                finally
                {
                    throttler.Release();
                }
            }));
        }
        await Task.WhenAll(worker);
    });

    // return created id immediately
    return id;
}

Read more How to limit the amount of concurrent async I/O operations?.

Also, do not use Task.Factory.StartNew() when a simple Task.Run() is enough for the work you want; read this excellent article by Stephen Cleary.
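One concrete difference worth sketching: Task.Factory.StartNew does not understand async delegates, so with an async lambda it returns a Task&lt;Task&gt; that completes as soon as the outer delegate returns, and you must call Unwrap() to wait for the actual work. Task.Run unwraps for you.

```csharp
using System;
using System.Threading.Tasks;

class StartNewPitfall
{
    static async Task Main()
    {
        // StartNew with an async lambda yields a Task<Task>: the outer task
        // completes at the first await, not when the inner work finishes.
        Task<Task> wrapped = Task.Factory.StartNew(async () =>
        {
            await Task.Delay(100);
            Console.WriteLine("StartNew body finished");
        });
        await wrapped.Unwrap(); // needed to actually wait for the inner work

        // Task.Run unwraps automatically, which is why it's preferred here.
        await Task.Run(async () =>
        {
            await Task.Delay(100);
            Console.WriteLine("Task.Run body finished");
        });
    }
}
```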

Limiting the number of concurrent System.Threading.Tasks.Task

The design-time issue is that the number of concurrent Tasks (and thus memory footprint) are going to explode because tasks are fed faster than they complete for big data files. I was thinking about using a SemaphoreSlim

Yes, SemaphoreSlim is an appropriate choice for throttling concurrent asynchronous operations:

private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(10);

async Task ThrottledWorkAsync()
{
    await _semaphore.WaitAsync();
    try
    {
        await WorkAsync();
    }
    finally
    {
        _semaphore.Release();
    }
}

However...

If instead I decide to implement my code using await/async (Entity Framework supports asynchronous programming), how can I make sure that no more than N concurrent tasks execute (i.e. go to database) at the same time?

One thing to be aware of is that Entity Framework - while it supports asynchronous APIs - still requires one connection per request. So, you can't have multiple concurrent asynchronous requests with the same DbContext; you'd need to create a separate connection for each concurrent request (or at least N connections that are "borrowed" by the concurrent requests).
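To make that concrete, here is a hedged sketch of "one context per concurrent request". MyDbContext is a stub standing in for an EF DbContext, and ProcessItemAsync is a hypothetical work item; the point is simply that each throttled operation constructs and disposes its own context rather than sharing one across concurrent awaits.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Stub standing in for an EF DbContext; real code would derive from
// DbContext and own a database connection.
class MyDbContext : IDisposable
{
    public void Dispose() { /* closes the underlying connection */ }
}

class ThrottledDbWork
{
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(10);

    public async Task ProcessAllAsync(IEnumerable<int> itemIds)
    {
        await Task.WhenAll(itemIds.Select(ProcessOneAsync));
    }

    private async Task ProcessOneAsync(int itemId)
    {
        await _semaphore.WaitAsync();
        try
        {
            // fresh context (and hence connection) per concurrent operation
            using (var db = new MyDbContext())
            {
                await ProcessItemAsync(db, itemId);
            }
        }
        finally
        {
            _semaphore.Release();
        }
    }

    // hypothetical per-item database work
    private static Task ProcessItemAsync(MyDbContext db, int itemId)
        => Task.CompletedTask;
}
```

The semaphore caps the number of live contexts at ten, so the connection pool never sees more than ten concurrent requests from this class.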

How to implement a token system for limiting concurrency of a processor/IO heavy multithreading Tasks in C#?

You could use a SemaphoreSlim to limit the number of tasks that are working concurrently (you will still have taskLimit Tasks active, but only a limited number of those will be doing the heavy work simultaneously; I assume this is what you want).

This is best demonstrated with a sample console app. If you run this you'll see from the output that a maximum of 5 "heavy tasks" are active simultaneously.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    class Program
    {
        static async Task Main()
        {
            Console.WriteLine("Starting");

            // Cancel after 30 seconds for demo purposes.
            using var source = new CancellationTokenSource(TimeSpan.FromSeconds(30));
            await RunAsync(source.Token);

            Console.WriteLine("Stopped.");
            Console.ReadLine();
        }

        public static async Task RunAsync(CancellationToken cancellationToken)
        {
            int taskLimit = 20;
            int concurrencyLimit = 5;

            var sem = new SemaphoreSlim(concurrencyLimit);
            var tasks = new List<Task>();

            try
            {
                for (int i = 0; i < taskLimit; i++)
                {
                    int p = i; // Prevent modified closure.
                    tasks.Add(Task.Run(() => MyTask(cancellationToken, p, sem)));
                }

                await Task.WhenAll(tasks);
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Task(s) were cancelled.");
            }
            catch (Exception)
            {
                // Exception handling
            }
        }

        public static async Task MyTask(CancellationToken cancellationToken, int a, SemaphoreSlim sem)
        {
            while (true)
            {
                cancellationToken.ThrowIfCancellationRequested();

                try
                {
                    await sem.WaitAsync(cancellationToken);

                    try
                    {
                        someHeavyTask(cancellationToken, a);
                    }
                    finally
                    {
                        sem.Release();
                    }
                }
                catch (OperationCanceledException)
                {
                    Console.WriteLine("Task was cancelled.");
                    return;
                }
                catch (Exception)
                {
                    // Exception handling
                }
            }
        }

        static int heavyTaskCount;

        static void someHeavyTask(CancellationToken cancel, int a)
        {
            int n = Interlocked.Increment(ref heavyTaskCount);
            Console.WriteLine("Starting heavy task. Number of simultaneous heavy tasks = " + n);

            // Simulate work. Make the work for each task take varying time by using 'a' for the sleep.
            for (int i = 0; i < 20 && !cancel.IsCancellationRequested; ++i)
            {
                Thread.Sleep(100 + a * 10);
            }

            n = Interlocked.Decrement(ref heavyTaskCount);
            Console.WriteLine("Finishing heavy task. Number of simultaneous heavy tasks = " + n);
        }
    }
}

The core of this is controlled by the semaphore in the code here:

await sem.WaitAsync(cancellationToken);

try
{
    someHeavyTask(cancellationToken, a);
}
finally
{
    sem.Release();
}

Async/Await maximum number of concurrent http requests

There are limitations on connections in the .NET Framework. For example, ServicePointManager has a DefaultConnectionLimit property that allows you to limit concurrent operations.

However, I wouldn't recommend using it for throttling. If you need control over your requests you should use SemaphoreSlim, specifically with WaitAsync as you're in an async-await environment and these requests are IO bound (and long).

Using SemaphoreSlim.WaitAsync means that you don't block; you wait asynchronously, which isn't the case for non-async throttling mechanisms. Also, you don't want to start an operation before you're able to complete it.
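Putting that together, a minimal sketch of async-friendly HTTP throttling: one shared HttpClient plus SemaphoreSlim.WaitAsync, so waiting for a free slot never blocks a thread (the URL list and limit of 4 are illustrative).

```csharp
using System;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

class HttpThrottle
{
    // One shared client for all requests; one throttler for all callers.
    private static readonly HttpClient Client = new HttpClient();
    private static readonly SemaphoreSlim Throttler = new SemaphoreSlim(4);

    static async Task Main()
    {
        var urls = Enumerable.Repeat("http://example.com", 10);
        await Task.WhenAll(urls.Select(FetchAsync));
        Console.WriteLine("All requests finished.");
    }

    static async Task<string> FetchAsync(string url)
    {
        await Throttler.WaitAsync(); // asynchronous wait: no thread is blocked
        try
        {
            return await Client.GetStringAsync(url);
        }
        finally
        {
            Throttler.Release();
        }
    }
}
```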

How to execute multiple async calls in parallel efficiently in C#?

What you are doing is correct. You are launching a bunch of tasks all at once, and then await all of them to complete. There is no inefficiency or bottleneck regarding this specific C# code. It is a bit strange that you pass a hardcoded CancellationToken.None in the ProcessCassQuery, but it will not affect the performance. The performance of the whole operation now depends on the behavior of the Cassandra database, when it is bombarded with multiple simultaneous requests. If it is optimized for this kind of usage then everything will be OK. If not, then your current setup doesn't offer the flexibility of configuring the level of concurrency to a value optimal for the specific database engine. For ways to limit the amount of concurrent async I/O operations look here.

As a side note, according to the official guidelines the asynchronous methods ProcessCassQueries and ProcessCassQuery should have the Async suffix.


