Limit the Number of Parallel Threads in C#

Limit the number of parallel threads in C#

Assuming you're building this with the TPL, you can set the ParallelOptions.MaxDegreesOfParallelism to whatever you want it to be.

Parallel.For for a code example.

How to limit the Maximum number of parallel tasks in c#

SemaphoreSlim is a very good solution in this case and I higly recommend OP to try this, but @Manoj's answer has flaw as mentioned in comments.semaphore should be waited before spawning the task like this.

Updated Answer: As @Vasyl pointed out Semaphore may be disposed before completion of tasks and will raise exception when Release() method is called so before exiting the using block must wait for the completion of all created Tasks.

int maxConcurrency=10;
var messages = new List<string>();
using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
{
List<Task> tasks = new List<Task>();
foreach(var msg in messages)
{
concurrencySemaphore.Wait();

var t = Task.Factory.StartNew(() =>
{
try
{
Process(msg);
}
finally
{
concurrencySemaphore.Release();
}
});

tasks.Add(t);
}

Task.WaitAll(tasks.ToArray());
}

Answer to Comments
for those who want to see how semaphore can be disposed without Task.WaitAll
Run below code in console app and this exception will be raised.

System.ObjectDisposedException: 'The semaphore has been disposed.'

static void Main(string[] args)
{
int maxConcurrency = 5;
List<string> messages = Enumerable.Range(1, 15).Select(e => e.ToString()).ToList();

using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
{
List<Task> tasks = new List<Task>();
foreach (var msg in messages)
{
concurrencySemaphore.Wait();

var t = Task.Factory.StartNew(() =>
{
try
{
Process(msg);
}
finally
{
concurrencySemaphore.Release();
}
});

tasks.Add(t);
}

// Task.WaitAll(tasks.ToArray());
}
Console.WriteLine("Exited using block");
Console.ReadKey();
}

private static void Process(string msg)
{
Thread.Sleep(2000);
Console.WriteLine(msg);
}

How to configure a maximum number of threads in a Parallel.For

You need to specify a ParallelOptions value with a MaxDegreeOfParallelism:

For example:

Parallel.For(0, 10, new ParallelOptions { MaxDegreeOfParallelism = 4 }, count =>
{
Console.WriteLine(count);
});

How can I limit Parallel.ForEach?

You can specify a MaxDegreeOfParallelism in a ParallelOptions parameter:

Parallel.ForEach(
listOfWebpages,
new ParallelOptions { MaxDegreeOfParallelism = 4 },
webpage => { Download(webpage); }
);

MSDN: Parallel.ForEach

MSDN: ParallelOptions.MaxDegreeOfParallelism

Limit number of Threads in Task Parallel Library

You should not be using threads for this at all. There's a Task-based API for this, which is naturally asynchronous: CloudBlockBlob.UploadFromFileAsync. Use it with async/await and SemaphoreSlim to throttle the number of parallel uploads.

Example (untested):

const MAX_PARALLEL_UPLOADS = 5;

async Task UploadFiles()
{
var files = new List<string>();
// ... add files to the list

// init the blob block and
// upload files asynchronously
using (var blobBlock = new CloudBlockBlob(url, credentials))
using (var semaphore = new SemaphoreSlim(MAX_PARALLEL_UPLOADS))
{
var tasks = files.Select(async(filename) =>
{
await semaphore.WaitAsync();
try
{
await blobBlock.UploadFromFileAsync(filename, FileMode.Create);
}
finally
{
semaphore.Release();
}
}).ToArray();

await Task.WhenAll(tasks);
}
}

Does Parallel.ForEach limit the number of active threads?

No, it won't start 1000 threads - yes, it will limit how many threads are used. Parallel Extensions uses an appropriate number of cores, based on how many you physically have and how many are already busy. It allocates work for each core and then uses a technique called work stealing to let each thread process its own queue efficiently and only need to do any expensive cross-thread access when it really needs to.

Have a look at the PFX Team Blog for loads of information about how it allocates work and all kinds of other topics.

Note that in some cases you can specify the degree of parallelism you want, too.

How to limit number of parallel tasks running?

There's a couple of problems with the code:

  1. The task returned from LimitEncryptionSemaphore.WaitAsync is ignored, instead of being awaited.
  2. LimitEncryptionSemaphore.WaitAsync is called in a different method than LimitEncryptionSemaphore.Release.

Fix:

private void WatcherOnCreated(string filePath, WatcherChangeTypes changeType)
{
string fileName = Path.GetFileName(filePath);
Logger.Debug($"A created item {fileName} was detected in drop folder.");

TaskFactory.FireAndForget(async () =>
{
await LimitEncryptionSemaphore.WaitAsync();
try
{
...
}
catch (Exception ex)
{
...
}
finally
{
LimitEncryptionSemaphore.Release();
}
});
}

limit number of threads C#

I would suggest you change your entire code to use the Parallel class:

https://msdn.microsoft.com/en-us/library/dd781401(v=vs.110).aspx

// Options to the set maximum number of threads. 
// This is not necessary, .NET will try to use the best amount of cores there is. (as pointed out by Panagiotis Kanavos)
// Overload of method is available without the options parameter
var options = new ParallelOptions()
{
MaxDegreeOfParallelism = 4 // maximum number of threads
};

const int batchSize = 1000;
int totalSize = 45856582;
int loops = totalSize / batchSize;

// if there is a rest in the division (in this case 582)
// then add an extra loop
if(totalSize % batchSize != 0)
loops++;

// The body on the loop
Action<int> loopBody = i =>
{
Worker temp = new Worker(i * batchSize, i * batchSize + batchSize);
temp.DoWork();
};

// The result. Check for loopResult.IsCompleted if everything went correctly
var loopResult = Parallel.For(0, loops, options, loopBody);

Or you could use an overload of the method to support cancelling etc.

C# TPL for loop - limit number of threads

The MaxDegreeOfParallelism sets the maximum number of simultaneous threads that will be used for the Parallel.For(). It does not mean that only two threads will ever be used.

Different threads can be allocated from the threadpool during execution of the Parallel.For(), since threadpool threads are specifically designed to be reused.

The following program demonstrates. If you run it, you'll see that the total number of different threads being used can exceed 2, but the total number of threads being used simultaneously never exceeds 2.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
class Program
{
static void Main()
{
ParallelOptions po = new ParallelOptions
{
MaxDegreeOfParallelism = 2
};

var activeThreads = new ConcurrentDictionary<int, bool>();

Parallel.For(0, 100, po, x =>
{
activeThreads[Thread.CurrentThread.ManagedThreadId] = true;
Console.WriteLine("Active threads: " + string.Join(", ", activeThreads.Keys));
Thread.Sleep(200);
activeThreads.TryRemove(Thread.CurrentThread.ManagedThreadId, out bool unused);
});

Console.ReadLine();
}
}
}


Related Topics



Leave a reply



Submit