Is There Anything Like Asynchronous BlockingCollection<T>

Is there anything like asynchronous BlockingCollection<T>?

There are four alternatives that I know of.

The first is Channels, which provides a threadsafe queue that supports asynchronous Read and Write operations. Channels are highly optimized and optionally support dropping some items if a threshold is reached.
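As a minimal sketch of the dropping behaviour, assuming .NET Core 3.0 or later (for ReadAllAsync and await foreach): the ChannelSketch class and its RunAsync method are hypothetical names used only for illustration, while the Channel, BoundedChannelOptions and BoundedChannelFullMode types come from System.Threading.Channels.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelSketch
{
    // Writes 1..count into a bounded channel, then drains it asynchronously.
    public static async Task<List<int>> RunAsync(int capacity, int count)
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity)
        {
            // When the channel is full, discard the oldest item instead of waiting.
            FullMode = BoundedChannelFullMode.DropOldest
        });

        for (int i = 1; i <= count; i++)
        {
            await channel.Writer.WriteAsync(i);
        }

        // Signal that no more items will be written.
        channel.Writer.Complete();

        var received = new List<int>();
        await foreach (var item in channel.Reader.ReadAllAsync())
        {
            received.Add(item);
        }
        return received;
    }
}
```

With a capacity of 2 and five writes, only the two most recent items should survive; the other full modes (Wait, DropNewest, DropWrite) trade this off differently.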

The next is BufferBlock<T> from TPL Dataflow. If you only have a single consumer, you can use OutputAvailableAsync or ReceiveAsync, or just link it to an ActionBlock<T>. For more information, see my blog.
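A rough sketch of the single-consumer pattern, assuming the System.Threading.Tasks.Dataflow package is referenced. BufferBlockSketch and RunAsync are hypothetical names; SendAsync, OutputAvailableAsync and ReceiveAsync are the Dataflow extension methods mentioned above.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BufferBlockSketch
{
    public static async Task<List<string>> RunAsync()
    {
        var buffer = new BufferBlock<string>();

        // Producer: send a few items, then signal that no more will arrive.
        var producer = Task.Run(async () =>
        {
            foreach (var s in new[] { "a", "b", "c" })
            {
                await buffer.SendAsync(s);
            }
            buffer.Complete();
        });

        // Single consumer: OutputAvailableAsync returns false once the block
        // is completed and empty, which ends the loop.
        var received = new List<string>();
        while (await buffer.OutputAvailableAsync())
        {
            received.Add(await buffer.ReceiveAsync());
        }

        await producer;
        return received;
    }
}
```

This check-then-receive loop is only safe with one consumer; with several consumers, another one could take the item between the two calls.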

The last two are types that I created, available in my AsyncEx library.

AsyncCollection<T> is the async near-equivalent of BlockingCollection<T>, capable of wrapping a concurrent producer/consumer collection such as ConcurrentQueue<T> or ConcurrentBag<T>. You can use TakeAsync to asynchronously consume items from the collection. For more information, see my blog.

AsyncProducerConsumerQueue<T> is a more portable async-compatible producer/consumer queue. You can use DequeueAsync to asynchronously consume items from the queue. For more information, see my blog.

The last three of these alternatives allow synchronous and asynchronous puts and takes.

Asynchronous Take from blocking collection

There doesn't appear to be a built-in option for this, so I tried to build what I wanted as an experiment. It turns out there is a lot of cruft involved in making this work roughly the way users of the old async pattern (Begin/End with IAsyncResult) would expect.

using System;
using System.Collections.Concurrent;
using System.Threading;

public class AsyncQueue<T>
{
    private readonly ConcurrentQueue<T> queue;
    private readonly ConcurrentQueue<DequeueAsyncResult> dequeueQueue;

    private class DequeueAsyncResult : IAsyncResult
    {
        public bool IsCompleted { get; set; }
        public WaitHandle AsyncWaitHandle { get; set; }
        public object AsyncState { get; set; }
        public bool CompletedSynchronously { get; set; }
        public T Result { get; set; }

        public AsyncCallback Callback { get; set; }
    }

    public AsyncQueue()
    {
        dequeueQueue = new ConcurrentQueue<DequeueAsyncResult>();
        queue = new ConcurrentQueue<T>();
    }

    public void Enqueue(T item)
    {
        // Hand the item to the first pending dequeue that has not timed out yet.
        DequeueAsyncResult asyncResult;
        while (dequeueQueue.TryDequeue(out asyncResult))
        {
            if (!asyncResult.IsCompleted)
            {
                asyncResult.IsCompleted = true;
                asyncResult.Result = item;

                ThreadPool.QueueUserWorkItem(state =>
                {
                    if (asyncResult.Callback != null)
                    {
                        asyncResult.Callback(asyncResult);
                    }
                    else
                    {
                        ((EventWaitHandle) asyncResult.AsyncWaitHandle).Set();
                    }
                });
                return;
            }
        }
        // No one is waiting, so store the item for a later dequeue.
        queue.Enqueue(item);
    }

    public IAsyncResult BeginDequeue(int timeout, AsyncCallback callback, object state)
    {
        // Fast path: an item is already available, so complete synchronously.
        T result;
        if (queue.TryDequeue(out result))
        {
            var dequeueAsyncResult = new DequeueAsyncResult
            {
                IsCompleted = true,
                AsyncWaitHandle = new EventWaitHandle(true, EventResetMode.ManualReset),
                AsyncState = state,
                CompletedSynchronously = true,
                Result = result
            };
            if (null != callback)
            {
                callback(dequeueAsyncResult);
            }
            return dequeueAsyncResult;
        }

        // Slow path: register a pending result and start a one-shot timeout timer.
        var pendingResult = new DequeueAsyncResult
        {
            AsyncState = state,
            IsCompleted = false,
            AsyncWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset),
            CompletedSynchronously = false,
            Callback = callback
        };
        dequeueQueue.Enqueue(pendingResult);
        Timer t = null;
        t = new Timer(_ =>
        {
            if (!pendingResult.IsCompleted)
            {
                pendingResult.IsCompleted = true;
                if (null != callback)
                {
                    callback(pendingResult);
                }
                else
                {
                    ((EventWaitHandle)pendingResult.AsyncWaitHandle).Set();
                }
            }
            t.Dispose();
        }, new object(), timeout, Timeout.Infinite);

        return pendingResult;
    }

    public T EndDequeue(IAsyncResult result)
    {
        var dequeueResult = (DequeueAsyncResult) result;
        return dequeueResult.Result;
    }
}

I'm not too sure about the synchronization of the IsCompleted property, and I'm not too keen on how the dequeueQueue only gets cleaned up on subsequent Enqueue calls. I'm also not sure when the correct time to signal the wait handles is, but this is the best solution that I've got so far.

Please do not consider this production-quality code by any means. I just wanted to show the general gist of how I kept all threads spinning without waiting on locks. I'm sure this is full of all kinds of edge cases and bugs, but it fulfills the requirements, and I wanted to give something back to people who come across the question.
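For comparison, here is a sketch of the same hand-off idea built on TaskCompletionSource<T> instead of IAsyncResult. This is not the code above rewritten: it swaps in a short lock around two plain queues, it has no timeout support, and TcsAsyncQueue<T> is a hypothetical name chosen for illustration.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public sealed class TcsAsyncQueue<T>
{
    private readonly object _gate = new object();
    private readonly Queue<T> _items = new Queue<T>();
    private readonly Queue<TaskCompletionSource<T>> _waiters =
        new Queue<TaskCompletionSource<T>>();

    public void Enqueue(T item)
    {
        TaskCompletionSource<T> waiter;
        lock (_gate)
        {
            if (_waiters.Count == 0)
            {
                // Nobody is waiting: store the item for a later dequeue.
                _items.Enqueue(item);
                return;
            }
            waiter = _waiters.Dequeue();
        }
        // Complete the waiter outside the lock.
        waiter.SetResult(item);
    }

    public Task<T> DequeueAsync()
    {
        lock (_gate)
        {
            if (_items.Count > 0)
            {
                // An item is already available: return a completed task.
                return Task.FromResult(_items.Dequeue());
            }
            // Otherwise register a waiter that a later Enqueue will complete.
            // Continuations run asynchronously so they never execute inline
            // on the thread that calls Enqueue.
            var waiter = new TaskCompletionSource<T>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            _waiters.Enqueue(waiter);
            return waiter.Task;
        }
    }
}
```

Because waiters and items are only touched under the same lock, an item is never stored while a waiter is pending, which sidesteps the IsCompleted race at the cost of a brief lock.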

BlockingCollection with async task

If I understand you correctly, you want to ensure that only one task at a time is processed by the so-called "producer". With slight modifications to your code, you can do it like this:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

internal class Producer : IDisposable {
    private readonly BlockingCollection<RandomStringRequest> _collection;

    public Producer() {
        _collection = new BlockingCollection<RandomStringRequest>(new ConcurrentQueue<RandomStringRequest>());
    }

    public void Start() {
        // A single background task processes requests one at a time.
        Task consumer = Task.Factory.StartNew(() => {
            try {
                foreach (var request in _collection.GetConsumingEnumerable()) {
                    Thread.Sleep(100); // long work
                    request.SetResult(GetRandomString());
                }
            }
            catch (InvalidOperationException) {
                Console.WriteLine("Adding was completed!");
            }
        });
    }

    public RandomStringRequest GetRandomString(string consumerName) {
        var request = new RandomStringRequest();
        _collection.Add(request);
        return request;
    }

    public void Dispose() {
        _collection.CompleteAdding();
    }

    private string GetRandomString() {
        var chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
        var random = new Random();
        var result = new string(Enumerable
            .Repeat(chars, 8)
            .Select(s => s[random.Next(s.Length)])
            .ToArray());
        return result;
    }
}

internal class RandomStringRequest : IDisposable {
    private string _result;
    private ManualResetEvent _signal;

    public RandomStringRequest() {
        _signal = new ManualResetEvent(false);
    }

    public void SetResult(string result) {
        _result = result;
        _signal.Set();
    }

    public string GetResult() {
        _signal.WaitOne();
        return _result;
    }

    public bool TryGetResult(TimeSpan timeout, out string result) {
        result = null;
        if (_signal.WaitOne(timeout)) {
            result = _result;
            return true;
        }
        return false;
    }

    public void Dispose() {
        _signal.Dispose();
    }
}

internal class Consumer {
    private Producer _producer;
    private string _name;

    public Consumer(
        Producer producer,
        string name) {
        _producer = producer;
        _name = name;
    }

    public string GetOrderedString() {
        using (var request = _producer.GetRandomString(_name)) {
            // wait here for result to be prepared
            var produced = request.GetResult();
            return String.Join(String.Empty, produced.OrderBy(c => c));
        }
    }
}

Note that the producer is single-threaded and uses GetConsumingEnumerable. There is also no semaphore and no Tasks. Instead, a RandomStringRequest is returned to the consumer, and a call to GetResult or TryGetResult waits until the producer has produced the result (or the timeout expires). You may also want to pass CancellationTokens in some places (such as to GetConsumingEnumerable).
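To illustrate the last point, here is a small sketch of passing a CancellationToken to GetConsumingEnumerable. The CancellableConsumerSketch class and its Consume method are hypothetical and use int items rather than the request type above; the token overload of GetConsumingEnumerable is part of BlockingCollection<T>.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class CancellableConsumerSketch
{
    // Consumes items until adding is completed or the token is cancelled.
    public static List<int> Consume(BlockingCollection<int> collection, CancellationToken token)
    {
        var seen = new List<int>();
        try
        {
            foreach (var item in collection.GetConsumingEnumerable(token))
            {
                seen.Add(item);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation was requested while the loop was blocked waiting
            // for the next item; treat it as a normal shutdown.
        }
        return seen;
    }
}
```

Without the token, the only way to unblock the loop is CompleteAdding; with it, the consumer can also be stopped from outside.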

Can BlockingCollection.GetConsumingEnumerable Deadlock

Will collection.GetConsumingEnumerable() block, leading to deadlocks, similar to .Wait or .Result?

No. There are two parts to the classic deadlock:

  1. A context that only allows one thread at a time (usually a UI SynchronizationContext or ASP.NET pre-Core SynchronizationContext).
  2. Blocking on code that uses await (which uses that context in order to complete the async method).

GetConsumingEnumerable is a blocking call, but it's not blocking on asynchronous code, so there's no danger of the classic deadlock there. More generally, using a queue like this inserts a "barrier" of sorts between two pieces of code.

What is the preferred way to stream items in an async method?

Modern code should use IAsyncEnumerable<T>.
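A minimal sketch of what that looks like, assuming C# 8 and .NET Core 3.0 or later. AsyncStreamSketch, ProduceAsync and SumAsync are hypothetical names, and Task.Delay stands in for real asynchronous work.

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncStreamSketch
{
    // Producer: an async iterator that yields items as they become available.
    public static async IAsyncEnumerable<int> ProduceAsync(
        int count,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Delay(10, cancellationToken); // stand-in for real async work
            yield return i;
        }
    }

    // Consumer: await foreach pulls items without blocking a thread.
    public static async Task<int> SumAsync(int count)
    {
        int sum = 0;
        await foreach (var value in ProduceAsync(count))
        {
            sum += value;
        }
        return sum;
    }
}
```

Here the consumer drives the producer directly, so no intermediate queue is needed.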

However, if you're not on that platform yet, you can use a queue as a "barrier". You wouldn't want to use BlockingCollection<T>, though, because it is designed for synchronous producers and synchronous consumers. For asynchronous producers/consumers, I'd recommend Channels.

Awaiting async method in consumer using BlockingCollection as queue

As spender correctly pointed out, BlockingCollection (as the name implies) is intended only for use with blocking code, and does not work so well with asynchronous code.

There are async-compatible producer/consumer queues, such as BufferBlock<T>. In this case, I would think ActionBlock<T> would be even better:

private ActionBlock<IMobileMsg> _block = new ActionBlock<IMobileMsg>(async msg =>
{
    try
    {
        Trace.TraceInformation("Sending: {0}", msg.Name);
        await Task.Delay(1000);
        msg.SentTime = DateTime.UtcNow;
        Trace.TraceInformation("X sent at {1}: {0}", msg.Name, msg.SentTime);
    }
    catch (Exception e)
    {
        TraceException(e);
    }
});

This replaces your entire consuming thread and main loop.
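On the producing side, items are handed to the block with Post, and shutdown is signalled with Complete and awaited through Completion. The following is a sketch under the assumption that the System.Threading.Tasks.Dataflow package is referenced; ActionBlockSketch and RunAsync are hypothetical names, and int messages stand in for IMobileMsg.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ActionBlockSketch
{
    public static async Task<List<int>> RunAsync(int messageCount)
    {
        var processed = new List<int>();

        // With the default options the block handles one message at a time,
        // so the list is never accessed concurrently.
        var block = new ActionBlock<int>(async msg =>
        {
            await Task.Delay(10); // stand-in for sending the message
            processed.Add(msg);
        });

        for (int i = 0; i < messageCount; i++)
        {
            block.Post(i); // queue the message and return immediately
        }

        block.Complete();       // no more messages will be posted
        await block.Completion; // wait until every queued message is handled
        return processed;
    }
}
```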

How to implement the BlockingCollection.TakeFromAny equivalent for Channels?

I came up with something like this:

public static async Task<(T Item, int Index)> TakeFromAnyAsync<T>(
    ChannelReader<T>[] channelReaders,
    CancellationToken cancellationToken = default)
{
    if (channelReaders == null)
    {
        throw new ArgumentNullException(nameof(channelReaders));
    }

    if (channelReaders.Length == 0)
    {
        throw new ArgumentException("The list cannot be empty.", nameof(channelReaders));
    }

    if (channelReaders.Length == 1)
    {
        return (await channelReaders[0].ReadAsync(cancellationToken), 0);
    }

    // First attempt to read an item synchronously
    for (int i = 0; i < channelReaders.Length; ++i)
    {
        if (channelReaders[i].TryRead(out var item))
        {
            return (item, i);
        }
    }

    using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
    {
        var waitToReadTasks = channelReaders
            .Select(it => it.WaitToReadAsync(cts.Token).AsTask())
            .ToArray();

        var pendingTasks = new List<Task<bool>>(waitToReadTasks);

        while (pendingTasks.Count > 1)
        {
            var t = await Task.WhenAny(pendingTasks);

            if (t.IsCompletedSuccessfully && t.Result)
            {
                int index = Array.IndexOf(waitToReadTasks, t);
                var reader = channelReaders[index];

                // Attempt to read an item synchronously
                if (reader.TryRead(out var item))
                {
                    if (pendingTasks.Count > 1)
                    {
                        // Cancel pending "wait to read" on the remaining readers
                        // then wait for the completion
                        try
                        {
                            cts.Cancel();
                            await Task.WhenAll((IEnumerable<Task>)pendingTasks);
                        }
                        catch { }
                    }
                    return (item, index);
                }

                // Due to the race condition, the item is no longer available
                if (!reader.Completion.IsCompleted)
                {
                    // ... but the channel appears to be still open, so we retry
                    var waitToReadTask = reader.WaitToReadAsync(cts.Token).AsTask();
                    waitToReadTasks[index] = waitToReadTask;
                    pendingTasks.Add(waitToReadTask);
                }
            }

            // Remove all completed tasks that could not yield
            pendingTasks.RemoveAll(tt => tt == t ||
                tt.IsCompletedSuccessfully && !tt.Result ||
                tt.IsFaulted || tt.IsCanceled);
        }

        int lastIndex = 0;
        if (pendingTasks.Count > 0)
        {
            lastIndex = Array.IndexOf(waitToReadTasks, pendingTasks[0]);
            await pendingTasks[0];
        }

        var lastItem = await channelReaders[lastIndex].ReadAsync(cancellationToken);
        return (lastItem, lastIndex);
    }
}

Is there a data structure in C# like a ConcurrentQueue which allows me to await an empty queue until an item is added?

There is a modern solution for this: Channels. A "Channel" is an asynchronous producer/consumer queue.

Channels also have the concept of "completion", so you can complete the channel rather than having a cancelled flag.

Usage:

public static async Task ServiceLoop() {
    var awaitedQueue = Channel.CreateUnbounded<int>();
    var queueReader = awaitedQueue.Reader;

    while (await queueReader.WaitToReadAsync())
    {
        while (queueReader.TryRead(out var item))
        {
            Console.WriteLine(item);
        }
    }
}
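The loop above only ends once the writer side is completed. As a sketch of how a producer and completion fit together with the same read loop: ChannelCompletionSketch and RunAsync are hypothetical names, and the producer here is just a background task writing three numbers.

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelCompletionSketch
{
    public static async Task<int> RunAsync()
    {
        var channel = Channel.CreateUnbounded<int>();

        // Producer: writes a few items, then completes the channel.
        var producer = Task.Run(async () =>
        {
            for (int i = 0; i < 3; i++)
            {
                await Task.Delay(10);
                channel.Writer.TryWrite(i); // always succeeds on an open unbounded channel
            }
            channel.Writer.Complete();
        });

        // Consumer: WaitToReadAsync returns false once the channel is
        // completed and drained, which ends the outer loop.
        int count = 0;
        while (await channel.Reader.WaitToReadAsync())
        {
            while (channel.Reader.TryRead(out var item))
            {
                count++;
            }
        }

        await producer;
        return count;
    }
}
```

Completing the writer takes the place of a separate cancelled flag: the consumer simply falls out of the loop when there is nothing left to read.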

