C# Producer/Consumer

TPL Producer Consumer in a FIFO order C#

OK after the edit - instead of adding the results in the BlockingCollection, add the Tasks in the blocking collection. This has the feature where the items are processed in order AND there is a maximum parallelism which will prevent too many threads from kicking off and you eating up all your memory.

https://dotnetfiddle.net/lUbSqB

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;

public class Program
{
private static BlockingCollection<Task<int>> BlockingCollection {get;set;}

public static void Producer(int numTasks)
{
Random r = new Random(7);
for(int i = 0 ; i < numTasks ; i++)
{
int closured = i;
Task<int> task = new Task<int>(()=>
{
Thread.Sleep(r.Next(100));
Console.WriteLine("Produced: " + closured);
return closured;
});
BlockingCollection.Add(task);
task.Start();
}
BlockingCollection.CompleteAdding();
}


public static void Main()
{
int numTasks = 20;
int maxParallelism = 3;

BlockingCollection = new BlockingCollection<Task<int>>(maxParallelism);

Task.Factory.StartNew(()=> Producer(numTasks));

foreach(var task in BlockingCollection.GetConsumingEnumerable())
{
task.Wait();
Console.WriteLine(" Consumed: "+ task.Result);
task.Dispose();
}

}
}

And the results:

Produced: 0
Consumed: 0
Produced: 1
Consumed: 1
Produced: 3
Produced: 2
Consumed: 2
Consumed: 3
Produced: 4
Consumed: 4
Produced: 6
Produced: 5
Consumed: 5
Consumed: 6
Produced: 7
Consumed: 7
Produced: 8
Consumed: 8
Produced: 10
Produced: 9
Consumed: 9
Consumed: 10
Produced: 12
Produced: 13
Produced: 11
Consumed: 11
Consumed: 12
Consumed: 13
Produced: 15
Produced: 14
Consumed: 14
Consumed: 15
Produced: 17
Produced: 16
Produced: 18
Consumed: 16
Consumed: 17
Consumed: 18
Produced: 19
Consumed: 19

Producer producing data every one second and Consumer consuming after every minute

In this example two tasks are created: producer and consumer. In the first, data is generated every second and placed in the collection. In the second, the data is extracted from the collection and processed every minute.

var produced = new BlockingCollection<Price>();

var producer = Task.Run(async () =>
{
try
{
var random = new Random();
while (true)
{
produced.Add(new Price { Low = random.Next(500), High = random.Next(500, 1000) });
await Task.Delay(1000);
}
}
finally
{
produced.CompleteAdding();
}
});

var consumer = Task.Run(async () =>
{
const int interval = 60; // seconds
var values = new List<Price>();

foreach (var value in produced.GetConsumingEnumerable())
{
values.Add(value);

if (DateTime.UtcNow.Second % interval == 0)
{
Console.WriteLine(values.Average(p => p.High)); // do some work
values.Clear();
}
}
});

Task.WaitAll(producer, consumer);

C# Producer/Consumer setup, Consumer never works if there's a UI?


In the example a console app is used. They use .wait to make sure the console app does not quit but in your case you should probably await it because .wait is blocking. But please share your actual code. As you might imagine it is kind of hard for us to tell what is going in if you don't post your code

In case anyone else was stuck like I was, changing the "consumer.Wait()" to "await consumer" as @PeterBons suggested was the answer. In my case it still acts a bit funky, but the full functionality does work, just a bit more behind the scenes than I expected.

Infinite producer/consumer via serial port data

First things first, starting multiple asynchronous operations and awaiting them one by one is wrong:

// Wrong
await producer;
await consumer;

The reason is that if the first operation fails, the second operation will become fire-and-forget. And allowing tasks to escape your supervision and continue running unattended, can only contribute to your program's instability. Nothing good can come out from that.

// Correct
await Task.WhenAll(producer, consumer)

Now regarding your main issue, which is how to make sure that a failure in one task will cause the timely completion of the other task. My suggestion is to hook the failure of each task with the cancellation of a CancellationTokenSource. In addition, both tasks should watch the associated CancellationToken, and complete cooperatively as soon as possible after they receive a cancellation signal.

var cts = new CancellationTokenSource();
Task producer = StartProducerAsync(cts.Token).OnErrorCancel(cts);
Task consumer = StartConsumerAsync(cts.Token).OnErrorCancel(cts);
await Task.WhenAll(producer, consumer)

Here is the OnErrorCancel extension method:

public static Task OnErrorCancel(this Task task, CancellationTokenSource cts)
{
return task.ContinueWith(t =>
{
if (t.IsFaulted) cts.Cancel();
return t;
}, default, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap();
}

Instead of doing this, you can also just add an all-enclosing try/catch block inside each task, and call cts.Cancel() in the catch.



Related Topics



Leave a reply



Submit