Nesting await in Parallel.ForEach
The whole idea behind Parallel.ForEach() is that you have a set of threads and each thread processes part of the collection. As you noticed, this doesn't work with async-await, where you want to release the thread for the duration of the async call.
You could “fix” that by blocking the ForEach() threads, but that defeats the whole point of async-await.
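To make the failure mode concrete, here is a minimal sketch (not taken from the question) of what happens when an async lambda is handed to Parallel.ForEach(). The lambda compiles to an async void delegate, so the loop only waits for each body to reach its first await. The 200 ms delay and the counter are assumptions chosen purely for illustration.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int completed = 0;

// The async lambda is converted to Action<int> (async void), so Parallel.ForEach
// considers an iteration finished as soon as the body hits its first await.
Parallel.ForEach(Enumerable.Range(0, 5), async i =>
{
    await Task.Delay(200);                 // simulated async I/O
    Interlocked.Increment(ref completed);  // runs after the loop has already returned
});

// Typically prints 0 here, because none of the delays have elapsed yet.
Console.WriteLine($"Completed right after the loop: {completed}");
```

The work still finishes eventually, but nothing in the calling code can await it or observe its exceptions.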
What you could do is use TPL Dataflow, which supports asynchronous Tasks well, instead of Parallel.ForEach().
Specifically, your code could be written using a TransformBlock that transforms each id into a Customer using the async lambda. This block can be configured to execute in parallel. You would link that block to an ActionBlock that writes each Customer to the console.
After you set up the block network, you can Post() each id to the TransformBlock.
In code:
var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

// Transforms each id into a Customer using an async lambda.
var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });

// Writes each Customer to the console as soon as it is available.
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));

getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });

foreach (var id in ids)
    getCustomerBlock.Post(id);

// Signal that no more ids are coming, then wait for the pipeline to drain.
getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();
You will probably want to limit the parallelism of the TransformBlock to some small constant, though. You could also limit the capacity of the TransformBlock and add the items to it asynchronously using SendAsync(), for example if the collection is too big.
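The bounded variant described above could look roughly like the following. This is a sketch under assumptions: FetchCustomerAsync is a hypothetical stand-in for repo.GetCustomer(), customers are plain strings, and the limits of 4 and 10 are arbitrary.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var ids = Enumerable.Range(1, 100).Select(i => i.ToString());
var written = new ConcurrentQueue<string>();

var getCustomerBlock = new TransformBlock<string, string>(
    id => FetchCustomerAsync(id),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4, // a small constant instead of Unbounded
        BoundedCapacity = 10        // at most 10 items buffered or in flight
    });

var writeCustomerBlock = new ActionBlock<string>(c => written.Enqueue(c));

getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions { PropagateCompletion = true });

foreach (var id in ids)
{
    // SendAsync waits asynchronously while the block is full;
    // Post would return false and drop the item instead.
    await getCustomerBlock.SendAsync(id);
}

getCustomerBlock.Complete();
await writeCustomerBlock.Completion;

Console.WriteLine($"Processed {written.Count} customers");

// Hypothetical stand-in for repo.GetCustomer(id); simulates an async I/O call.
static async Task<string> FetchCustomerAsync(string id)
{
    await Task.Delay(10);
    return "customer-" + id;
}
```

With a bounded block, the producer loop is throttled to the speed of the pipeline, so a very large id collection never has to sit in the block's input queue all at once.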
An added benefit compared to your code (if it worked) is that writing starts as soon as a single item is finished, instead of waiting until all of the processing is done.
Parallel foreach with asynchronous lambda
If you just want simple parallelism, you can do this:
var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
    // some pre stuff
    var response = await GetData(item);
    bag.Add(response);
    // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;
If you need something more complex, check out Stephen Toub's ForEachAsync post.
Nested async methods in a Parallel.ForEach
You can't mix async with Parallel.ForEach. Since your underlying operation is asynchronous, you'd want to use asynchronous concurrency, not parallelism. Asynchronous concurrency is most easily expressed with WhenAll:
var listOfTasks = devices.Select(ProcessDevice).ToList();
await Task.WhenAll(listOfTasks);
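When the per-item method returns a value, Task.WhenAll also gathers the results, in the same order as the input. A small self-contained sketch of that, where ProcessDeviceAsync and the device names are hypothetical and the delay merely simulates I/O:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

var devices = new[] { "tv", "phone", "laptop" };

// ToList() forces the Select to run now, so every task is started before we await.
var listOfTasks = devices.Select(ProcessDeviceAsync).ToList();

// Completes when all tasks have completed; results follow the input order,
// regardless of which task finished first.
int[] lengths = await Task.WhenAll(listOfTasks);

Console.WriteLine(string.Join(", ", lengths)); // prints "2, 5, 6"

// Hypothetical device operation: waits briefly, then returns the name length.
static async Task<int> ProcessDeviceAsync(string device)
{
    await Task.Delay(50);
    return device.Length;
}
```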
Multiple async-await chaining inside Parallel.ForEach
As your methods involve I/O, they should be written to be truly asynchronous, not just run synchronously on the thread pool using Task.Run.
Then you could use Task.WhenAll in combination with Enumerable.Select:
var tasks = someCollection.Select(async item =>
{
    var country = await GetCountryAsync(item.Id);
    var state = await GetStateAsync(country.CountryID);
    var calculation = SomeCalculation(country, state);
    return (country, state, calculation);
});

foreach (var tuple in await Task.WhenAll(tasks))
{
    countries.Add(tuple.country);
    states.Add(tuple.state);
    myCollection.AddRange(tuple.calculation);
}
This would ensure that each country > state > calculation occurs sequentially, but each item is processed concurrently, and asynchronously.
Update as per comment
using var semaphore = new SemaphoreSlim(2);
using var cts = new CancellationTokenSource();
int failures = 0;

var tasks = someCollection.Select(async item =>
{
    await semaphore.WaitAsync(cts.Token);
    try
    {
        var country = await GetCountryAsync(item.Id);
        var state = await GetStateAsync(country.CountryID);
        var calculation = SomeCalculation(country, state);
        // A success resets the failure counter.
        Interlocked.Exchange(ref failures, 0);
        return (country, state, calculation);
    }
    catch
    {
        if (Interlocked.Increment(ref failures) >= 10)
        {
            cts.Cancel();
        }
        throw;
    }
    finally
    {
        semaphore.Release();
    }
});
The semaphore ensures a maximum of 2 concurrent async operations, and after 10 consecutive exceptions the cancellation token cancels every task that is still waiting on the semaphore. Operations already in flight are not interrupted, because the token is only passed to WaitAsync.
The Interlocked methods ensure that failures is accessed in a thread-safe manner.
Further Update
It may be even more efficient to use 2 semaphores, so that the results do not have to be iterated a second time after Task.WhenAll.
Encapsulate all the list-adding into a single method:
void AddToLists(Country country, State state, Calculation calculation)
{
    countries.Add(country);
    states.Add(state);
    myCollection.AddRange(calculation);
}
Then you could allow 2 operations to serve the HTTP requests simultaneously, and only 1 at a time to perform the adds, making that operation thread-safe:
using var httpSemaphore = new SemaphoreSlim(2);
using var listAddSemaphore = new SemaphoreSlim(1);
using var cts = new CancellationTokenSource();
int failures = 0;

await Task.WhenAll(someCollection.Select(async item =>
{
    await httpSemaphore.WaitAsync(cts.Token);
    try
    {
        var country = await GetCountryAsync(item.Id);
        var state = await GetStateAsync(country.CountryID);
        var calculation = SomeCalculation(country, state);

        await listAddSemaphore.WaitAsync(cts.Token);
        try
        {
            AddToLists(country, state, calculation);
        }
        finally
        {
            // Release only after a successful wait, so the count never exceeds 1.
            listAddSemaphore.Release();
        }

        Interlocked.Exchange(ref failures, 0);
    }
    catch
    {
        if (Interlocked.Increment(ref failures) >= 10)
        {
            cts.Cancel();
        }
        throw;
    }
    finally
    {
        httpSemaphore.Release();
    }
}));
How can I gather results from parallel async tasks in C#?
I'm actually facing a problem with parallelism on asynchronous tasks.
The problem you describe is concurrency (doing more than one thing at a time) and AFAICT does not need parallelism (multiple threads).
Does anyone have a better approach to my problem?
The way to do asynchronous concurrency is to start all the tasks (commonly using a LINQ Select with an asynchronous lambda), and then use Task.WhenAll to join them.
var tasks = configurations.Select(async config => await FetchAllData(config, resultBufferBlock, cancellationToken));
await Task.WhenAll(tasks);
After some comments: I basically need Parallel.ForEachAsync which is available in .NET 6.
I disagree. Parallel.ForEachAsync is for when you have a complex mixture of CPU-bound and I/O-bound code and need to do both parallelism and asynchronous concurrency. In this case parallelism isn't necessary.
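For comparison, in the mixed CPU-bound and I/O-bound case a Parallel.ForEachAsync call (.NET 6 or later) might look like the sketch below. The integer "configurations", the squaring step, and the limit of 4 are all assumptions made up for illustration; they do not come from the question.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var configurations = Enumerable.Range(1, 20).ToArray();
var results = new ConcurrentBag<int>();

var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };

await Parallel.ForEachAsync(configurations, options, async (config, cancellationToken) =>
{
    // Simulated I/O-bound step; the token lets the loop cancel pending work.
    await Task.Delay(10, cancellationToken);

    // Simulated CPU-bound step on the fetched value.
    results.Add(config * config);
});

Console.WriteLine($"Collected {results.Count} results");
```

For purely I/O-bound work, the Select plus Task.WhenAll approach above expresses the same thing with less machinery.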