Implementing the Producer/Consumer Pattern in C#

How to implement a continuous producer-consumer pattern inside a Windows Service

Though @JSteward's answer is a good start, you can improve it by combining TPL Dataflow with the Rx.NET extensions: a dataflow block can easily become an observer for your data, and an Rx timer takes much less effort (Rx.Timer explanation).

We can adapt the MSDN article to your needs, like this:

private const int EventIntervalInSeconds = 5;
private const int DueIntervalInSeconds = 60;

var source =
    // sequence of Int64 numbers, starting from 0
    // https://msdn.microsoft.com/en-us/library/hh229435.aspx
    Observable.Timer(
            // fire the first event after waiting 1 minute
            TimeSpan.FromSeconds(DueIntervalInSeconds),
            // fire every subsequent event at 5-second intervals
            TimeSpan.FromSeconds(EventIntervalInSeconds))
        // each number will have a timestamp
        .Timestamp()
        // on each event, select some items to process
        .SelectMany(GetItemsFromDB)
        // filter out items that are already in the cache
        .Where(i => !_processedItemIds.ContainsKey(i.Id));

var action = new ActionBlock<Item>(ProcessItem, new ExecutionDataflowBlockOptions
{
    // process up to as many items in parallel as there are processors
    MaxDegreeOfParallelism = Environment.ProcessorCount,
});

IDisposable subscription = source.Subscribe(action.AsObserver());

Also, your check for whether an item has already been processed isn't quite accurate: an item can be selected from the database as unprocessed right when you've finished processing it but haven't yet updated it in the database. In that case the item is removed from the Queue<T> and then added again by the producer. This is why I've added a thread-safe cache of item ids to this solution (HashSet<T> isn't thread-safe, and ConcurrentBag<T> has no Remove method, so a ConcurrentDictionary<Guid, byte> serves as a concurrent set here):

private static async Task ProcessItem(Item item)
{
    // TryAdd is atomic: it returns false if the id is already in the cache
    if (!_processedItemIds.TryAdd(item.Id, 0))
    {
        return;
    }

    // actual work here

    // save item as processed in database

    // wait long enough that the item cannot appear in the queue again
    await Task.Delay(TimeSpan.FromSeconds(EventIntervalInSeconds * 2));

    // remove the id from the cache to reduce memory usage
    _processedItemIds.TryRemove(item.Id, out _);
}

public class Item
{
    public Guid Id { get; set; }
}

// temporary cache for items in process (the byte value is unused)
private static readonly ConcurrentDictionary<Guid, byte> _processedItemIds =
    new ConcurrentDictionary<Guid, byte>();

private static IEnumerable<Item> GetItemsFromDB(Timestamped<long> time)
{
    // log event timing
    Console.WriteLine($"Event # {time.Value} at {time.Timestamp}");

    // return items from DB
    return new[] { new Item { Id = Guid.NewGuid() } };
}

You can implement the cache clean-up in another way, for example by starting a "GC" timer that removes processed items from the cache on a regular basis.
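
The timer-based clean-up mentioned above could look roughly like the sketch below. It is only an illustration under stated assumptions: the class name ProcessedItemCache, its members, and the retention and interval parameters are hypothetical and not part of the original answer. Each id is stored with the time it was added, and a System.Threading.Timer periodically removes entries older than the retention period.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper: a thread-safe id cache that is cleaned up by a timer.
public sealed class ProcessedItemCache : IDisposable
{
    // id -> UTC time at which the id was added
    private readonly ConcurrentDictionary<Guid, DateTime> _processedAt =
        new ConcurrentDictionary<Guid, DateTime>();
    private readonly TimeSpan _retention;
    private readonly Timer _cleanupTimer;

    public ProcessedItemCache(TimeSpan retention, TimeSpan cleanupInterval)
    {
        _retention = retention;
        // the "GC" timer: runs RemoveExpired on a thread-pool thread
        _cleanupTimer = new Timer(
            _ => RemoveExpired(DateTime.UtcNow), null, cleanupInterval, cleanupInterval);
    }

    // Returns false if the id is already cached (the check and the add are one atomic step).
    public bool TryMarkProcessed(Guid id) => _processedAt.TryAdd(id, DateTime.UtcNow);

    public bool Contains(Guid id) => _processedAt.ContainsKey(id);

    // Removes entries older than the retention period and returns how many were removed.
    // Enumerating a ConcurrentDictionary while removing from it is safe.
    public int RemoveExpired(DateTime utcNow)
    {
        int removed = 0;
        foreach (var entry in _processedAt)
        {
            if (utcNow - entry.Value >= _retention &&
                _processedAt.TryRemove(entry.Key, out _))
            {
                removed++;
            }
        }
        return removed;
    }

    public void Dispose() => _cleanupTimer.Dispose();
}
```

With a cache like this, ProcessItem would no longer need the Task.Delay call, so a processing slot is not held while waiting; the retention period should still be longer than the polling interval.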

To stop the events and the item processing, Dispose the subscription and, if needed, Complete the ActionBlock:

subscription.Dispose();
action.Complete();
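
Complete() only tells the block to stop accepting new items; it does not wait for items that are already queued or running. If the service has to wait for them before shutting down, a sketch along the following lines may help. The helper name PipelineShutdown.StopAsync is hypothetical, and on .NET Framework the System.Threading.Tasks.Dataflow NuGet package is assumed to be referenced.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper for an orderly shutdown of the timer + ActionBlock pipeline.
public static class PipelineShutdown
{
    public static async Task StopAsync<T>(IDisposable subscription, ActionBlock<T> action)
    {
        // stop the timer events, so no new items are produced
        subscription.Dispose();

        // tell the block not to accept any more items
        action.Complete();

        // wait until all queued and in-flight items have been processed
        await action.Completion;
    }
}
```

In a Windows Service, this could be called from OnStop, for example as PipelineShutdown.StopAsync(subscription, action).GetAwaiter().GetResult().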

You can find more information about Rx.NET in its guidelines on GitHub.

Producer/Consumer pattern using threads and EventWaitHandle

Personally, for simple producer-consumer problems, I would just use BlockingCollection. There would be no need to manually code your own synchronization logic. The consuming threads will also block if there are no items present in the queue.

Here is what your code might look like if you use this class:

private BlockingCollection<DataRecievedEnqeueInfo> mReceivingThreadQueue = new BlockingCollection<DataRecievedEnqeueInfo>();
private BlockingCollection<DataSendEnqeueInfo> mSendingThreadQueue = new BlockingCollection<DataSendEnqeueInfo>();

public void Stop()
{
    // No need for mIsRunning. CompleteAdding() makes the enumerables returned by
    // the GetConsumingEnumerable() calls below complete once the queues are empty.
    mReceivingThreadQueue.CompleteAdding();
    mSendingThreadQueue.CompleteAdding();
}

private void ReceivingThread()
{
    // Blocks while the queue is empty; the loop ends after CompleteAdding().
    foreach (DataRecievedEnqeueInfo item in mReceivingThreadQueue.GetConsumingEnumerable())
    {
        ProcessReceivingItem(item);
    }
}

private void SendingThread()
{
    foreach (DataSendEnqeueInfo item in mSendingThreadQueue.GetConsumingEnumerable())
    {
        ProcessSendingItem(item);
    }
}

internal void EnqueueRecevingData(DataRecievedEnqeueInfo info)
{
    // Add() throws an InvalidOperationException after CompleteAdding() has been
    // called, and so does TryAdd(). If items can still arrive after you have
    // stopped, check IsAddingCompleted first or catch the exception.
    mReceivingThreadQueue.Add(info);
}

public void EnqueueSend(DataSendEnqeueInfo info)
{
    mSendingThreadQueue.Add(info);
}
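
The snippet above does not show how the threads are started or how shutdown is awaited, so here is a small self-contained sketch of the whole life cycle. The names BlockingCollectionDemo and Run, the int payload, and the bounded capacity of 2 are assumptions made for illustration only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical demo: one producer, one consumer thread, orderly shutdown.
public static class BlockingCollectionDemo
{
    public static List<int> Run(int itemCount)
    {
        // With a bounded capacity, Add() blocks while the queue already holds 2 items.
        var queue = new BlockingCollection<int>(boundedCapacity: 2);
        var received = new List<int>();

        var consumer = new Thread(() =>
        {
            // Blocks while the queue is empty; ends once CompleteAdding()
            // has been called and the queue has been drained.
            foreach (int item in queue.GetConsumingEnumerable())
            {
                received.Add(item);
            }
        });
        consumer.Start();

        // The calling thread acts as the producer.
        for (int i = 0; i < itemCount; i++)
        {
            queue.Add(i);
        }

        queue.CompleteAdding(); // signal that no more items will arrive
        consumer.Join();        // wait until the consumer has drained the queue
        queue.Dispose();
        return received;
    }
}
```

Because the default underlying collection is a ConcurrentQueue<T> and there is a single producer and a single consumer here, the items come out in the order they were added.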

C# Implementing Producer/Consumer Queue In a Web Service

You can use a System.Collections.Concurrent.BlockingCollection<T> with a System.Collections.Concurrent.ConcurrentQueue<T> as the underlying collection.

As the name of the namespace implies, the collections are thread safe.

Start a consumer thread (or a few) to pull items from the collection, using the Take() method. When no items are available, the thread will block.

Your DoAsyncStuffHere method adds items to the BlockingCollection. These items could be unstarted System.Threading.Tasks.Task objects; the consumer thread(s) would in that case Start the tasks after taking them from the collection.
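
A minimal sketch of this arrangement might look like the following. The class name TaskQueue and its members are hypothetical, and the choice to wait for each task before taking the next one is an assumption made so that Dispose() returns only after all queued work has finished.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper: a queue of unstarted tasks served by one consumer thread.
public sealed class TaskQueue : IDisposable
{
    private readonly BlockingCollection<Task> _queue =
        new BlockingCollection<Task>(new ConcurrentQueue<Task>());
    private readonly Thread _consumer;

    public TaskQueue()
    {
        _consumer = new Thread(Consume) { IsBackground = true };
        _consumer.Start();
    }

    // Called by the producer (for example, the DoAsyncStuffHere method).
    public void Enqueue(Task unstartedTask) => _queue.Add(unstartedTask);

    private void Consume()
    {
        while (true)
        {
            Task task;
            try
            {
                task = _queue.Take(); // blocks while the queue is empty
            }
            catch (InvalidOperationException)
            {
                return; // CompleteAdding() was called and the queue is drained
            }

            task.Start();
            try
            {
                task.Wait(); // assumption: run one task at a time
            }
            catch (AggregateException)
            {
                // The task faulted; the producer can inspect it through its own reference.
            }
        }
    }

    public void Dispose()
    {
        _queue.CompleteAdding();
        _consumer.Join();
        _queue.Dispose();
    }
}
```

A producer would call something like queue.Enqueue(new Task(() => DoWork())), where DoWork stands in for the real work. Starting more consumer threads, as the answer suggests, only requires running Consume on several threads.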
