How to implement a continuous producer-consumer pattern inside a Windows Service
Though @JSteward's answer is a good start, you can improve it by combining TPL Dataflow with the Rx.NET extensions: a dataflow block can easily become an observer for your data, and the Rx Timer will save you a lot of effort (Rx.Timer explanation).
We can adjust the MSDN article to your needs, like this:
private const int EventIntervalInSeconds = 5;
private const int DueIntervalInSeconds = 60;

var source =
    // sequence of Int64 numbers, starting from 0
    // https://msdn.microsoft.com/en-us/library/hh229435.aspx
    Observable.Timer(
        // fire first event after 1 minute waiting
        TimeSpan.FromSeconds(DueIntervalInSeconds),
        // fire all next events each 5 seconds
        TimeSpan.FromSeconds(EventIntervalInSeconds))
    // each number will have a timestamp
    .Timestamp()
    // each time we select some items to process
    .SelectMany(GetItemsFromDB)
    // filter already added
    .Where(i => !_processedItemIds.ContainsKey(i.Id));

var action = new ActionBlock<Item>(ProcessItem, new ExecutionDataflowBlockOptions
{
    // we can process as many items in parallel as there are processors
    MaxDegreeOfParallelism = Environment.ProcessorCount,
});

IDisposable subscription = source.Subscribe(action.AsObserver());

Also, your check for whether an item has already been processed isn't quite accurate: an item can be selected from the database as unprocessed just after you've finished processing it but before you've updated it in the database. In that case the item is removed from the Queue<T> and then added again by the producer. This is why I've added a ConcurrentDictionary<TKey, TValue> of in-flight item ids to this solution (HashSet<T> isn't thread-safe, and ConcurrentBag<T> cannot remove a specific item):

private static async Task ProcessItem(Item item)
{
    // TryAdd is atomic: only the first caller for a given id gets through
    if (!_processedItemIds.TryAdd(item.Id, 0))
    {
        return;
    }

    // actual work here
    // save item as processed in database

    // we need to wait to ensure the item doesn't appear in the queue again
    await Task.Delay(TimeSpan.FromSeconds(EventIntervalInSeconds * 2));

    // clear the processed cache to reduce memory usage
    _processedItemIds.TryRemove(item.Id, out _);
}

public class Item
{
    public Guid Id { get; set; }
}

// temporary cache for items in process; the byte value is unused
private static readonly ConcurrentDictionary<Guid, byte> _processedItemIds = new ConcurrentDictionary<Guid, byte>();

private static IEnumerable<Item> GetItemsFromDB(Timestamped<long> time)
{
    // log event timing
    Console.WriteLine($"Event # {time.Value} at {time.Timestamp}");

    // return items from DB
    return new[] { new Item { Id = Guid.NewGuid() } };
}
You can implement the cache cleanup in other ways, for example by starting a "GC" timer that removes processed items from the cache on a regular basis.
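One possible shape for such a timer-based cleanup is sketched below. It assumes the cache records when each id was added, so that a periodic timer can drop old entries. `ProcessedItemCache`, `TryClaim`, `EvictExpired` and the two constructor parameters are hypothetical names chosen for illustration; they are not part of the original answer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper: remembers item ids for a limited time.
public sealed class ProcessedItemCache : IDisposable
{
    private readonly ConcurrentDictionary<Guid, DateTime> _entries =
        new ConcurrentDictionary<Guid, DateTime>();
    private readonly TimeSpan _timeToLive;
    private readonly Timer _cleanupTimer;

    public ProcessedItemCache(TimeSpan timeToLive, TimeSpan cleanupInterval)
    {
        _timeToLive = timeToLive;
        // The "GC" timer: periodically drops entries older than timeToLive.
        _cleanupTimer = new Timer(_ => EvictExpired(DateTime.UtcNow),
                                  null, cleanupInterval, cleanupInterval);
    }

    // Returns true only for the first caller that claims the id.
    public bool TryClaim(Guid id) => _entries.TryAdd(id, DateTime.UtcNow);

    public bool Contains(Guid id) => _entries.ContainsKey(id);

    // Public so the eviction logic can be invoked without waiting for the timer.
    public void EvictExpired(DateTime now)
    {
        foreach (var entry in _entries)
        {
            if (now - entry.Value >= _timeToLive)
            {
                _entries.TryRemove(entry.Key, out _);
            }
        }
    }

    public void Dispose() => _cleanupTimer.Dispose();
}
```

With a cache like this, ProcessItem could claim the id with TryClaim and return as soon as the work is saved, instead of occupying a worker slot in Task.Delay until the entry may be removed.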
To stop the events and the item processing, you should Dispose the subscription and, maybe, Complete the ActionBlock:
subscription.Dispose();
action.Complete();
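Complete only tells the block to accept no further input; items that are already queued still run. If the service should wait for them before exiting, something along these lines may help. This is a sketch under assumptions: `ShutdownSketch`, `StopAsync` and the `timeout` parameter are illustrative names, and `subscription` stands for any IDisposable such as the Rx subscription above. On .NET Framework the block types come from the System.Threading.Tasks.Dataflow NuGet package.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ShutdownSketch
{
    // Returns true if the block finished its queued work within the timeout.
    public static async Task<bool> StopAsync(
        IDisposable subscription, IDataflowBlock block, TimeSpan timeout)
    {
        // 1. Stop the producer so that no new items are posted.
        subscription.Dispose();

        // 2. Refuse further input; items already queued are still processed.
        block.Complete();

        // 3. Wait for queued and in-flight items, but not forever.
        Task finished = await Task.WhenAny(block.Completion, Task.Delay(timeout));
        return finished == block.Completion;
    }
}
```

A service's stop handler could call `ShutdownSketch.StopAsync(subscription, action, TimeSpan.FromSeconds(30))` and log a warning when it returns false.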
You can find more information about Rx.NET in their guidelines on GitHub.
Producer/Consumer pattern using threads and EventWaitHandle
Personally, for simple producer-consumer problems, I would just use BlockingCollection. There would be no need to manually code your own synchronization logic. The consuming threads will also block if there are no items present in the queue.
Here is what your code might look like if you use this class:
private BlockingCollection<DataRecievedEnqeueInfo> mReceivingThreadQueue = new BlockingCollection<DataRecievedEnqeueInfo>();
private BlockingCollection<DataSendEnqeueInfo> mSendingThreadQueue = new BlockingCollection<DataSendEnqeueInfo>();

public void Stop()
{
    // No need for mIsRunning. CompleteAdding() makes the enumerables returned by the
    // GetConsumingEnumerable() calls below complete once the queues are drained.
    mReceivingThreadQueue.CompleteAdding();
    mSendingThreadQueue.CompleteAdding();
}

private void ReceivingThread()
{
    foreach (DataRecievedEnqeueInfo item in mReceivingThreadQueue.GetConsumingEnumerable())
    {
        ProcessReceivingItem(item);
    }
}

private void SendingThread()
{
    foreach (DataSendEnqeueInfo item in mSendingThreadQueue.GetConsumingEnumerable())
    {
        ProcessSendingItem(item);
    }
}

internal void EnqueueRecevingData(DataRecievedEnqeueInfo info)
{
    // Add() throws an InvalidOperationException after CompleteAdding() has been
    // called, and so does TryAdd(). If items can still arrive after you have
    // stopped, catch that exception (or check IsAddingCompleted first).
    mReceivingThreadQueue.Add(info);
}

public void EnqueueSend(DataSendEnqeueInfo info)
{
    mSendingThreadQueue.Add(info);
}
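To see the blocking and shutdown behavior in isolation, here is a small self-contained sketch. `BlockingQueueDemo` and its two methods are hypothetical names for illustration, and strings stand in for the message types used above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class BlockingQueueDemo
{
    // Feeds the items through a BlockingCollection and returns what the consumer saw.
    public static List<string> Run(IEnumerable<string> items)
    {
        var queue = new BlockingCollection<string>();
        var consumed = new List<string>();

        var consumer = new Thread(() =>
        {
            // Blocks while the queue is empty; the loop ends after CompleteAdding()
            // once every queued item has been taken.
            foreach (string item in queue.GetConsumingEnumerable())
            {
                consumed.Add(item);
            }
        });
        consumer.Start();

        foreach (string item in items)
        {
            queue.Add(item);
        }

        queue.CompleteAdding(); // plays the role of Stop() above
        consumer.Join();        // wait until the consumer has drained the queue
        return consumed;
    }

    // Shows that adding after CompleteAdding() is rejected with an exception.
    public static bool AddAfterCompleteThrows()
    {
        var queue = new BlockingCollection<int>();
        queue.CompleteAdding();
        try
        {
            queue.Add(1);
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```

Because the default underlying collection is a ConcurrentQueue<T> and there is a single consumer, the items come out in the order they were added.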
C# Implementing Producer/Consumer Queue In a Web Service
You can use a System.Collections.Concurrent.BlockingCollection<T> with a System.Collections.Concurrent.ConcurrentQueue<T> as the underlying collection.
As the name of the namespace implies, the collections are thread-safe.
Start a consumer thread (or a few) to pull items from the collection using the Take() method. When no items are available, the thread will block.
Your DoAsyncStuffHere method adds items to the BlockingCollection. These items could be unstarted System.Threading.Tasks.Task objects; in that case the consumer thread(s) would Start the tasks after taking them from the collection.
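The approach described above might be sketched as follows. `TaskQueueSketch` and `RunAll` are hypothetical names, the work items are plain `Func<int>` delegates rather than real web-service calls, and a single consumer thread is assumed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class TaskQueueSketch
{
    // Queues unstarted tasks, lets a consumer thread start them, returns the results.
    public static int[] RunAll(params Func<int>[] work)
    {
        var queue = new BlockingCollection<Task<int>>(new ConcurrentQueue<Task<int>>());
        var tasks = new Task<int>[work.Length];

        var consumer = new Thread(() =>
        {
            while (!queue.IsCompleted)
            {
                try
                {
                    // Take() blocks until an item is available.
                    Task<int> task = queue.Take();
                    task.Start();
                }
                catch (InvalidOperationException)
                {
                    // The collection was marked complete and is empty; the loop exits.
                }
            }
        });
        consumer.Start();

        for (int i = 0; i < work.Length; i++)
        {
            tasks[i] = new Task<int>(work[i]); // created but not started
            queue.Add(tasks[i]);
        }

        queue.CompleteAdding();
        consumer.Join();

        // Reading Result waits for each started task to finish.
        return Array.ConvertAll(tasks, t => t.Result);
    }
}
```

In a long-running service the consumer thread would stay alive and CompleteAdding() would only be called on shutdown; here it is called right away so the sketch terminates.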