How to Limit Parallel.ForEach

How can I limit Parallel.ForEach?

You can specify a MaxDegreeOfParallelism in a ParallelOptions parameter:

Parallel.ForEach(
    listOfWebpages,
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    webpage => { Download(webpage); }
);

MSDN: Parallel.ForEach

MSDN: ParallelOptions.MaxDegreeOfParallelism

Is it possible to limit the cores for Parallel.ForEach?

Pass an instance of ParallelOptions with ParallelOptions.MaxDegreeOfParallelism set to 4 to Parallel.ForEach.

Note, however, that a hard-coded value may not make sense on other machines, which may have more or fewer cores than yours. In general, you should let the framework decide the degree of parallelism.
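
If you do want an explicit limit, a common pattern is to derive it from the machine rather than hard-coding it. A minimal sketch, reusing the hypothetical listOfWebpages and Download from the snippet above:

// Sketch: cap the loop relative to the number of logical cores on this
// machine instead of a fixed constant (here roughly half of them, but at least one).
var options = new ParallelOptions
{
    MaxDegreeOfParallelism = Math.Max(1, Environment.ProcessorCount / 2)
};

Parallel.ForEach(listOfWebpages, options, webpage => Download(webpage));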

Does Parallel.ForEach limit the number of active threads?

No, it won't start 1000 threads, and yes, it will limit how many threads are used. Parallel Extensions uses an appropriate number of cores, based on how many you physically have and how many are already busy. It allocates work to each core and then uses a technique called work stealing to let each thread process its own queue efficiently, only incurring expensive cross-thread access when it really needs to.

Have a look at the PFX Team Blog for loads of information about how it allocates work and all kinds of other topics.

Note that in some cases you can specify the degree of parallelism you want, too.

C#: limit the maximum number of concurrent operations with Parallel.ForEach and an async Action

Update

As you mentioned in a comment, the problem is caused by the math calculation.

It is better to separate the calculation part from the database updates.

For the calculation part, use Parallel.ForEach(), which spreads the work efficiently and lets you control the number of threads.

Only after all those calculations have finished, use async/await to write the data to the database, without the SemaphoreSlim mentioned further down.

public static async Task<int> Work()
{
    var id = await CreateIdInDB(); // async create record in DB

    // run background task, don't wait for it to finish
    Task.Run(async () =>
    {
        // Calculation part
        ConcurrentBag<int> data = new ConcurrentBag<int>();
        Parallel.ForEach(
            listOfData,
            new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },
            x => { data.Add(calculationPart(x)); });

        // Update DB part
        int[] data_arr = data.ToArray();
        List<Task> worker = new List<Task>();
        foreach (var i in data_arr)
        {
            worker.Add(DBPart(i));
        }
        await Task.WhenAll(worker);
    });

    // return created id immediately
    return id;
}

Of course they all start together: you are using async/await inside Parallel.ForEach.

First, read the first and second answers to this question: combining Parallel.ForEach with async/await is meaningless.

async/await already makes good use of the available threads, so simply use it on its own.

public static async Task<int> Work()
{
    var id = await CreateIdInDB(); // async create record in DB

    // run background task, don't wait for it to finish
    Task.Run(async () =>
    {
        List<Task> worker = new List<Task>();
        foreach (var i in listOfData)
        {
            worker.Add(ProcessSingle(i));
        }
        await Task.WhenAll(worker);
    });

    // return created id immediately
    return id;
}

But then there is another problem: those tasks still all start at once, eating up your CPU usage.

To avoid this, use SemaphoreSlim:

public static async Task<int> Work()
{
    var id = await CreateIdInDB(); // async create record in DB

    // run background task, don't wait for it to finish
    Task.Run(async () =>
    {
        List<Task> worker = new List<Task>();
        // To limit the number of tasks started at once.
        var throttler = new SemaphoreSlim(initialCount: 20);
        foreach (var i in listOfData)
        {
            await throttler.WaitAsync();
            worker.Add(Task.Run(async () =>
            {
                try
                {
                    await ProcessSingle(i);
                }
                finally
                {
                    // release the slot even if ProcessSingle throws
                    throttler.Release();
                }
            }));
        }
        await Task.WhenAll(worker);
    });

    // return created id immediately
    return id;
}

Read more: How to limit the amount of concurrent async I/O operations?

Also, do not use Task.Factory.StartNew() when a simple Task.Run() is enough for the work you want; read this excellent article by Stephen Cleary.
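
The difference matters most with async delegates. A minimal sketch (SomeAsyncWork is a placeholder for any Task-returning method):

// Task.Factory.StartNew does not understand async lambdas: it returns a
// Task<Task> that completes as soon as the first await is hit, so it must be
// unwrapped before the actual work can be awaited.
Task<Task> wrapped = Task.Factory.StartNew(async () => await SomeAsyncWork());
await wrapped.Unwrap();

// Task.Run unwraps automatically and uses sensible scheduler defaults.
await Task.Run(async () => await SomeAsyncWork());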

Is it possible to throttle Parallel.ForEachAsync in .NET 6.0 to avoid rate limiting?

My suggestion is to ditch the Parallel.ForEachAsync approach, and use instead the new Chunk LINQ operator in combination with the Task.WhenAll method. You can launch 100 asynchronous operations every second like this:

public async Task<List<TEntity>> GetEntitiesInParallelAsync<TEntity>(
    List<IRestRequest> requests) where TEntity : IEntity
{
    var tasks = new List<Task<TEntity>>();
    foreach (var chunk in requests.Chunk(100))
    {
        tasks.AddRange(chunk.Select(request => GetAsync<TEntity>(request)));
        await Task.Delay(TimeSpan.FromSeconds(1.0));
    }
    return (await Task.WhenAll(tasks)).ToList();
}

It is assumed that the time required to launch an asynchronous operation (to invoke the GetAsync method) is negligible.

This approach has the inherent disadvantage that in case of an exception, the failure will not be propagated before all operations are completed. For comparison the Parallel.ForEachAsync method stops invoking the async delegate and completes ASAP, after the first failure is detected.
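
If the goal is only to cap how many requests are in flight at once (rather than how many are launched per second), Parallel.ForEachAsync can be throttled directly through MaxDegreeOfParallelism. A sketch, reusing the assumed GetAsync and IRestRequest types from the snippet above; the method name here is hypothetical:

// Sketch: at most 10 requests in flight at any moment. Note this limits
// concurrency, not the request rate, so it is not a drop-in replacement for
// the 100-per-second chunking approach above.
public async Task<List<TEntity>> GetEntitiesThrottledAsync<TEntity>(
    List<IRestRequest> requests) where TEntity : IEntity
{
    var results = new ConcurrentBag<TEntity>();
    await Parallel.ForEachAsync(
        requests,
        new ParallelOptions { MaxDegreeOfParallelism = 10 },
        async (request, cancellationToken) =>
        {
            results.Add(await GetAsync<TEntity>(request));
        });
    return results.ToList();
}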

Parallel.ForEach, limit simultaneous connections

Yes, that will ensure you have at most two tasks running at the same time. Assuming that no other process or task is doing any uploads to that server, you should be fine.

I would suggest using a semaphore inside your foreach loop instead, because then the actual processing can be done by more than two tasks at the same time, as sketched below. This improves performance if the processing is the part that takes most of the time.
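
A rough sketch of that idea; files, ProcessFile, and UploadFile are placeholder names, and only the upload itself is throttled to two simultaneous connections:

// Processing runs on as many worker threads as the scheduler chooses,
// but the semaphore caps the upload at two concurrent connections.
var connectionLimit = new SemaphoreSlim(2, 2);

Parallel.ForEach(files, file =>
{
    var payload = ProcessFile(file);   // CPU-bound work, not throttled

    connectionLimit.Wait();
    try
    {
        UploadFile(payload);           // at most two of these at any time
    }
    finally
    {
        connectionLimit.Release();
    }
});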

Count number of threads used by Parallel.ForEach

You could use a (thread-safe) list to store the IDs of the used threads and count them:

ConcurrentBag<int> threadIDs = new ConcurrentBag<int>();
Parallel.ForEach(myList, item =>
{
    threadIDs.Add(Thread.CurrentThread.ManagedThreadId);
    doStuff(item);
});

int usedThreads = threadIDs.Distinct().Count();

This does have a performance impact (especially the thread-safety logic of ConcurrentBag), but I can't tell how big it is. The relative effect depends on how much work doStuff does itself. If that method has only a few instructions, this thread-counting approach may even change the number of threads used.
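
If that overhead is a concern, a lighter-weight alternative is a sketch using the thread-local overload of Parallel.ForEach, which records each worker thread once per partition instead of once per item (assuming the same myList and doStuff as above):

// Sketch: localInit runs once per worker task, not once per item, so the
// bookkeeping cost is largely removed from the hot loop.
var threadIDs = new ConcurrentDictionary<int, bool>();

Parallel.ForEach(
    myList,
    () =>
    {
        threadIDs.TryAdd(Thread.CurrentThread.ManagedThreadId, true);
        return 0; // dummy thread-local state, unused by the body
    },
    (item, loopState, local) =>
    {
        doStuff(item);
        return local;
    },
    local => { });

int usedThreads = threadIDs.Count;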

How to limit the maximum number of parallel tasks in C#

SemaphoreSlim is a very good solution in this case and I highly recommend the OP try it, but @Manoj's answer has a flaw, as mentioned in the comments: the semaphore should be waited on before spawning the task, like this.

Updated answer: as @Vasyl pointed out, the semaphore may be disposed before the tasks complete, which raises an exception when Release() is called. So, before exiting the using block, you must wait for all created tasks to complete.

int maxConcurrency = 10;
var messages = new List<string>();
using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
{
    List<Task> tasks = new List<Task>();
    foreach (var msg in messages)
    {
        concurrencySemaphore.Wait();

        var t = Task.Factory.StartNew(() =>
        {
            try
            {
                Process(msg);
            }
            finally
            {
                concurrencySemaphore.Release();
            }
        });

        tasks.Add(t);
    }

    Task.WaitAll(tasks.ToArray());
}

Answer to comments

For those who want to see how the semaphore can be disposed without Task.WaitAll: run the code below in a console app and the following exception will be raised.

System.ObjectDisposedException: 'The semaphore has been disposed.'

static void Main(string[] args)
{
    int maxConcurrency = 5;
    List<string> messages = Enumerable.Range(1, 15).Select(e => e.ToString()).ToList();

    using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
    {
        List<Task> tasks = new List<Task>();
        foreach (var msg in messages)
        {
            concurrencySemaphore.Wait();

            var t = Task.Factory.StartNew(() =>
            {
                try
                {
                    Process(msg);
                }
                finally
                {
                    concurrencySemaphore.Release();
                }
            });

            tasks.Add(t);
        }

        // Task.WaitAll(tasks.ToArray());
    }
    Console.WriteLine("Exited using block");
    Console.ReadKey();
}

private static void Process(string msg)
{
    Thread.Sleep(2000);
    Console.WriteLine(msg);
}

