Parallel foreach with asynchronous lambda
If you just want simple parallelism, you can do this:
var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
    // some pre stuff
    var response = await GetData(item);
    bag.Add(response);
    // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;
If you need something more complex, check out Stephen Toub's ForEachAsync post.
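For readers who want a throttled variant, here is a minimal sketch of the partition-based pattern that post describes. It is an approximation rather than a library API: the class name AsyncEnumerableExtensions and the parameter name maxDegreeOfParallelism are chosen here for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class AsyncEnumerableExtensions
{
    // Runs body over source with at most maxDegreeOfParallelism items in flight.
    // (Illustrative helper; not part of the base class library.)
    public static Task ForEachAsync<T>(
        this IEnumerable<T> source, int maxDegreeOfParallelism, Func<T, Task> body)
    {
        // Split the source into partitions that pull from one shared enumerator.
        var partitions = Partitioner.Create(source).GetPartitions(maxDegreeOfParallelism);

        // One worker per partition; each worker awaits its items one at a time.
        var workers = partitions.Select(partition => Task.Run(async () =>
        {
            using (partition)
            {
                while (partition.MoveNext())
                {
                    await body(partition.Current);
                }
            }
        }));

        return Task.WhenAll(workers);
    }
}
```

With this in place, the loop above could be written as `await myCollection.ForEachAsync(4, async item => bag.Add(await GetData(item)));`, where 4 is an arbitrary limit.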
Parallel.ForEach with async lambda waiting for all iterations to complete
Recently I have seen several SO threads related to Parallel.ForEach mixed with async lambdas, but all proposed answers were some kind of workarounds.
Well, that's because Parallel doesn't work with async. And from a different perspective, why would you want to mix them in the first place? They do opposite things. Parallel is all about adding threads and async is all about giving up threads. If you want to do asynchronous work concurrently, then use Task.WhenAll. That's the correct tool for the job; Parallel is not.
That said, it sounds like you want to use the wrong tool, so here's how you do it...
How can I ensure that list will contain all items from all iterations executed within lambdas in each iteration?
You'll need to have some kind of a signal that some code can block on until the processing is done, e.g., CountdownEvent or Monitor. On a side note, you'll need to protect access to the non-thread-safe List<T> as well.
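A minimal sketch of that workaround, assuming a CountdownEvent as the signal and a lock around the list. GetDataAsync is a hypothetical stand-in for the real asynchronous call, and the class and method names are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class CountdownDemo
{
    // Hypothetical stand-in for a real asynchronous call.
    private static async Task<int> GetDataAsync(int item)
    {
        await Task.Delay(50);
        return item * 10;
    }

    public static List<int> Run(IReadOnlyCollection<int> items)
    {
        var list = new List<int>();
        var gate = new object();

        // One signal per item; Wait() blocks until every lambda has signalled.
        using var countdown = new CountdownEvent(items.Count);

        // The async lambda is compiled as async void, so Parallel.ForEach
        // returns as soon as each lambda reaches its first await.
        Parallel.ForEach(items, async item =>
        {
            try
            {
                var result = await GetDataAsync(item);
                lock (gate) { list.Add(result); } // List<T> is not thread-safe
            }
            finally
            {
                countdown.Signal(); // signal even if the call throws
            }
        });

        // Blocks the calling thread until all lambdas have actually finished.
        countdown.Wait();
        return list;
    }
}
```

Note that the caller still cannot catch an exception that escapes the async void lambda, which is one more reason to prefer Task.WhenAll.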
How will generally Parallel.ForEach work with async lambdas, if it hit await will it hand over its thread to next iteration?
Since Parallel doesn't understand async lambdas, when the first await yields (returns) to its caller, Parallel will assume that iteration of the loop is complete.
I assume ParallelLoopResult IsCompleted field is not proper one, as it will return true when all iterations are executed, no matter if their actual lambda jobs are finished or not?
Correct. As far as Parallel knows, it can only "see" the method up to the first await that returns to its caller. So it doesn't know when the async lambda is complete. It also will assume iterations are complete too early, which throws partitioning off.
Write an async method with Parallel.Foreach loop that does call another async method to pull record
Well for one thing you can pretend that Parallel.ForEach awaits your async functions, but it doesn't. Instead you want to write something like this:
await Task.WhenAll(customers.Select(async customer =>
{
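    var processedCustomer = await MethodB(customer);
    // Continuations may run concurrently: guard this call (for example with a lock)
    // if inboundCustomersFiles is a plain List<T>, which is not thread-safe.
    inboundCustomersFiles.AddRange(processedCustomer);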
}));
Task.WhenAll plays the role here that you wanted Parallel.ForEach to play: the lambdas run concurrently, but the result is awaitable, and it completes only after every task you passed to it has completed. Hence when your await Task.WhenAll completes, all the inner tasks have completed as well.
the process in methodB customerRecord takes time
That is very ambiguous. If you mean it takes server and/or I/O time then that's fine, that's what async is for. If you mean it takes your CPU time (i.e., it processes data locally for a long time), then you should spin up a task on a thread pool and await its completion. Note that this is not necessarily the default thread pool! Especially if you're writing an ASP.NET (Core) application, you want a dedicated thread pool just for this stuff.
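For the CPU-bound case, the simplest form looks roughly like the sketch below. It uses Task.Run, which queues work to the default thread pool, so it does not show the dedicated pool mentioned above. SumOfSquares is a hypothetical placeholder for the real local processing.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class CpuBoundDemo
{
    // Hypothetical CPU-heavy, synchronous computation.
    private static long SumOfSquares(int n)
    {
        long total = 0;
        for (int i = 1; i <= n; i++)
        {
            total += (long)i * i;
        }
        return total;
    }

    public static async Task<long[]> ComputeAllAsync(int[] inputs)
    {
        // Task.Run queues each computation to the default thread pool.
        var tasks = inputs.Select(n => Task.Run(() => SumOfSquares(n)));

        // WhenAll returns the results in the same order as the tasks.
        return await Task.WhenAll(tasks);
    }
}
```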
Nesting await in Parallel.ForEach
The whole idea behind Parallel.ForEach() is that you have a set of threads and each thread processes part of the collection. As you noticed, this doesn't work with async-await, where you want to release the thread for the duration of the async call.
You could “fix” that by blocking the ForEach() threads, but that defeats the whole point of async-await.
What you could do is to use TPL Dataflow instead of Parallel.ForEach(), which supports asynchronous Tasks well.
Specifically, your code could be written using a TransformBlock that transforms each id into a Customer using the async lambda. This block can be configured to execute in parallel. You would link that block to an ActionBlock that writes each Customer to the console.
After you set up the block network, you can Post() each id to the TransformBlock.
In code:
var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });
foreach (var id in ids)
    getCustomerBlock.Post(id);
getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();
Although you probably want to limit the parallelism of the TransformBlock to some small constant. Also, you could limit the capacity of the TransformBlock and add the items to it asynchronously using SendAsync(), for example if the collection is too big.
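Those two suggestions (a small parallelism constant, plus a bounded capacity fed through SendAsync()) can be sketched as follows. The limits 4 and 8 are arbitrary, LoadCustomerNameAsync is a hypothetical stand-in for repo.GetCustomer, and the results are collected into a list instead of written to the console so the sketch stays self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

public static class BoundedPipelineDemo
{
    // Hypothetical stand-in for repo.GetCustomer(id).
    private static async Task<string> LoadCustomerNameAsync(string id)
    {
        await Task.Delay(20);
        return "customer-" + id;
    }

    public static async Task<List<string>> RunAsync(IEnumerable<string> ids)
    {
        var seen = new List<string>();

        var getCustomerBlock = new TransformBlock<string, string>(
            async id => await LoadCustomerNameAsync(id),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4, // small constant instead of Unbounded
                BoundedCapacity = 8         // the block holds at most 8 items
            });

        // ActionBlock processes one item at a time by default, so no lock is needed.
        var writeCustomerBlock = new ActionBlock<string>(name => seen.Add(name));

        getCustomerBlock.LinkTo(
            writeCustomerBlock,
            new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var id in ids)
        {
            // SendAsync waits asynchronously while the block is full;
            // Post would return false instead of waiting.
            await getCustomerBlock.SendAsync(id);
        }

        getCustomerBlock.Complete();
        await writeCustomerBlock.Completion;
        return seen;
    }
}
```

Awaiting Completion instead of calling Wait() keeps the calling thread free, which matters if this runs inside an async method.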
An added benefit compared to your code (if it worked) is that the writing will start as soon as a single item is finished, rather than waiting until all of the processing is finished.
Passing async method into Parallel.ForEach
This code works only because DoAsyncJob isn't really an asynchronous method. async doesn't make a method work asynchronously. Awaiting a completed task like that returned by Task.FromResult is synchronous too. async Task Main doesn't contain any asynchronous code, which results in a compiler warning.
An example that demonstrates how Parallel.ForEach doesn't work with asynchronous methods should call a real asynchronous method:
static async Task Main(string[] args)
{
    var results = new ConcurrentDictionary<string, int>();
    Parallel.ForEach(Enumerable.Range(0, 100), async index =>
    {
        var res = await DoAsyncJob(index);
        results.TryAdd(index.ToString(), res);
    });
    Console.WriteLine($"Items in dictionary {results.Count}");
}
static async Task<int> DoAsyncJob(int i)
{
    await Task.Delay(100);
    return i * 10;
}
The result will be
Items in dictionary 0
Parallel.ForEach has no overload accepting a Func<Task>; it accepts only Action delegates. This means it can't await any asynchronous operations.
async index is accepted because it's implicitly an async void delegate. As far as Parallel.ForEach is concerned, it's just an Action<int>.
The result is that Parallel.ForEach fires off 100 tasks and never waits for them to complete. That's why the dictionary is still empty when the application terminates.
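For contrast, here is a sketch of the same loop written with Select and Task.WhenAll, so that every task is awaited before the count is read. The class name WhenAllDemo and the method name CollectAsync are illustrative; DoAsyncJob is the same method as above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllDemo
{
    public static async Task<ConcurrentDictionary<string, int>> CollectAsync()
    {
        var results = new ConcurrentDictionary<string, int>();

        // The lambda now returns a Task (not async void), so it can be awaited.
        var tasks = Enumerable.Range(0, 100).Select(async index =>
        {
            var res = await DoAsyncJob(index);
            results.TryAdd(index.ToString(), res);
        });

        // Completes only after all 100 lambdas have finished.
        await Task.WhenAll(tasks);

        Console.WriteLine($"Items in dictionary {results.Count}"); // prints 100
        return results;
    }

    private static async Task<int> DoAsyncJob(int i)
    {
        await Task.Delay(100);
        return i * 10;
    }
}
```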
How can I gather results from parallel async tasks in C#?
I'm actually facing a problem with parallelism on asynchronous tasks.
The problem you describe is concurrency (doing more than one thing at a time) and AFAICT does not need parallelism (multiple threads).
Does anyone have a better approach to my problem?
The way to do asynchronous concurrency is to start all the tasks (commonly using a LINQ Select with an asynchronous lambda), and then use Task.WhenAll to join them.
var tasks = configurations.Select(async config => await FetchAllData(config, resultBufferBlock, cancellationToken));
await Task.WhenAll(tasks);
After some comments: I basically need Parallel.ForEachAsync which is available in .NET 6.
I disagree. Parallel.ForEachAsync is for when you have a complex mixture of CPU-bound and I/O-bound code and need to do both parallelism and asynchronous concurrency. In this case parallelism isn't necessary.
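For cases that do fit that description, a minimal sketch of Parallel.ForEachAsync (available from .NET 6) is shown here. FetchAsync is a hypothetical stand-in for the real work, and the limit of 4 is arbitrary.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAsyncDemo
{
    // Hypothetical stand-in for an I/O-bound fetch.
    private static async Task<int> FetchAsync(int config, CancellationToken ct)
    {
        await Task.Delay(20, ct);
        return config * 2;
    }

    public static async Task<ConcurrentBag<int>> RunAsync(
        CancellationToken cancellationToken = default)
    {
        var results = new ConcurrentBag<int>();
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = 4,
            CancellationToken = cancellationToken
        };

        // Unlike Parallel.ForEach, the body returns a ValueTask and is awaited.
        await Parallel.ForEachAsync(
            Enumerable.Range(1, 10), options, async (config, ct) =>
            {
                results.Add(await FetchAsync(config, ct));
            });

        return results;
    }
}
```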
Parallel loop containing both async and synchronous
Is there any way to execute the action in parallel when the action has to be async in order to await the single async call?
Yes, but you'll need to understand what Parallel gives you that you lose when you take alternative approaches. Specifically, Parallel will automatically determine the appropriate number of threads and adjust based on usage.
It's not a practical option to convert the synchronous functions to async.
For CPU-bound methods, you shouldn't convert them.
I don't want to split the action up and have a Parallel.ForEach followed by the async calls with a WhenAll and another Parallel.ForEach as the speed of each stage can vary greatly between different iterations so splitting it would be inefficient as the faster ones would be waiting for the slower ones before continuing.
The first recommendation I would make is to look into TPL Dataflow. It allows you to define a "pipeline" of sorts that keeps the data flowing through while limiting the concurrency at each stage.
I did wonder if a PLINQ ForAll could be used instead of the Parallel.ForEach
No. PLINQ is very similar to Parallel in how it works. There are a few differences in how aggressive they are at CPU utilization, and some API differences (e.g., if you have a collection of results coming out the end, PLINQ is usually cleaner than Parallel), but at a high-level view they're very similar. Both only work on synchronous code.
However, you could use a simple Task.Run with Task.WhenAll as such:
protected async Task<IDictionary<int, ReportData>> GetReportDataAsync()
{
    // The async modifier belongs on the delegate passed to Task.Run,
    // because that is where the await occurs.
    var tasks = requestData.Select(data => Task.Run(async () =>
    {
        // process data synchronously
        var processedData = ProcessData(data);
        // get some data async
        var reportRequest = await BuildRequestAsync(processedData);
        // synchronous building
        var report = reportRequest.BuildReport();
        return (Key: data.ReportId, Value: report);
    })).ToList();
    var results = await Task.WhenAll(tasks);
    return results.ToDictionary(x => x.Key, x => x.Value);
}
You may need to apply a concurrency limit (which Parallel would have done for you). In the asynchronous world, this would look like:
protected async Task<IDictionary<int, ReportData>> GetReportDataAsync()
{
    var throttle = new SemaphoreSlim(10);
    var tasks = requestData.Select(data => Task.Run(async () =>
    {
        await throttle.WaitAsync();
        try
        {
            // process data synchronously
            var processedData = ProcessData(data);
            // get some data async
            var reportRequest = await BuildRequestAsync(processedData);
            // synchronous building
            var report = reportRequest.BuildReport();
            return (Key: data.ReportId, Value: report);
        }
        finally
        {
            throttle.Release();
        }
    })).ToList();
    var results = await Task.WhenAll(tasks);
    return results.ToDictionary(x => x.Key, x => x.Value);
}