Nesting await in Parallel.ForEach

The whole idea behind Parallel.ForEach() is that you have a set of threads and each thread processes part of the collection. As you noticed, this doesn't work with async-await, where you want to release the thread for the duration of the async call.

You could “fix” that by blocking the ForEach() threads, but that defeats the whole point of async-await.

What you could do instead of Parallel.ForEach() is use TPL Dataflow, which supports asynchronous Tasks well.

Specifically, your code could be written using a TransformBlock that transforms each id into a Customer using the async lambda. This block can be configured to execute in parallel. You would link that block to an ActionBlock that writes each Customer to the console.
After you set up the block network, you can Post() each id to the TransformBlock.

In code:

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });

foreach (var id in ids)
    getCustomerBlock.Post(id);

getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();

You will probably want to limit the parallelism of the TransformBlock to some small constant, though. You could also limit the capacity of the TransformBlock (BoundedCapacity) and add the items to it asynchronously using SendAsync(), for example if the collection is too big to buffer in the block all at once.
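
As a rough sketch of that bounded variant: FetchLengthAsync is a hypothetical stand-in for repo.GetCustomer(), and the limits 4 and 8 are arbitrary values chosen for illustration, not recommendations.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedPipeline
{
    // Hypothetical stand-in for an asynchronous lookup such as repo.GetCustomer(id).
    private static async Task<int> FetchLengthAsync(string id)
    {
        await Task.Delay(10);
        return id.Length;
    }

    public static async Task<List<int>> RunAsync(IEnumerable<string> ids)
    {
        var results = new List<int>();

        var fetchBlock = new TransformBlock<string, int>(
            async id => await FetchLengthAsync(id),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4, // at most 4 lookups in flight
                BoundedCapacity = 8         // the block holds at most 8 items at a time
            });

        // An ActionBlock runs one delegate at a time by default,
        // so a plain List is safe to fill here.
        var collectBlock = new ActionBlock<int>(n => results.Add(n));

        fetchBlock.LinkTo(collectBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var id in ids)
        {
            // SendAsync waits asynchronously while the block is full;
            // Post would return false instead of waiting.
            await fetchBlock.SendAsync(id);
        }

        fetchBlock.Complete();
        await collectBlock.Completion;
        return results;
    }
}
```

Calling await BoundedPipeline.RunAsync(ids) keeps memory use bounded however large ids is, because the producer loop is paused whenever the block is full.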

An added benefit compared to your code (if it worked) is that the writing starts as soon as a single item is finished, instead of waiting until all of the processing is finished.

Parallel foreach with asynchronous lambda

If you just want simple parallelism, you can do this:

var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
    // some pre stuff
    var response = await GetData(item);
    bag.Add(response);
    // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;

If you need something more complex, check out Stephen Toub's ForEachAsync post.
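
For orientation, here is a sketch along the lines of that post. It is reproduced from memory rather than quoted, so treat the details as an assumption; the class name AsyncEnumerableExtensions is made up for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class AsyncEnumerableExtensions
{
    // Runs body for every item, with at most dop invocations in flight at once.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            Partitioner.Create(source)
                .GetPartitions(dop) // dop enumerators that share the source
                .Select(partition => Task.Run(async () =>
                {
                    using (partition)
                    {
                        // Each worker pulls the next item only after
                        // its previous body call has completed.
                        while (partition.MoveNext())
                        {
                            await body(partition.Current);
                        }
                    }
                })));
    }
}
```

Usage would look like await myCollection.ForEachAsync(4, async item => bag.Add(await GetData(item))); which caps the work at four concurrent GetData calls instead of starting all of them at once.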

Nested async methods in a Parallel.ForEach

You can't mix async with Parallel.ForEach. Since your underlying operation is asynchronous, you'd want to use asynchronous concurrency, not parallelism. Asynchronous concurrency is most easily expressed with WhenAll:

var listOfTasks = devices.Select(ProcessDevice).ToList();
await Task.WhenAll(listOfTasks);
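
If the operations return values, WhenAll also gathers them. A small sketch, assuming a hypothetical ProcessDeviceAsync that returns a string; the delays are only there to make the operations finish out of order:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllDemo
{
    // Hypothetical asynchronous operation; higher device numbers finish sooner.
    private static async Task<string> ProcessDeviceAsync(int device)
    {
        await Task.Delay(Math.Max(0, 50 - device * 10));
        return $"device-{device}";
    }

    public static async Task<string[]> ProcessAllAsync(IEnumerable<int> devices)
    {
        // ToList() forces the query to run, so every operation starts here.
        var listOfTasks = devices.Select(ProcessDeviceAsync).ToList();

        // The result array follows the order of the tasks,
        // not the order in which they completed.
        return await Task.WhenAll(listOfTasks);
    }
}
```

No thread is blocked while the operations are pending, which is the difference from running the same work inside Parallel.ForEach.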

Multiple async-await chaining inside Parallel.ForEach

As your methods involve I/O, they should be written to be truly asynchronous, not just run synchronously on the thread pool using Task.Run.

Then you could use Task.WhenAll in combination with Enumerable.Select:

var tasks = someCollection.Select(async item =>
{
    var country = await GetCountryAsync(item.Id);
    var state = await GetStateAsync(country.CountryID);
    var calculation = SomeCalculation(country, state);

    return (country, state, calculation);
});

foreach (var tuple in await Task.WhenAll(tasks))
{
    countries.Add(tuple.country);
    states.Add(tuple.state);
    myCollection.AddRange(tuple.calculation);
}

This would ensure that each country > state > calculation occurs sequentially, but each item is processed concurrently, and asynchronously.


Update as per comment

using var semaphore = new SemaphoreSlim(2);
using var cts = new CancellationTokenSource();

int failures = 0;

var tasks = someCollection.Select(async item =>
{
    await semaphore.WaitAsync(cts.Token);

    try
    {
        var country = await GetCountryAsync(item.Id);
        var state = await GetStateAsync(country.CountryID);
        var calculation = SomeCalculation(country, state);

        // A success resets the failure streak.
        Interlocked.Exchange(ref failures, 0);

        return (country, state, calculation);
    }
    catch
    {
        if (Interlocked.Increment(ref failures) >= 10)
        {
            cts.Cancel();
        }
        throw;
    }
    finally
    {
        semaphore.Release();
    }
});

The semaphore ensures a maximum of 2 concurrent async operations, and after 10 consecutive exceptions the cancellation token cancels every task that is still waiting on the semaphore.

The Interlocked methods ensure that failures is accessed in a thread-safe manner.
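
To see the throttling in isolation, here is a small experiment that records the highest number of operations that were ever running at once. ThrottleDemo and MeasurePeakConcurrencyAsync are hypothetical names, and Task.Delay stands in for the real I/O:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // Runs count simulated operations with at most limit at once
    // and returns the highest concurrency that was observed.
    public static async Task<int> MeasurePeakConcurrencyAsync(int count, int limit)
    {
        using var semaphore = new SemaphoreSlim(limit);
        int running = 0;
        int peak = 0;

        var tasks = Enumerable.Range(0, count).Select(async _ =>
        {
            await semaphore.WaitAsync();
            try
            {
                int now = Interlocked.Increment(ref running);

                // Record a new peak with a compare-and-swap loop.
                int seen;
                while (now > (seen = Volatile.Read(ref peak)) &&
                       Interlocked.CompareExchange(ref peak, now, seen) != seen)
                {
                }

                await Task.Delay(20); // simulated I/O
            }
            finally
            {
                Interlocked.Decrement(ref running);
                semaphore.Release();
            }
        }).ToList();

        await Task.WhenAll(tasks);
        return peak;
    }
}
```

With a limit of 2, the returned peak should never exceed 2 no matter how many operations are queued.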


Further Update

It may be even more efficient to use 2 semaphores to prevent multiple iterations.

Encapsulate all the list-adding into a single method:

void AddToLists(Country country, State state, Calculation calculation)
{
    countries.Add(country);
    states.Add(state);
    myCollection.AddRange(calculation);
}

Then you could allow 2 operations to serve the HTTP requests simultaneously, and 1 to perform the adds, making that operation thread-safe:

using var httpSemaphore = new SemaphoreSlim(2);
using var listAddSemaphore = new SemaphoreSlim(1);
using var cts = new CancellationTokenSource();

int failures = 0;

await Task.WhenAll(someCollection.Select(async item =>
{
    await httpSemaphore.WaitAsync(cts.Token);

    try
    {
        var country = await GetCountryAsync(item.Id);
        var state = await GetStateAsync(country.CountryID);
        var calculation = SomeCalculation(country, state);

        await listAddSemaphore.WaitAsync(cts.Token);
        try
        {
            AddToLists(country, state, calculation);
        }
        finally
        {
            // Release only after a successful wait; releasing it when an
            // earlier step threw would let two adds run at the same time.
            listAddSemaphore.Release();
        }

        Interlocked.Exchange(ref failures, 0);
    }
    catch
    {
        if (Interlocked.Increment(ref failures) >= 10)
        {
            cts.Cancel();
        }
        throw;
    }
    finally
    {
        httpSemaphore.Release();
    }
}));

How can I gather results from parallel async tasks in C#?

I'm actually facing a problem with parallelism on asynchronous tasks.

The problem you describe is concurrency (doing more than one thing at a time) and AFAICT does not need parallelism (multiple threads).

Does anyone have a better approach to my problem?

The way to do asynchronous concurrency is to start all the tasks (commonly using a LINQ Select with an asynchronous lambda), and then use Task.WhenAll to join them.

var tasks = configurations.Select(async config => await FetchAllData(config, resultBufferBlock, cancellationToken));
await Task.WhenAll(tasks);

After some comments: I basically need Parallel.ForEachAsync which is available in .NET 6.

I disagree. Parallel.ForEachAsync is for when you have a complex mixture of CPU-bound and I/O-bound code and need to do both parallelism and asynchronous concurrency. In this case parallelism isn't necessary.
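
For comparison, this is roughly what Parallel.ForEachAsync looks like when both kinds of work are present. It requires .NET 6 or later; SumSquaresAsync, the delay, and the degree of parallelism of 4 are assumptions made for this sketch:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAsyncDemo
{
    public static async Task<int> SumSquaresAsync(
        IEnumerable<int> numbers, CancellationToken cancellationToken = default)
    {
        var results = new ConcurrentBag<int>();
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = 4,
            CancellationToken = cancellationToken
        };

        await Parallel.ForEachAsync(numbers, options, async (n, token) =>
        {
            await Task.Delay(10, token); // simulated I/O-bound step
            results.Add(n * n);          // simulated CPU-bound step
        });

        int sum = 0;
        foreach (var r in results)
        {
            sum += r;
        }
        return sum;
    }
}
```

When the body is purely I/O-bound, the Select plus Task.WhenAll form above does the same job with less machinery.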


