BroadcastBlock with guaranteed delivery in TPL Dataflow

It is fairly simple to build what you're asking using ActionBlock and SendAsync(), something like:

public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
IEnumerable<ITargetBlock<T>> targets)
{
var targetsList = targets.ToList();

return new ActionBlock<T>(
async item =>
{
foreach (var target in targetsList)
{
await target.SendAsync(item);
}
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}

This is the most basic version, but extending it to support a mutable list of targets, completion propagation, or a cloning function should be easy.

BroadcastCopyBlock for TPL Dataflow with guaranteed delivery

TL/DR
Your implementation uses the Post method inside the ActionBlock, which will still lose data if a target rejects the message; switch to SendAsync instead. Also, you probably don't need to implement all those methods: the ITargetBlock<in TInput> interface implementation alone is enough.


I want to clarify something before coming back to your main question. I think you are confused by some options in the TPL Dataflow library, so let me explain them a bit. The behavior you describe, "The first consumer, which receives the message, deletes it from the queue", is not about the BroadcastBlock; it happens with multiple consumers linked to an ISourceBlock, such as BufferBlock:

var buffer = new BufferBlock<int>();
var consumer1 = new ActionBlock<int>(i => {});
var consumer2 = new ActionBlock<int>(i => { Console.WriteLine(i); });
buffer.LinkTo(consumer1);
buffer.LinkTo(consumer2);
// this message goes to only one consumer (the first linked), so no console output appears
buffer.Post(1);

What BroadcastBlock does is exactly what you're talking about; consider this code:

private static void UnboundedCase()
{
var broadcast = new BroadcastBlock<int>(i => i);
var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Unbounded Block: {i}"));
var slowAction = new ActionBlock<int>(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"SLOW Unbounded Block: {i}");
});
broadcast.LinkTo(slowAction, new DataflowLinkOptions { PropagateCompletion = true });
broadcast.LinkTo(fastAction, new DataflowLinkOptions { PropagateCompletion = true });
for (var i = 0; i < 3; ++i)
{
broadcast.SendAsync(i);
}
broadcast.Complete();
slowAction.Completion.Wait();
}

The output will be

FAST Unbounded Block: 0
FAST Unbounded Block: 1
FAST Unbounded Block: 2
SLOW Unbounded Block: 0
SLOW Unbounded Block: 1
SLOW Unbounded Block: 2

However, this works only if the incoming data arrives more slowly than it can be processed; otherwise you will quickly run out of memory as the buffers grow, as you stated in your question. Let's see what happens if we use ExecutionDataflowBlockOptions to limit the incoming data buffer of the slow block:

private static void BoundedCase()
{
var broadcast = new BroadcastBlock<int>(i => i);
var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Bounded Block: {i}"));
var slowAction = new ActionBlock<int>(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"SLOW Bounded Block: {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
broadcast.LinkTo(slowAction, new DataflowLinkOptions { PropagateCompletion = true });
broadcast.LinkTo(fastAction, new DataflowLinkOptions { PropagateCompletion = true });
for (var i = 0; i < 3; ++i)
{
broadcast.SendAsync(i);
}
broadcast.Complete();
slowAction.Completion.Wait();
}

The output will be

FAST Bounded Block: 0
FAST Bounded Block: 1
FAST Bounded Block: 2
SLOW Bounded Block: 0
SLOW Bounded Block: 1

As you can see, our slow block lost the last message, which is not what we are looking for. The reason is that BroadcastBlock, by default, uses the Post method to deliver messages. According to the official Intro Document:

  • Post

    • An extension method that asynchronously posts to the target block. It returns immediately whether the data could be accepted or not, and it does not allow for the target to consume the message at a later time.
  • SendAsync

    • An extension method that asynchronously sends to target blocks while supporting buffering. A Post operation on a target is asynchronous, but if a target wants to postpone the offered data, there is nowhere for the data to be buffered and the target must instead be forced to decline. SendAsync enables asynchronous posting of the data with buffering, such that if a target postpones, it will later be able to retrieve the postponed data from the temporary buffer used for this one asynchronously posted message.

So this method can help us in our mission. Let's introduce a wrapper ActionBlock that does exactly what we want: it SendAsyncs the data to our real processors:

private static void BoundedWrapperInfiniteCase()
{
var broadcast = new BroadcastBlock<int>(i => i);
var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Wrapper Block: {i}"));
var slowAction = new ActionBlock<int>(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"SLOW Wrapper Block: {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));

broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
for (var i = 0; i < 3; ++i)
{
broadcast.SendAsync(i);
}
broadcast.Complete();
slowAction.Completion.Wait();
}

The output will be

FAST Wrapper Block: 0
FAST Wrapper Block: 1
FAST Wrapper Block: 2
SLOW Wrapper Block: 0
SLOW Wrapper Block: 1
SLOW Wrapper Block: 2

But this wait will never end: our basic wrapper does not propagate completion to the linked blocks, and an ActionBlock can't be linked to anything. We can try to wait for the wrapper's completion instead:

private static void BoundedWrapperFiniteCase()
{
var broadcast = new BroadcastBlock<int>(i => i);
var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST finite Block: {i}"));
var slowAction = new ActionBlock<int>(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"SLOW finite Block: {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));
broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
for (var i = 0; i < 3; ++i)
{
broadcast.SendAsync(i);
}
broadcast.Complete();
slowActionWrapper.Completion.Wait();
}

The output will be

FAST finite Block: 0
FAST finite Block: 1
FAST finite Block: 2
SLOW finite Block: 0

Which is definitely not what we wanted: the ActionBlock considered its job finished, and the SendAsync for the last message was never awaited. Moreover, we don't even see the second message, because we exit the method before the Sleep ends! So you definitely need your own implementation for this.

Now, at last, some thoughts about your code:

  1. You do not need to implement so many methods: your wrapper will be used as an ITargetBlock<in TInput>, so implement only that interface.
  2. Your implementation uses the Post method inside the ActionBlock, which, as we saw, can lead to data loss if there are problems on the consumer's side. Use the SendAsync method instead.
  3. After the previous change, you should measure the performance of your dataflow: if you get many asynchronous waits for data delivery, you will probably see performance and/or memory problems. These should be fixable with the advanced settings discussed in the linked documentation.
  4. Your implementation of the Completion task actually reverses the order of your dataflow: you are waiting for the targets to complete, which I think is not good practice. You should probably create an ending block for your dataflow (this could even be a NullTarget block, which simply drops incoming messages synchronously) and wait for it to complete.
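Point 4 can be sketched roughly like this (block names and the trivial pipeline are illustrative, not from the original question):

```csharp
using System.Threading.Tasks.Dataflow;

var source = new BufferBlock<int>();
var process = new TransformBlock<int, int>(i => i * 2);

// A trivial sink that just drops messages. Note that, unlike an ActionBlock,
// DataflowBlock.NullTarget<T>() exposes a Completion task that never completes,
// so a do-nothing ActionBlock is used here as the awaitable ending block.
var sink = new ActionBlock<int>(_ => { });

source.LinkTo(process, new DataflowLinkOptions { PropagateCompletion = true });
process.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

source.Post(1);
source.Complete();

// Wait for the *end* of the dataflow, not for the individual targets.
sink.Completion.Wait();
```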

Alternate to Dataflow BroadcastBlock with guaranteed delivery

Assuming you want the broadcaster to handle one item at a time while allowing the target blocks to receive that item concurrently, you need to change the broadcaster to offer the number to all blocks at the same time and then asynchronously wait for all of them to accept it before moving on to the next number:

var broadcaster = new ActionBlock<int>(async num => 
{
var tasks = new List<Task>();
foreach (var block in blocks)
{
tasks.Add(block.SendAsync(num));
}
await Task.WhenAll(tasks);
}, execopt);

Now, since in this case you have no work after the await, you can slightly optimize while still returning an awaitable task:

ActionBlock<int> broadcaster = new ActionBlock<int>(
num => Task.WhenAll(blocks.Select(block => block.SendAsync(num))), execopt);

tpl dataflow: fixed buffer size without throwing items away

That's exactly what BufferBlock is for. If you set its BoundedCapacity and it gets full, it will postpone incoming messages until someone consumes the buffered ones. This means that, for example, Post() will return false immediately and SendAsync() will return an unfinished Task.
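A minimal sketch of that difference (the values here are illustrative):

```csharp
using System.Threading.Tasks.Dataflow;

var buffer = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 1 });

buffer.Post(1);                            // accepted, buffer is now full
bool accepted = buffer.Post(2);            // returns false immediately, message is lost
Task<bool> pending = buffer.SendAsync(3);  // unfinished Task, message is postponed

buffer.Receive();                          // consume one item, freeing capacity
pending.Wait();                            // the postponed message is now accepted
```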

EDIT: There is no built-in block that sends to multiple targets and never throws data away. But you can easily build one yourself from an ActionBlock and a sending loop:

static ITargetBlock<T> CreateMultipleTargetsBlock<T>(
IEnumerable<ITargetBlock<T>> targets, int boundedCapacity)
{
var targetsList = targets.ToList();

var block = new ActionBlock<T>(
async item =>
{
foreach (var target in targetsList)
{
await target.SendAsync(item);
}
},
new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity });

// TODO: propagate completion from block to targets

return block;
}

This code assumes that you don't need to clone the data for each target and that the list of targets never changes. Modifying the code for that should be fairly simple.
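For example, a hypothetical cloning variant could take a clone delegate (mirroring the constructor argument of BroadcastBlock) and pass each target its own copy:

```csharp
using System.Threading.Tasks.Dataflow;

// Sketch only: the cloneFunction parameter is an assumption, added to show
// how the block above could be extended for targets that must not share state.
static ITargetBlock<T> CreateCloningMultipleTargetsBlock<T>(
    IEnumerable<ITargetBlock<T>> targets,
    Func<T, T> cloneFunction,
    int boundedCapacity)
{
    var targetsList = targets.ToList();

    return new ActionBlock<T>(
        async item =>
        {
            foreach (var target in targetsList)
            {
                // Each target receives its own clone of the item.
                await target.SendAsync(cloneFunction(item));
            }
        },
        new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity });
}
```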

TPL Dataflow: ActionBlock that avoids repeatedly running a using-block (such as for writing to a StreamWriter) on every invocation of its delegate

Writing to a file one line at a time is expensive in itself even when you don't have to open the stream each time. Keeping a file stream open has other issues too, as file streams are always buffered, from the FileStream level all the way down to the file system driver, for performance reasons. You'd have to flush the stream periodically to ensure the data was written to disk.

To really improve performance you'd have to batch the records, eg with a BatchBlock. Once you do that, the cost of opening the stream becomes negligible.

The lines should be generated at the last possible moment too, to avoid generating temporary strings that will need to be garbage collected. At n*1M records, the memory and CPU overhead of those allocations and garbage collections would be severe.

Logging libraries batch log entries before writing to avoid this performance hit.

You can try something like this :

var batchBlock=new BatchBlock<Record>(1000);
var writerBlock=new ActionBlock<Record[]>( records => {

//Create or open a file for appending
using var writer=new StreamWriter(ThePath,true);
foreach(var record in records)
{
writer.WriteLine("{0} = {1} :{2}",record.Prop1, record.Prop5, record.Prop2);
}

});

batchBlock.LinkTo(writerBlock,options);

or, using asynchronous methods

var batchBlock = new BatchBlock<Record>(1000);
var writerBlock = new ActionBlock<Record[]>(async records => {

    //Create or open a file for appending
    await using var writer = new StreamWriter(ThePath, true);
    foreach(var record in records)
    {
        // WriteLineAsync has no composite-format overload, so format the line first
        await writer.WriteLineAsync($"{record.Prop1} = {record.Prop5} :{record.Prop2}");
    }

});

batchBlock.LinkTo(writerBlock,options);

You can adjust the batch size and the StreamWriter's buffer size for optimum performance.
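For the StreamWriter side, the constructor overload taking an encoding and buffer size could be used (the 64 KB value below is illustrative, not a tuned recommendation):

```csharp
using System.Text;

// A larger internal buffer reduces the number of underlying FileStream
// writes per batch. ThePath comes from the surrounding example.
await using var writer = new StreamWriter(
    ThePath,
    append: true,
    encoding: Encoding.UTF8,
    bufferSize: 64 * 1024);
```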

Creating an actual "Block" that writes to a stream

A custom block can be created using the technique shown in the Custom Dataflow block walkthrough - instead of creating an actual custom block, create something that returns whatever is needed for LinkTo to work, in this case an ITargetBlock< T> :

ITargetBlock<Record> CreateFileExporter(string path)
{
    var writer = new StreamWriter(path, true);
    var block = new ActionBlock<Record>(async record => {
        await writer.WriteLineAsync($"{record.Prop1} = {record.Prop5} :{record.Prop2}");
    });

    //Close the stream when the block completes
    block.Completion.ContinueWith(_ => writer.Close());
    return block;
}
...

var exporter1 = CreateFileExporter(path1);
previous.LinkTo(exporter1, options);

The "trick" here is that the stream is created outside the block and remains active until the block completes. It's not garbage-collected because it's still referenced. When the block completes, we need to explicitly close it, no matter what happened. block.Completion.ContinueWith(_ => writer.Close()); will close the stream whether the block completed gracefully or not.

This is the same code used in the Walkthrough, to close the output BufferBlock :

target.Completion.ContinueWith(delegate
{
if (queue.Count > 0 && queue.Count < windowSize)
source.Post(queue.ToArray());
source.Complete();
});

Streams are buffered by default, so calling WriteLine doesn't mean the data will actually be written to disk. This means we don't know when the data will actually be written to the file. If the application crashes, some data may be lost.
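If crash-resilience matters, the stream can be flushed explicitly after each batch. A rough sketch (the file name is illustrative):

```csharp
await using var writer = new StreamWriter("records.txt", append: true);
await writer.WriteLineAsync("some line");

// Push buffered lines from the StreamWriter down to the OS.
await writer.FlushAsync();

// Stronger guarantee: force the OS to write through to the physical disk.
// BaseStream is a FileStream when the writer was opened over a file path.
((FileStream)writer.BaseStream).Flush(flushToDisk: true);
```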

Memory, IO and overheads

When working with 1M rows over a significant period of time, things add up. One could use e.g. File.AppendAllLinesAsync to write batches of lines at once, but that would result in the allocation of 1M temporary strings. At each iteration, the runtime would need at least as much RAM for those temporary strings as for the batch itself. RAM usage would balloon to hundreds of MBs, then GBs, before the GC kicked in, freezing the threads.

With 1M rows and lots of data it's hard to debug and track data in the pipeline. If something goes wrong, things can crash very quickly. Imagine for example 1M messages stuck in one block because one message got blocked.

It's important (for sanity and performance reasons) to keep individual components in the pipeline as simple as possible.

TPL Dataflow: Bounded capacity and waiting for completion

Yes, this is the expected behavior, because of the BroadcastBlock:

Provides a buffer for storing at most one element at time, overwriting each message with the next as it arrives.

This means that if you link BroadcastBlock to blocks with BoundedCapacity, you will lose messages.

To fix that, you could create a custom block that behaves like BroadcastBlock but guarantees delivery to all targets. Doing that is not trivial, though, so you might be satisfied with a simpler variant (originally from my old answer):

public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
IEnumerable<ITargetBlock<T>> targets, DataflowBlockOptions options)
{
var targetsList = targets.ToList();

var block = new ActionBlock<T>(
async item =>
{
foreach (var target in targetsList)
{
await target.SendAsync(item);
}
}, new ExecutionDataflowBlockOptions
{
BoundedCapacity = options.BoundedCapacity,
CancellationToken = options.CancellationToken
});

block.Completion.ContinueWith(task =>
{
foreach (var target in targetsList)
{
if (task.Exception != null)
target.Fault(task.Exception);
else
target.Complete();
}
});

return block;
}

Usage in your case would be:

var bcBlock = CreateGuaranteedBroadcastBlock(
new[] { bufferBlock1, bufferBlock2 }, dbOptions);

TPL DataFlow is being idle with no reason

Judging from your other recent question, my guess is that the behavior of your pipeline is dominated by the behavior of the heavily saturated ThreadPool. The blocks are competing with each other for the few, slowly increasing number of available ThreadPool threads, making the MaxDegreeOfParallelism configuration of the blocks mostly irrelevant for roughly the first couple of minutes after the application starts. Eventually the ThreadPool will grow enough to satisfy the demand, but with an injection rate of only one new thread per second, this takes a while. Since your application makes such heavy use of the ThreadPool, it might be a good idea to call the ThreadPool.SetMinThreads method at the start of the program, to configure the minimum number of threads the ThreadPool will create instantly on demand before switching to its slow injection algorithm.
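A minimal sketch of that configuration (the 100 below is a placeholder, not a recommendation; size it to your workload):

```csharp
using System.Threading;

// Keep the current minimum for completion-port threads, raise only the
// worker-thread minimum so new threads are created without the
// ~1-per-second injection delay.
ThreadPool.GetMinThreads(out _, out int minCompletionPortThreads);
ThreadPool.SetMinThreads(100, minCompletionPortThreads);
```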

Alternatively you could consider converting your synchronous work to asynchronous, if this is possible (if asynchronous APIs are available for whatever you are doing), in order to minimize the number of required threads.
