Creating a Blocking Queue<T> in .NET

Creating a blocking Queue<T> in .NET?

That looks very unsafe (very little synchronization); how about something like:

class SizeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int maxSize;
    public SizeQueue(int maxSize) { this.maxSize = maxSize; }

    public void Enqueue(T item)
    {
        lock (queue)
        {
            while (queue.Count >= maxSize)
            {
                Monitor.Wait(queue);
            }
            queue.Enqueue(item);
            if (queue.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.PulseAll(queue);
            }
        }
    }
    public T Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            T item = queue.Dequeue();
            if (queue.Count == maxSize - 1)
            {
                // wake up any blocked enqueue
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }
}

(edit)

In reality, you'd want a way to close the queue so that readers can exit cleanly - perhaps something like a bool flag: if it's set, an empty queue just returns false rather than blocking:

bool closing;
public void Close()
{
    lock (queue)
    {
        closing = true;
        Monitor.PulseAll(queue);
    }
}
public bool TryDequeue(out T value)
{
    lock (queue)
    {
        while (queue.Count == 0)
        {
            if (closing)
            {
                value = default(T);
                return false;
            }
            Monitor.Wait(queue);
        }
        value = queue.Dequeue();
        if (queue.Count == maxSize - 1)
        {
            // wake up any blocked enqueue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}

Thread-safe blocking queue implementation on .NET

How about this one: Creating a blocking Queue<T> in .NET?

If you need it for .NET 1.1 (I wasn't sure from the question), just drop the generics and replace T with object.

Blocking queue with task

You should use TPL Dataflow, which is a framework that does all of this for you. You create an ActionBlock, give it a delegate, and set its MaxDegreeOfParallelism.

It should look similar to this:

var block = new ActionBlock<string>(folderName =>
{
    UploadFolder(folderName);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

foreach (var folderName in GetFolderNames())
{
    block.Post(folderName);
}

block.Complete();
await block.Completion;

Wait for blocking collection (queue) to decrease in size in C#

Instead of using a Queue<T>, you can use a BlockingCollection<T> for Q2. If you set its BoundedCapacity, calls to Q2.Add() will block when the capacity is reached. This will automatically throttle the processing of Q1, as the N tasks will begin blocking if they can't add to the final queue.
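A rough, self-contained sketch of that behavior (the name q2, the int payload, and the capacity of 2 are just for illustration, not from the original question):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class BoundedDemo
{
    public static void Main()
    {
        // Bounded to 2 items: once full, the next Add() blocks
        // until the consumer removes something.
        var q2 = new BlockingCollection<int>(boundedCapacity: 2);

        var consumer = Task.Run(() =>
        {
            // Blocks between items; ends when CompleteAdding is called
            // and the collection is drained.
            foreach (var item in q2.GetConsumingEnumerable())
                Console.WriteLine($"Consumed {item}");
        });

        for (int i = 0; i < 5; i++)
            q2.Add(i); // blocks whenever the collection is at capacity

        q2.CompleteAdding(); // lets GetConsumingEnumerable finish
        consumer.Wait();
    }
}
```

The producer is throttled purely by the bounded capacity - no explicit signaling code is needed.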

Is this BlockingQueue susceptible to deadlock?

I think this could be your problem:

Thread 1 (Dequeue)                   Thread 2 (Enqueue)
------------------                   ------------------
if (Size == 0)
  // Thread 1 gets the lock          lock (_queue)
                                       // Thread 2 has to wait
  return _queue.Count
  // Thread 1 sees: Size == 0
                                     _queue.Enqueue
                                       // Thread 2 gets the lock
                                     _event.Set
_event.Reset   // uh oh
_event.WaitOne // now Dequeue's going to block
               // until Enqueue gets called again
               // (even though queue isn't empty)

C# FIFO Queue Like BlockingCollection without waiting

If I understand your requirements correctly, you can use a regular Queue<T> with simple Monitor-based signaling like this:

Members:

private readonly int maxSize;
private readonly Queue<logEntry> logQueue;
private bool stopRequest;

Constructor:

maxSize = GlobalSettings.LogQueueSize;
logQueue = new Queue<logEntry>(maxSize);

Producer method:

public void Add(logEntry logEntry)
{
    lock (logQueue)
    {
        if (stopRequest) return;
        logQueue.Enqueue(logEntry);
        if (logQueue.Count == 1)
            Monitor.Pulse(logQueue);
    }
}

Method to stop the process worker:

public void Stop()
{
    lock (logQueue)
    {
        if (stopRequest) return;
        stopRequest = true;
        Monitor.Pulse(logQueue);
    }
}

Process worker (the method called with Task.Run):

private void ProcessWorker()
{
    while (true)
    {
        logEntry LE;
        lock (logQueue)
        {
            while (!stopRequest && logQueue.Count == 0)
                Monitor.Wait(logQueue);
            if (stopRequest) break;
            if (logQueue.Count > maxSize)
            {
                logQueue.Clear();
                continue;
            }
            LE = logQueue.Dequeue();
        }
        try
        {
            ProcessLogEntry(LE);
        }
        catch (Exception E)
        {
            functions.Logger.log("Error processing logEntry" + E.Message, "LOGPROCESSING", LOGLEVEL.ERROR);
            functions.printException(E);
        }
    }
    functions.Logger.log("Exiting Queue Task", "LOGPROCESSING", LOGLEVEL.ERROR);
}

This is just to convey the idea; you can tune the implementation further to suit your needs.
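To sketch how these pieces wire together end to end, here is a compact, self-contained host for the same Add/Stop/worker pattern. The LogPump class name, the string payload (standing in for the original's logEntry type), and the Console output are all assumptions for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

class LogPump
{
    private readonly Queue<string> logQueue = new Queue<string>();
    private bool stopRequest;

    public void Add(string entry)
    {
        lock (logQueue)
        {
            if (stopRequest) return;
            logQueue.Enqueue(entry);
            if (logQueue.Count == 1)
                Monitor.Pulse(logQueue); // wake the single worker
        }
    }

    public void Stop()
    {
        lock (logQueue)
        {
            if (stopRequest) return;
            stopRequest = true;
            Monitor.Pulse(logQueue);
        }
    }

    public void ProcessWorker()
    {
        while (true)
        {
            string entry;
            lock (logQueue)
            {
                while (!stopRequest && logQueue.Count == 0)
                    Monitor.Wait(logQueue);
                // As in the original: stopping drops any pending entries.
                if (stopRequest) break;
                entry = logQueue.Dequeue();
            }
            Console.WriteLine("processed " + entry); // stand-in for ProcessLogEntry
        }
        Console.WriteLine("worker exited");
    }

    public static void Main()
    {
        var pump = new LogPump();
        var worker = Task.Run(pump.ProcessWorker);
        pump.Add("a");
        pump.Add("b");
        Thread.Sleep(100); // crude: give the worker time to drain
        pump.Stop();
        worker.Wait();
    }
}
```

Note that because the worker checks stopRequest before dequeuing, Stop() discards anything still queued - matching the original answer's semantics, where shutdown is immediate rather than drain-then-exit.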

Checking a Queue<T> Continuously

Try the blocking queue: Creating a blocking Queue<T> in .NET?

The basic idea is that a call to TryDequeue will block until there is something in the queue. The beauty of the blocking queue is that you don't have to poll/sleep or do anything crazy like that - it's the fundamental backbone of the producer/consumer pattern.

My version of the blocking queue is:

public class BlockingQueue<T> where T : class
{
    private bool closing;
    private readonly Queue<T> queue = new Queue<T>();

    public int Count
    {
        get
        {
            lock (queue)
            {
                return queue.Count;
            }
        }
    }

    public BlockingQueue()
    {
        lock (queue)
        {
            closing = false;
            Monitor.PulseAll(queue);
        }
    }

    public bool Enqueue(T item)
    {
        lock (queue)
        {
            if (closing || null == item)
            {
                return false;
            }

            queue.Enqueue(item);

            if (queue.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.PulseAll(queue);
            }

            return true;
        }
    }

    public void Close()
    {
        lock (queue)
        {
            if (!closing)
            {
                closing = true;
                queue.Clear();
                Monitor.PulseAll(queue);
            }
        }
    }

    public bool TryDequeue(out T value, int timeout = Timeout.Infinite)
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                if (closing || (timeout < Timeout.Infinite) || !Monitor.Wait(queue, timeout))
                {
                    value = default(T);
                    return false;
                }
            }

            value = queue.Dequeue();
            return true;
        }
    }

    public void Clear()
    {
        lock (queue)
        {
            queue.Clear();
            Monitor.Pulse(queue);
        }
    }
}

Many thanks to Marc Gravell for this one!

In what way is a BlockingQueue blocking?

This is a great explanation:

https://blogs.msdn.microsoft.com/toub/2006/04/12/blocking-queues/

Essentially, the blocking happens when you try to dequeue something. Enqueue is thread-safe, while Dequeue blocks the calling thread until there is something to get.

The nice thing is you don't have to care (nor should you care) about how the blocking is implemented.
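A small, self-contained sketch of that behavior using .NET's built-in BlockingCollection<T> (the class name, payload, and timings here are illustrative, not from the linked post):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class TakeBlocksDemo
{
    public static void Main()
    {
        var queue = new BlockingCollection<string>();

        // Take() parks this task until something is added.
        var consumer = Task.Run(() =>
        {
            string item = queue.Take(); // blocks while the queue is empty
            Console.WriteLine("got " + item);
        });

        Thread.Sleep(200);  // consumer is (very likely) blocked inside Take() by now
        queue.Add("hello"); // unblocks it
        consumer.Wait();
    }
}
```

The consumer never polls: it is simply suspended by the runtime until the producer adds an item, which is exactly the blocking behavior the answer describes.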


