Parallel Foreach With Asynchronous Lambda

Parallel foreach with asynchronous lambda

If you just want simple parallelism, you can do this:

var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
    // some pre stuff
    var response = await GetData(item);
    bag.Add(response);
    // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;

If you need something more complex, check out Stephen Toub's ForEachAsync post.

Parallel.ForEach with async lambda waiting for all iterations to complete

Recently I have seen several SO threads related to Parallel.ForEach mixed with async lambdas, but all the proposed answers were some kind of workaround.

Well, that's because Parallel doesn't work with async. And from a different perspective, why would you want to mix them in the first place? They do opposite things. Parallel is all about adding threads and async is all about giving up threads. If you want to do asynchronous work concurrently, then use Task.WhenAll. That's the correct tool for the job; Parallel is not.

That said, it sounds like you want to use the wrong tool, so here's how you do it...

How can I ensure that the list will contain all items from all iterations executed within the lambdas in each iteration?

You'll need to have some kind of a signal that some code can block on until the processing is done, e.g., CountdownEvent or Monitor. On a side note, you'll need to protect access to the non-thread-safe List<T> as well.
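For example, here's a rough sketch of that workaround (again, not the approach I'd recommend), assuming an items collection and a hypothetical GetDataAsync method:

var results = new List<string>();
var mutex = new object();
using var countdown = new CountdownEvent(1); // start at 1 so it can't reach zero early

Parallel.ForEach(items, async item =>
{
    countdown.AddCount(); // runs synchronously, before the first await
    try
    {
        var response = await GetDataAsync(item); // hypothetical async call
        lock (mutex)
        {
            results.Add(response); // List<T> is not thread-safe, so protect it
        }
    }
    finally
    {
        countdown.Signal();
    }
});

countdown.Signal(); // release the initial count
countdown.Wait();   // block until every async lambda has finished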

How will Parallel.ForEach generally work with async lambdas? If it hits an await, will it hand over its thread to the next iteration?

Since Parallel doesn't understand async lambdas, when the first await yields (returns) to its caller, Parallel will assume that iteration of the loop is complete.

I assume the ParallelLoopResult.IsCompleted property is not the proper one, as it will return true when all iterations have executed, no matter whether their actual lambda jobs are finished or not?

Correct. As far as Parallel knows, it can only "see" the method up to the first await that returns to its caller, so it doesn't know when the async lambda is complete. It also assumes iterations are complete too early, which throws the partitioning off.

Write an async method with a Parallel.ForEach loop that calls another async method to pull records

Well, for one thing, you can pretend that Parallel.ForEach awaits your async functions, but it doesn't. Instead you want to write something like this:

await Task.WhenAll(customers.Select(async customer =>
{
    var processedCustomer = await MethodB(customer);
    inboundCustomersFiles.AddRange(processedCustomer);
}));

Task.WhenAll behaves like Parallel.ForEach in that it runs your operations concurrently, but it's awaitable, and it awaits every task you pass to it before completing its own task. Hence when your await Task.WhenAll completes, all of the inner tasks have completed as well.

the process in methodB customerRecord takes time

That is very ambiguous. If you mean it takes server and/or I/O time, then that's fine; that's what async is for. If you mean it takes your CPU time (i.e., it processes data locally for a long time), then you should spin up a task on a thread pool and await its completion. Note that this is not necessarily the default thread pool! Especially if you're writing an ASP.NET (Core) application, you want a dedicated thread pool just for this stuff.
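If it is the CPU-bound case, a minimal sketch (using the default thread pool via Task.Run for brevity, and assuming MethodB is where the heavy local processing happens) would look like this:

await Task.WhenAll(customers.Select(customer => Task.Run(async () =>
{
    // MethodB now starts on a thread-pool thread instead of the calling context
    var processedCustomer = await MethodB(customer);
    lock (inboundCustomersFiles)
    {
        // List<T> isn't thread-safe, so serialize the writes
        inboundCustomersFiles.AddRange(processedCustomer);
    }
})));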

Nesting await in Parallel.ForEach

The whole idea behind Parallel.ForEach() is that you have a set of threads and each thread processes part of the collection. As you noticed, this doesn't work with async-await, where you want to release the thread for the duration of the async call.

You could “fix” that by blocking the ForEach() threads, but that defeats the whole point of async-await.

What you could do is to use TPL Dataflow instead of Parallel.ForEach(), which supports asynchronous Tasks well.

Specifically, your code could be written using a TransformBlock that transforms each id into a Customer using the async lambda. This block can be configured to execute in parallel. You would link that block to an ActionBlock that writes each Customer to the console.
After you set up the block network, you can Post() each id to the TransformBlock.

In code:

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    },
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });

var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));

getCustomerBlock.LinkTo(
    writeCustomerBlock,
    new DataflowLinkOptions
    {
        PropagateCompletion = true
    });

foreach (var id in ids)
    getCustomerBlock.Post(id);

getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();

You probably want to limit the parallelism of the TransformBlock to some small constant, though. Also, you could limit the capacity of the TransformBlock and add the items to it asynchronously using SendAsync() - for example, if the collection is too big.
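For example, the block construction and the feeding loop could change roughly like this (the specific limits are just placeholder numbers):

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    },
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 10, // a small constant instead of Unbounded
        BoundedCapacity = 100        // caps how many ids are buffered in the block
    });

foreach (var id in ids)
    await getCustomerBlock.SendAsync(id); // asynchronously waits while the block is full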

An added benefit compared to your code (if it worked) is that the writing will start as soon as a single item is finished, rather than waiting until all of the processing is finished.

Passing async method into Parallel.ForEach

This code works only because DoAsyncJob isn't really an asynchronous method. async doesn't make a method work asynchronously. Awaiting a completed task like that returned by Task.FromResult is synchronous too. async Task Main doesn't contain any asynchronous code, which results in a compiler warning.
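For reference, a DoAsyncJob along those lines (a hypothetical reconstruction of the original) would complete synchronously:

static async Task<int> DoAsyncJob(int i)
{
    // Task.FromResult returns an already-completed task, so awaiting it
    // finishes synchronously and the method never yields its thread
    return await Task.FromResult(i * 10);
}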

An example that demonstrates how Parallel.ForEach doesn't work with asynchronous methods should call a real asynchronous method:

static async Task Main(string[] args)
{
    var results = new ConcurrentDictionary<string, int>();

    Parallel.ForEach(Enumerable.Range(0, 100), async index =>
    {
        var res = await DoAsyncJob(index);
        results.TryAdd(index.ToString(), res);
    });

    Console.WriteLine($"Items in dictionary {results.Count}");
}

static async Task<int> DoAsyncJob(int i)
{
    await Task.Delay(100);
    return i * 10;
}

The result will be

Items in dictionary 0

Parallel.ForEach has no overload accepting a Func<Task>; it accepts only Action delegates. This means it can't await any asynchronous operations.

The async index => ... lambda is accepted because it's implicitly an async void delegate. As far as Parallel.ForEach is concerned, it's just an Action<int>.

The result is that Parallel.ForEach fires off 100 tasks and never waits for them to complete. That's why the dictionary is still empty when the application terminates.
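By contrast, the Select-plus-Task.WhenAll pattern shown earlier does await every operation; a minimal sketch of the fixed Main:

static async Task Main(string[] args)
{
    var results = new ConcurrentDictionary<string, int>();

    await Task.WhenAll(Enumerable.Range(0, 100).Select(async index =>
    {
        var res = await DoAsyncJob(index);
        results.TryAdd(index.ToString(), res);
    }));

    // Now prints: Items in dictionary 100
    Console.WriteLine($"Items in dictionary {results.Count}");
}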

How can I gather results from parallel async tasks in C#?

I'm actually facing a problem with parallelism on asynchronous tasks.

The problem you describe is concurrency (doing more than one thing at a time) and AFAICT does not need parallelism (multiple threads).

Does anyone have a better approach to my problem?

The way to do asynchronous concurrency is to start all the tasks (commonly using a LINQ select with an asynchronous lambda), and then use Task.WhenAll to join them.

var tasks = configurations.Select(async config => await FetchAllData(config, resultBufferBlock, cancellationToken));
await Task.WhenAll(tasks);

After some comments: I basically need Parallel.ForEachAsync, which is available in .NET 6.

I disagree. Parallel.ForEachAsync is for when you have a complex mixture of CPU-bound and I/O-bound code and need to do both parallelism and asynchronous concurrency. In this case parallelism isn't necessary.

Parallel loop containing both async and synchronous

Is there any way to execute the action in parallel when the action has to be async in order to await the single async call?

Yes, but you'll need to understand what Parallel gives you that you lose when you take alternative approaches. Specifically, Parallel will automatically determine the appropriate number of threads and adjust based on usage.

It's not a practical option to convert the synchronous functions to async.

For CPU-bound methods, you shouldn't convert them.

I don't want to split the action up and have a Parallel.ForEach followed by the async calls with a WhenAll and another Parallel.ForEach, as the speed of each stage can vary greatly between iterations; splitting it would be inefficient because the faster ones would be waiting for the slower ones before continuing.

The first recommendation I would make is to look into TPL Dataflow. It allows you to define a "pipeline" of sorts that keeps the data flowing through while limiting the concurrency at each stage.
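A rough sketch of such a pipeline, borrowing the stage names (ProcessData, BuildRequestAsync, BuildReport) from the code further below; the intermediate types and degree-of-parallelism values are made up for illustration:

var processBlock = new TransformBlock<RequestData, ProcessedData>(
    data => ProcessData(data), // synchronous, CPU-bound stage
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

var requestBlock = new TransformBlock<ProcessedData, ReportRequest>(
    async processed => await BuildRequestAsync(processed), // asynchronous, I/O-bound stage
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 16 });

var reportBlock = new ActionBlock<ReportRequest>(
    request => request.BuildReport(), // synchronous, CPU-bound stage
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
processBlock.LinkTo(requestBlock, linkOptions);
requestBlock.LinkTo(reportBlock, linkOptions);

Each stage then has its own concurrency limit, and fast iterations are never held back waiting for slow ones in an earlier stage.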

I did wonder if a PLINQ ForAll could be used instead of the Parallel.ForEach

No. PLINQ is very similar to Parallel in how it works. There are a few differences in how aggressive they are at CPU utilization, and some API differences - e.g., if you have a collection of results coming out the end, PLINQ is usually cleaner than Parallel - but at a high-level view they're very similar. Both only work on synchronous code.
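To illustrate the API difference on purely synchronous work (ProcessData and the element type here are just placeholders): with PLINQ the results are the output of the query itself, whereas Parallel.ForEach needs a thread-safe collection on the side.

// PLINQ: the results fall out of the query
var plinqResults = requestData.AsParallel()
    .Select(data => ProcessData(data))
    .ToList();

// Parallel.ForEach: results have to be collected manually
var parallelResults = new ConcurrentBag<ProcessedData>();
Parallel.ForEach(requestData, data => parallelResults.Add(ProcessData(data)));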

However, you could use a simple Task.Run with Task.WhenAll, like this:

protected async Task<IDictionary<int, ReportData>> GetReportDataAsync()
{
    var tasks = requestData.Select(data => Task.Run(async () =>
    {
        // process data synchronously
        var processedData = ProcessData(data);

        // get some data async
        var reportRequest = await BuildRequestAsync(processedData);

        // synchronous building
        var report = reportRequest.BuildReport();

        return (Key: data.ReportId, Value: report);
    })).ToList();

    var results = await Task.WhenAll(tasks);
    return results.ToDictionary(x => x.Key, x => x.Value);
}

You may need to apply a concurrency limit (which Parallel would have done for you). In the asynchronous world, this would look like:

protected async Task<IDictionary<int, ReportData>> GetReportDataAsync()
{
    var throttle = new SemaphoreSlim(10);
    var tasks = requestData.Select(data => Task.Run(async () =>
    {
        await throttle.WaitAsync();
        try
        {
            // process data synchronously
            var processedData = ProcessData(data);

            // get some data async
            var reportRequest = await BuildRequestAsync(processedData);

            // synchronous building
            var report = reportRequest.BuildReport();

            return (Key: data.ReportId, Value: report);
        }
        finally
        {
            throttle.Release();
        }
    })).ToList();

    var results = await Task.WhenAll(tasks);
    return results.ToDictionary(x => x.Key, x => x.Value);
}

