How to Loop Through IEnumerable in Batches

How to loop through IEnumerable in batches

Sounds like you need to use the Skip and Take methods on your sequence. Example:

users.Skip(1000).Take(1000)

This would skip the first 1000 items and take the next 1000. You'd just need to increase the amount skipped with each call.

You can drive the amount skipped with an integer parameter and wrap the call in a method:

public IEnumerable<User> GetBatch(int pageNumber)
{
    return users.Skip(pageNumber * 1000).Take(1000);
}
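
To walk the entire sequence this way, keep requesting pages until one comes back short or empty. A minimal sketch (ProcessBatch is a hypothetical handler; note that with a lazy IEnumerable each Skip/Take call re-enumerates from the start, so this pattern works best on in-memory lists or IQueryable sources):

const int batchSize = 1000;

for (var page = 0; ; page++)
{
    var batch = GetBatch(page).ToList();
    if (batch.Count == 0)
        break; // ran out of items

    ProcessBatch(batch); // hypothetical per-batch handler

    if (batch.Count < batchSize)
        break; // a short page means we just consumed the tail
}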

How to process IEnumerable in batches?

Using the Batch solution from this thread, it seems trivial:

const int batchSize = 100;

foreach (var batch in contacts.Batch(batchSize))
{
    DoSomething(batch);
}

If you also want to wrap it up:

public static void ProcessInBatches<TSource>(
    this IEnumerable<TSource> source,
    int batchSize,
    Action<IEnumerable<TSource>> action)
{
    foreach (var batch in source.Batch(batchSize))
    {
        action(batch);
    }
}

So, your code can be transformed into:

const int batchSize = 100;

contacts.ProcessInBatches(batchSize, DoSomething);
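
Batch here is the extension from the linked thread (MoreLINQ ships an equivalent). If you'd rather not take a dependency, a minimal sketch is only a few lines; it buffers items into fixed-size arrays and yields the trailing partial batch as-is:

public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size)
{
    var bucket = new List<T>(size);
    foreach (var item in source)
    {
        bucket.Add(item);
        if (bucket.Count == size)
        {
            yield return bucket.ToArray(); // copy out so the buffer can be reused
            bucket.Clear();
        }
    }
    if (bucket.Count > 0)
        yield return bucket.ToArray(); // trailing partial batch
}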

Create batches in LINQ

An Enumerable.Chunk() extension method was added to .NET 6.0.

Example:

var list = new List<int> { 1, 2, 3, 4, 5, 6, 7 };

var chunks = list.Chunk(3);
// returns { { 1, 2, 3 }, { 4, 5, 6 }, { 7 } }

For those who cannot upgrade, the source is available on GitHub.

IEnumerable extension to pull results in batches

Firstly, thank you Arturo. You set me on the right track for this solution. I went in figuring it was a LINQ-to-Entities issue, but these problems are still far from intuitive for me to resolve.

Second, I borrowed heavily from Shimmy's answer to this question. Thanks Shimmy!

First, I updated the method to support key types other than integers, because why not. So the method signature is now (note the change to IQueryable source):

public static IEnumerable<T> BatchedSelector<T, TKey>(
    this IQueryable<T> source,
    Expression<Func<T, TKey>> selector,
    IEnumerable<TKey> keys,
    int maxBatchSize)

The method stayed substantially the same other than the line that was producing errors, which is now replaced with:

resultList = source.WhereIn(selector, keyBatch).ToList();

WhereIn is a LINQ extension mostly borrowed from Shimmy:

public static IQueryable<T> WhereIn<T, TKey>(this IQueryable<T> source, Expression<Func<T, TKey>> selector, IEnumerable<TKey> keyCollection)
{
    if (selector == null) throw new ArgumentNullException(nameof(selector));
    if (keyCollection == null) throw new ArgumentNullException(nameof(keyCollection));

    // if no items in collection, no results
    if (!keyCollection.Any()) return source.Where(t => false);

    // assemble an expression of the form: selector == key1 || selector == key2 || ...
    var p = selector.Parameters.Single();
    var equals = keyCollection.Select(value => (Expression)Expression.Equal(selector.Body, Expression.Constant(value, typeof(TKey))));
    var body = equals.Aggregate((accumulate, equal) => Expression.Or(accumulate, equal));

    // return the filtered queryable
    return source.Where(Expression.Lambda<Func<T, bool>>(body, p));
}

This taught me something pretty cool: if you feed a Where clause a chain of constant equality comparisons, the query provider will convert it to a SQL IN statement! Neat!

With those changes, the method produces results quickly and easily.
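
For illustration, here's roughly how the pieces fit together against a hypothetical EF context (db, Users, u.Id, and allKeys are my assumptions, not from the original code):

// one-off filter: the chain of equality comparisons translates to WHERE Id IN (1, 2, 3)
var some = db.Users.WhereIn(u => u.Id, new[] { 1, 2, 3 }).ToList();

// batched variant: issues one IN query per 1000 keys and concatenates the results
var all = db.Users.BatchedSelector(u => u.Id, allKeys, 1000).ToList();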

Looping through a List<T> in batches while ensuring items per batch are unique

If I completely understand the problem, then there are many ways to do this and the best solution would depend on your actual needs.

The assumptions are:

  1. What you have described is an in-memory approach.
  2. It doesn't need to hit a database.
  3. It doesn't need to be producer/consumer.

Then a very simple (yet efficient) batch and queue pattern can be used with minimal allocations.

Given

public class Payment
{
    public int AccountId { get; set; }
    public Payment(int accountId) => AccountId = accountId;
}

And

public static IEnumerable<Payment[]> GetBatches(IEnumerable<Payment> source, int count)
{
    var hashset = new HashSet<int>(count);
    var batch = new List<Payment>(count);
    var leftOvers = new Queue<Payment>();

    while (true)
    {
        foreach (var item in source)
        {
            // check if the account is already in the current batch
            if (hashset.Add(item.AccountId))
                batch.Add(item); // add to batch
            else
                leftOvers.Enqueue(item); // add to leftovers

            // if we are at the batch size, start a loop
            while (batch.Count == count)
            {
                yield return batch.ToArray(); // return the batch

                batch.Clear();
                hashset.Clear();

                // check the leftovers
                while (leftOvers.Any() && batch.Count != count)
                    if (hashset.Add(leftOvers.Peek().AccountId)) // check the batch
                        batch.Add(leftOvers.Dequeue());
                    else break; // we still have a duplicate, bail
            }
        }

        if (batch.Any()) yield return batch.ToArray();

        if (!leftOvers.Any()) break;

        source = leftOvers.ToList(); // allocation :(
        hashset.Clear();
        batch.Clear();
        leftOvers.Clear();
    }
}

Note: This is fairly resource-efficient, though it probably makes one extra small allocation when dealing with pure leftovers; I'm sure that could be removed, but I'll leave that up to you. There are also many efficiencies you could add, and with the use of a channel this could easily be turned into a producer/consumer.
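
To illustrate that last point, here's a rough sketch (my assumption, not part of the original answer) that turns the batches into a producer/consumer pipeline with System.Threading.Channels; 'payments' and 'Process' are hypothetical stand-ins:

using System.Threading.Channels;

var channel = Channel.CreateUnbounded<Payment[]>();

// producer: enumerate batches onto the channel
var producer = Task.Run(async () =>
{
    foreach (var batch in GetBatches(payments, 3)) // 'payments' is a hypothetical source list
        await channel.Writer.WriteAsync(batch);
    channel.Writer.Complete();
});

// consumer: drain batches as they arrive
await foreach (var batch in channel.Reader.ReadAllAsync())
    Process(batch); // 'Process' is a hypothetical handler

await producer; // surface any producer exceptions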

Test

var list = new List<Payment>() {new(1), new(2), new(3), new(4), new(4), new(5), new(6), new(4), new(4), new(6), new(4)};

var batches = GetBatches(list, 3);

foreach (var batch in batches)
    Console.WriteLine(string.Join(", ", batch.Select(x => x.AccountId)));

Output

1, 2, 3
4, 5, 6
4, 6
4
4
4

Full demo here to play with.

How to break big foreach loop into batch and process it simultaneously?

This is a great case for a Parallel.ForEach loop, which will automatically distribute your loop processing across multiple threads. It's very easy to rearrange your code into such a loop and use the built-in Parallel library to enable parallel processing. This assumes, of course, that sequence doesn't really matter (and it doesn't seem to, based on what little we can see).

EDIT:
If you do need specific batches of 100 or 200 as noted in your comment, you can use the System.Collections.Concurrent.Partitioner class to break a parallel loop up as desired; this SO post does a good job of describing how to use it.

// note: Parallel.ForEach doesn't support async lambdas, so the async calls
// are blocked on here; 'devices' is assumed to be a collection of device id strings
Parallel.ForEach(devices, deviceId =>
{
    // register device into IoT hub
    RegistryManager registryManager = RegistryManager.CreateFromConnectionString("connectionString");
    Device device = registryManager.AddDeviceAsync(new Device(deviceId)).GetAwaiter().GetResult();

    // send message to IoT hub
    DeviceClient deviceClient = DeviceClient.CreateFromConnectionString("connectionString");
    deviceClient.SendEventAsync(new Message(Encoding.UTF8.GetBytes("data"))).GetAwaiter().GetResult();
});
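
For completeness, here's a minimal sketch of the Partitioner approach mentioned above (from System.Collections.Concurrent): Partitioner.Create carves an index range into chunks of a fixed size, and each chunk is handled by one loop body invocation. RegisterAndSend and the materialized deviceList are hypothetical stand-ins for the per-device work:

var deviceList = devices.ToList();

Parallel.ForEach(Partitioner.Create(0, deviceList.Count, 100), range =>
{
    // each 'range' is a (fromInclusive, toExclusive) slice of at most 100 indices
    for (int i = range.Item1; i < range.Item2; i++)
    {
        RegisterAndSend(deviceList[i]); // hypothetical per-device work
    }
});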

LINQ: Select 5 items per iteration


for (int i = 0; i < 20; i++)
{
    // each pass takes the next 5 items; 20 iterations assumes roughly 100 items in theList
    var fiveItems = theList.Skip(i * 5).Take(5);
}
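
On .NET 6 or later, the Chunk method shown earlier removes the index arithmetic and doesn't require knowing the iteration count up front:

foreach (var fiveItems in theList.Chunk(5))
{
    // process fiveItems; the final chunk may contain fewer than 5 items
}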

Is there a way to organise an IEnumerable into batches in column-major format using LINQ?


int[] arr = new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
int i = 0;
var result = arr.GroupBy(x => i++ % 3).Select(g => g.ToList()).ToList();
// result: { { 1, 4, 7, 10 }, { 2, 5, 8, 11 }, { 3, 6, 9, 12 } }
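
One caveat: the i++ side effect means the query must only be enumerated once (the final ToList here takes care of that). A side-effect-free variant uses the Select overload that supplies the element index:

var result2 = arr.Select((x, index) => (x, index))
                 .GroupBy(t => t.index % 3, t => t.x)
                 .Select(g => g.ToList())
                 .ToList();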

Select equal chunks from IEnumerable Range for Parallel For loop


I would like to divide this list of numbers up into chunks based on the processor count

There are many possible implementations for a LINQ Batch method.

How do you select out the inner ranges in less code, without having to know how many tuples I will need?

Here's one way to handle that:

var batchRanges = from batch in allRateSteps.Batch(anyNumberGoesHere)
                  let first = batch.First()
                  let last = batch.Last()
                  select Tuple.Create(first, last);

(0.0725, 0.0795275590551181)
(0.0805314960629921, 0.0875590551181102)
(0.0885629921259842, 0.0955905511811024)
...

how do I evenly distribute the work across all processors based on the ranges I've defined in tupleList

This part of your example doesn't reference tupleList so it's hard to see the desired behavior.

Thread1 needs to handle the ranges in the first tuple, thread2 handles the ranges defined in the second tuple, etc...

Unless you have some hard requirement that certain threads process certain batches, I would strongly suggest generating your work as a single "stream" and using a higher-level abstraction for parallelism e.g. PLINQ.

If you just want to do work in batches, you can still do that but not care about which thread(s) the work is being done on:

static void Work(IEnumerable<int> ints) {
    var sum = ints.Sum();
    Thread.Sleep(sum);
    Console.WriteLine(sum);
}

public static void Main(string[] args) {
    var inputs = from i in Enumerable.Range(0, 100)
                 select i + i;
    var batches = inputs.Batch(8);
    var tasks = from batch in batches
                select Task.Run(() => Work(batch));
    Task.WaitAll(tasks.ToArray());
}

The default TaskScheduler is coordinating the work for you behind the scenes, and it'll likely outperform hand-rolling your own threading scheme.

Also consider something like this:

static int Work(IEnumerable<int> ints) {
    Console.WriteLine("Work on thread " + Thread.CurrentThread.ManagedThreadId);
    var sum = ints.Sum();
    Thread.Sleep(sum);
    return sum;
}

public static void Main(string[] args) {
    var inputs = from i in Enumerable.Range(0, 100)
                 select i + i;
    var batches = inputs.Batch(8);
    var tasks = from batch in batches
                select Work(batch);
    foreach (var task in tasks.AsParallel()) {
        Console.WriteLine(task);
    }
}

/*
Work on thread 6
Work on thread 4
56
Work on thread 4
184
Work on thread 4
Work on thread 4
312
440
...
*/
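
As the interleaved output shows, results arrive in completion order. If source order matters, PLINQ's AsOrdered preserves it with a small tweak to the loop above:

foreach (var task in tasks.AsParallel().AsOrdered()) {
    Console.WriteLine(task);
}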

How to safely iterate over an IAsyncEnumerable to send a collection downstream for message processing in batches


I'm also hesitant to use a List variable and just do a length-check on that as sharing that data across threads doesn't seem very thread-safe.

You need to think in terms of execution flows, not threads, when dealing with async. Since you are await-ing the processing step, there isn't actually a concurrency problem accessing the list: regardless of which threads are used, the list is only accessed by one flow at a time.

If you are still concerned, you could new a list per batch, but that is probably overkill. What you do need, however, is two additions - a reset between batches, and a final processing step:

var listWithPreConfiguredNumberOfElements = new List<YourType>(preConfiguredNumber);
await foreach (var data in ProcessBlob(downloadedFile)) // CAF?
{
    listWithPreConfiguredNumberOfElements.Add(data);
    if (listWithPreConfiguredNumberOfElements.Count == preConfiguredNumber)
    {
        await _messageHandler.Handle(listWithPreConfiguredNumberOfElements); // CAF?
        listWithPreConfiguredNumberOfElements.Clear(); // reset for a new batch
        // (replace this with a "new" if you're still concerned about concurrency)
    }
}
if (listWithPreConfiguredNumberOfElements.Any())
{   // process any stragglers
    await _messageHandler.Handle(listWithPreConfiguredNumberOfElements); // CAF?
}

You might also choose to use ConfigureAwait(false) in the three spots marked // CAF?
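
If this pattern recurs, it can be lifted into a small extension over IAsyncEnumerable<T>. A minimal sketch (the name Buffer and the helper itself are my own, not from the original answer):

public static async IAsyncEnumerable<List<T>> Buffer<T>(
    this IAsyncEnumerable<T> source, int size)
{
    var batch = new List<T>(size);
    await foreach (var item in source)
    {
        batch.Add(item);
        if (batch.Count == size)
        {
            yield return batch;
            batch = new List<T>(size); // fresh list per batch sidesteps any aliasing concerns
        }
    }
    if (batch.Count > 0)
        yield return batch; // stragglers
}

which reduces the original loop to:

await foreach (var batch in ProcessBlob(downloadedFile).Buffer(preConfiguredNumber))
{
    await _messageHandler.Handle(batch);
}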


