When to Use BlockingCollection and When ConcurrentBag Instead of List<T>

When to use BlockingCollection and when ConcurrentBag instead of List<T>?

You can indeed use a BlockingCollection, but there is absolutely no point in doing so.

First off, note that BlockingCollection is a wrapper around a collection that implements IProducerConsumerCollection<T>. Any type that implements that interface can be used as the underlying storage:

When you create a BlockingCollection<T> object, you can specify not
only the bounded capacity but also the type of collection to use. For
example, you could specify a ConcurrentQueue<T> object for first in,
first out (FIFO) behavior, or a ConcurrentStack<T> object for last
in, first out (LIFO) behavior. You can use any collection class that
implements the IProducerConsumerCollection<T> interface. The default
collection type for BlockingCollection<T> is ConcurrentQueue<T>.

This includes ConcurrentBag<T>, which means you can have a blocking concurrent bag. So what's the difference between a plain IProducerConsumerCollection<T> and a blocking collection? The documentation of BlockingCollection says (emphasis mine):

BlockingCollection<T> is used as a wrapper for an
IProducerConsumerCollection<T> instance, allowing removal attempts
from the collection to block until data is available to be removed.
Similarly, a BlockingCollection<T> can be created to enforce an
upper-bound on the number of data elements allowed in the
IProducerConsumerCollection<T> [...]

Since in the linked question there is no need to do either of these things, using BlockingCollection simply adds a layer of functionality that goes unused.
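For completeness, here is a minimal sketch of what a "blocking concurrent bag" looks like, using the BlockingCollection<T> constructor overload that accepts both an underlying collection and a bounded capacity:

```csharp
using System;
using System.Collections.Concurrent;

class Program
{
    static void Main()
    {
        // A BlockingCollection backed by a ConcurrentBag<T> instead of the
        // default ConcurrentQueue<T>, with an upper bound of 2 items.
        var blockingBag = new BlockingCollection<int>(
            new ConcurrentBag<int>(), boundedCapacity: 2);

        blockingBag.Add(1);
        blockingBag.Add(2);
        // A third Add would now block until another thread takes an item.

        int item = blockingBag.Take(); // blocks while the bag is empty
        Console.WriteLine(item);
    }
}
```

Note that because a bag has no defined order, which item Take returns is unspecified; the blocking and bounding behavior is what the wrapper adds.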

ConcurrentBag and application hang with LINQ retrieval in .NET

According to the documentation of the ConcurrentBag<T> class:

All public and protected members of ConcurrentBag<T> are thread-safe and may be used concurrently from multiple threads. However, members accessed through one of the interfaces the ConcurrentBag<T> implements, including extension methods, are not guaranteed to be thread safe and may need to be synchronized by the caller.

The FirstOrDefault LINQ operator is an extension method on IEnumerable<T>, an interface that ConcurrentBag<T> implements, so officially you are in "undefined behavior" territory. That said, knowing how FirstOrDefault is implemented, I see no reason for it to cause your application to hang.

Regarding the question of whether a ConcurrentBag<T> is a good option for what you are doing, I would say it's unlikely. The ConcurrentBag<T> is a very specialized collection, making it a bad option for anything that is not a mixed producer-consumer scenario. And judging from the code that you posted, your scenario is not one of those. I would suggest switching to a ConcurrentQueue<T>, which is a collection that supports enqueuing (adding) and enumerating concurrently, and also preserves the insertion order of the items it contains.
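A minimal sketch of that behavior, showing concurrent adds followed by a FIFO dequeue:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        var queue = new ConcurrentQueue<int>();

        // Threads can enqueue concurrently, without any external locking.
        Parallel.For(0, 100, i => queue.Enqueue(i));
        Console.WriteLine(queue.Count); // 100

        // Items added from a single thread come out in insertion order.
        var fifo = new ConcurrentQueue<string>();
        fifo.Enqueue("first");
        fifo.Enqueue("second");
        fifo.TryDequeue(out var head);
        Console.WriteLine(head); // first
    }
}
```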

Block a Concurrent Collection such as a ConcurrentBag or BlockingCollection

I suspect this is going to be significantly harder than just wrapping a few lines of code with the lock keyword. The reason is that you would have to wrap all access to the ConcurrentBag with a lock, and if you did that then it would not be "concurrent" anymore. I think what you are asking for is a way to temporarily suspend the concurrent behavior of the collection in favor of a serialized behavior, and then resume its concurrent behavior once again when you have completed a special guarded operation.

The naive approach would be to create your own collection that provides the same operations as ConcurrentBag by using it internally as the underlying collection. You would then provide SuspendConcurrent and ResumeConcurrent operations that toggle the concurrent behavior of the collection by setting and resetting a flag (probably via the Interlocked.CompareExchange method). The code would then use this flag to determine whether to directly forward Add, TryPeek, and TryTake operations to the underlying collection or use a hard lock before doing so. I think in the end you will find the code very complex and incredibly hard to get right. Based on personal experience any trivial implementation you are envisioning right now will probably either be wrong, be inefficient, or not be fully concurrent when you want it to be.

My advice is to figure out a way to restructure your code so that this requirement goes away.

Edit:

From one of your comments it seems as though you want a CAS-like operation to be able to make a decision based on whether or not something has already occurred. There are several options, but one which might best suit your needs is the ConcurrentDictionary class, which provides the TryAdd method. TryAdd returns true if the key does not yet exist; if the key does exist, it returns false. Many threads can execute this method simultaneously with the same key, but only one will get the true response. You can then use an if statement to control program flow based on the return value.
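A sketch of that pattern, where many threads race to "claim" the same key and exactly one wins:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        var seen = new ConcurrentDictionary<string, bool>();
        int winners = 0;

        // 100 threads all attempt TryAdd with the same key;
        // exactly one call succeeds and returns true.
        Parallel.For(0, 100, _ =>
        {
            if (seen.TryAdd("job-42", true))
            {
                // Only the single winning thread runs this branch.
                Interlocked.Increment(ref winners);
            }
        });

        Console.WriteLine(winners); // 1
    }
}
```

The key name "job-42" is just an illustrative placeholder; in practice you would use whatever identifies the operation that must happen only once.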

Losing items somewhere in C# BlockingCollection with GetConsumingEnumerable()

Parallel.ForEach is meant for data parallelism (i.e. processing 100K rows using all 8 cores), not concurrent operations. This is essentially a pub/sub and async problem, if not a pipeline problem. There's nothing for the CPU to do in this case; just start the async operations and wait for them to complete.

.NET handles this since .NET 4.5 through the Dataflow classes and, more recently, the lower-level System.Threading.Channels namespace.

In its simplest form, you can create an ActionBlock<> that takes a buffer and target connection and publishes the data. Let's say you use this method to send the data to a server:

async Task MyBulkCopyMethod(string connectionString, DataTable data)
{
    using (var bcp = new SqlBulkCopy(connectionString))
    {
        // Set up mappings etc.
        // ...
        await bcp.WriteToServerAsync(data);
    }
}

You can use this with an ActionBlock class with a configured degree of parallelism. Dataflow classes like ActionBlock have their own input and, where appropriate, output buffers, so there's no need to create a separate queue:

class DataMessage
{
    public string Connection { get; set; }
    public DataTable Data { get; set; }
}

...

var options = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 50,
    BoundedCapacity = 8
};
var block = new ActionBlock<DataMessage>(
    msg => MyBulkCopyMethod(msg.Connection, msg.Data),
    options);

We can start posting messages to the block now. By setting the capacity to 8 we ensure the input buffer won't get filled with large messages if the block is too slow. MaxDegreeOfParallelism controls how many operations run concurrently. Let's say we want to send the same data to many servers:

var data = .....;
var servers = new[] { connString1, connString2, .... };
var messages = from sv in servers
               select new DataMessage { Connection = sv, Data = data };

foreach (var msg in messages)
{
    await block.SendAsync(msg);
}
// Tell the block we are done
block.Complete();
// Wait for all messages to finish processing
await block.Completion;

Retries

One possibility for retries is to use a retry loop in the worker function. A better idea would be to use a different block and post failed messages there.

var block = new ActionBlock<DataMessage>(async msg =>
{
    try
    {
        await MyBulkCopyMethod(msg.Connection, msg.Data);
    }
    catch (SqlException exc) when (some retry condition)
    {
        // Post without awaiting
        retryBlock.Post(msg);
    }
}, options);

When the original block completes, we want to tell the retry block to complete as well, no matter what:

block.Completion.ContinueWith(_=>retryBlock.Complete());

Now we can await the retryBlock to complete.

That block could have a smaller DOP and perhaps a delay between attempts:

var retryOptions = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 5
};
var retryBlock = new ActionBlock<DataMessage>(async msg =>
{
    await Task.Delay(1000);
    try
    {
        await MyBulkCopyMethod(msg.Connection, msg.Data);
    }
    catch (Exception ....)
    {
        ...
    }
}, retryOptions);

This pattern can be repeated to create multiple levels of retry, or different conditions. It can also be used to create different-priority workers, by giving a larger DOP to high-priority workers or a larger delay to low-priority workers.

Search for a particular element in BlockingCollection

Searching a BlockingCollection<T> to see if it contains a particular element is functionality that is not needed for the intended usage of this class, which is to facilitate producer-consumer scenarios. So this class does not expose a Contains method as part of its public API. You could exploit the fact that the class implements the IEnumerable<T> interface and use the LINQ Contains operator; however, keep in mind this cautionary note from the ConcurrentBag<T> class documentation. AFAIK it applies to all concurrent collections in general:

All public and protected members of ConcurrentBag<T> are thread-safe and may be used concurrently from multiple threads. However, members accessed through one of the interfaces the ConcurrentBag<T> implements, including extension methods, are not guaranteed to be thread safe, and may need to be synchronized by the caller.

(emphasis added)

So you could safely use the LINQ Contains extension method only after the "battle" is over, and all the concurrent operations on the BlockingCollection<T> instance have ended. Which is probably not what you want. In which case the BlockingCollection<T> is probably not the right tool for whatever problem you are trying to solve.
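To make the caveat concrete, here is a sketch of the only clearly safe usage: calling the LINQ Contains after adding has completed and no other thread is touching the collection:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

class Program
{
    static void Main()
    {
        var collection = new BlockingCollection<int> { 1, 2, 3 };
        collection.CompleteAdding(); // no more producers from here on

        // Safe only because all concurrent operations have ended.
        // This resolves to the LINQ extension on IEnumerable<int>;
        // enumerating a BlockingCollection this way does not consume items.
        bool hasTwo = collection.Contains(2);
        Console.WriteLine(hasTwo); // True
    }
}
```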

Adding a listener event to a ConcurrentQueue or ConcurrentBag?

These days, I recommend going with an async-compatible solution like System.Threading.Channels:

private static Channel<string> messageList = Channel.CreateUnbounded<string>();

private async Task ListenToQueue(string queueName)
{
    var cancellationToken = new CancellationTokenSource(TimeSpan.FromMinutes(60)).Token;

    try
    {
        var message = await getMessageFromQueue(queueName, cancellationToken);
        await messageList.Writer.WriteAsync(message, cancellationToken);
    }
    catch (OperationCanceledException)
    {
        // ignored
    }
}

private async Task Listener()
{
    await foreach (var msg in messageList.Reader.ReadAllAsync())
    {
        if (!string.IsNullOrEmpty(msg))
            _ = Task.Run(() => ProcessMessage(msg));
    }
}

But if you want (or need) to stay in the blocking world, there's a solution there, too. ConcurrentBag<T> and ConcurrentQueue<T> are seldom used directly. Instead, it's more common to use BlockingCollection<T>, which wraps a concurrent collection and provides a higher-level API, including GetConsumingEnumerable:

private static BlockingCollection<string> messageList = new();

private async Task ListenToQueue(string queueName)
{
    var cancellationToken = new CancellationTokenSource(TimeSpan.FromMinutes(60)).Token;

    try
    {
        var message = await getMessageFromQueue(queueName, cancellationToken);
        messageList.Add(message, cancellationToken);
    }
    catch (OperationCanceledException)
    {
        // ignored
    }
}

private void Listener()
{
    foreach (var msg in messageList.GetConsumingEnumerable())
    {
        if (!string.IsNullOrEmpty(msg))
            _ = Task.Run(() => ProcessMessage(msg));
    }
}

How to properly lock a collection

Locks only last for their scope.

lock (entriesLock)
{
    // safe to access here
}
// no longer safe

As such, your attempt at returning a locked list is unfortunately meaningless, as the lock expires right away when the getter/setter is left. Take the lock at the call site, where you actually access the list:

for (int j = 0; j < 10000; j++)
{
    lock (entriesLock)
    {
        lEntries.Add(j);
    }
}

// or

lock (entriesLock)
{
    for (int j = 0; j < 10000; j++)
    {
        lEntries.Add(j);
    }
}
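If you do want a getter, a common alternative is to return a snapshot copy taken under the lock, so callers can enumerate freely without holding it. A sketch (the Container type and its members are illustrative names, not from the original question):

```csharp
using System.Collections.Generic;
using System.Linq;

class Container
{
    private readonly object entriesLock = new object();
    private readonly List<int> lEntries = new List<int>();

    public void Add(int value)
    {
        lock (entriesLock)
        {
            lEntries.Add(value);
        }
    }

    // Returns a copy; later mutations of lEntries do not affect it,
    // and the caller never needs to know about entriesLock.
    public List<int> GetSnapshot()
    {
        lock (entriesLock)
        {
            return lEntries.ToList();
        }
    }
}
```

The trade-off is an allocation per call, in exchange for callers that can never deadlock on, or forget to take, the internal lock.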

