Thread Safety for DataTable

DataTable is simply not designed or intended for concurrent usage (in particular where there is any form of mutation involved). The advisable "wrapper" here would, in my view, be either:

  • remove the need to work on the DataTable concurrently (when involving mutation), or:
  • remove the DataTable, instead using a data-structure that either directly supports what you need (for example a concurrent collection), or which is much simpler and can be trivially synchronized (either exclusive or reader/writer)

Basically: change the problem.
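Here is one way to "change the problem", sketched under assumptions. Parse in parallel into a ConcurrentQueue<object[]>, which is a concurrent collection that safely accepts many writers. Only build the DataTable afterwards, on a single thread. The `ParseLine` helper and the two-column schema are hypothetical placeholders for your own parser and schema.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;

static class ConcurrentCollectFirst
{
    // Hypothetical parser: turns "id,name" into the values for one row.
    static object[] ParseLine(string line)
    {
        string[] parts = line.Split(',');
        return new object[] { int.Parse(parts[0]), parts[1] };
    }

    public static DataTable Load(IEnumerable<string> lines)
    {
        // Concurrent phase: ConcurrentQueue<T> supports many concurrent writers,
        // and the DataTable is not touched at all here.
        var parsed = new ConcurrentQueue<object[]>();
        Parallel.ForEach(lines, line => parsed.Enqueue(ParseLine(line)));

        // Single-threaded phase: only this thread ever mutates the DataTable.
        var table = new DataTable("MyLog");
        table.Columns.Add("Id", typeof(int));
        table.Columns.Add("Name", typeof(string));
        foreach (object[] values in parsed)
        {
            table.Rows.Add(values);
        }
        return table;
    }
}
```

The rows end up in whatever order the parsing threads finished. If order matters, see the ordering caveats further down.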


From comments:

The code looks like:

Parallel.ForEach(strings, str =>
{
    DataRow row;
    lock (table)
    {
        row = table.NewRow();
    }
    MyParser.Parse(str, out row);
    lock (table)
    {
        table.Rows.Add(row);
    }
});

I can only hope that out row is a typo here: an out parameter replaces the row variable, so it won't populate the row created via NewRow(). Even if you absolutely have to use that approach, you can't use NewRow, because the pending row is effectively shared state of the table. Your best bet would be:

Parallel.ForEach(strings, str =>
{
    object[] values = MyParser.Parse(str);
    lock (table)
    {
        table.Rows.Add(values);
    }
});

The important change in the above is that the lock covers the entire new row process. Note that you will have no guarantee of order when using Parallel.ForEach like this, so it is important that the final order does not need to match exactly (which shouldn't be a problem if the data includes a time component).
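If each row does carry a time component, you can restore the order after the parallel load instead of preserving it during the load. The sketch below assumes a hypothetical "timestamp|message" input format and a `Timestamp` column. It keeps the whole new-row step under the lock, as above. Afterwards, DataView.Sort orders the rows and DataView.ToTable() copies them into a new table in that order.

```csharp
using System;
using System.Data;
using System.Globalization;
using System.Threading.Tasks;

static class RestoreOrder
{
    public static DataTable LoadThenSort(string[] lines)
    {
        var table = new DataTable("MyLog");
        table.Columns.Add("Timestamp", typeof(DateTime)); // hypothetical schema
        table.Columns.Add("Message", typeof(string));

        Parallel.ForEach(lines, line =>
        {
            // Parse outside the lock; this touches no shared state.
            string[] parts = line.Split('|');
            object[] values = { DateTime.Parse(parts[0], CultureInfo.InvariantCulture), parts[1] };
            lock (table)
            {
                // The whole "new row" step happens under the lock.
                table.Rows.Add(values);
            }
        });

        // Rows arrive in arbitrary order; sort by time once all threads are done.
        DataView view = table.DefaultView;
        view.Sort = "Timestamp ASC";
        return view.ToTable();
    }
}
```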

However! I still think you are approaching this the wrong way: for parallelism to be relevant, it must be non-trivial data. If you have non-trivial data, you really don't want to have to buffer it all in memory. I strongly suggest doing something like the following, which will work fine on a single thread:

// connectionString: your own connection string (SqlBulkCopy has no parameterless constructor)
// ObjectReader comes from the FastMember library
using (var bcp = new SqlBulkCopy(connectionString))
using (var reader = ObjectReader.Create(ParseFile(path)))
{
    bcp.DestinationTableName = "MyLog";
    bcp.WriteToServer(reader);
}
...
static IEnumerable<LogRow> ParseFile(string path)
{
    using (var reader = File.OpenText(path))
    {
        string line;
        while ((line = reader.ReadLine()) != null)
        {
            yield return new LogRow {
                // TODO: populate the row from line here
            };
        }
    }
}
...
public sealed class LogRow {
    /* define your schema here */
}

Advantages:

  • no buffering - this is a fully streaming operation (yield return does not put things into a list or similar)
  • for that reason, the rows can start streaming immediately without needing to wait for the entire file to be pre-processed first
  • no memory saturation issues
  • no threading complications / overheads
  • you get to preserve the original order (not usually critical, but nice)
  • you are only constrained by how fast you can read the original file, which is typically faster on a single thread than it is from multiple threads (contention on a single IO device is just overhead)
  • avoids the overheads of DataTable, which is overkill here; its flexibility comes at a significant cost
  • read (from the log file) and write (to the database) are now concurrent rather than sequential

I do a lot of things like this in my own work, and from experience it is usually at least twice as fast as populating a DataTable in memory first.
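You can check the "no buffering" point directly, because an iterator method only runs as far as the consumer pulls. The small sketch below uses a hypothetical `ParseLines` over in-memory strings instead of a file. It counts how many rows have actually been produced.

```csharp
using System.Collections.Generic;

static class StreamingDemo
{
    // Incremented each time the iterator produces a row, to observe laziness.
    public static int Produced;

    public static IEnumerable<string> ParseLines(IEnumerable<string> lines)
    {
        foreach (string line in lines)
        {
            Produced++;
            yield return line.Trim(); // stand-in for "populate the row from line"
        }
    }
}
```

Calling ParseLines runs none of its body. Taking the first item produces exactly one row. This is why SqlBulkCopy can start writing while later lines have not been read yet.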


And finally, here's an example of an IEnumerable<T> implementation that accepts concurrent readers and writers without requiring everything to be buffered in memory. It would allow multiple threads to parse the data (calling Add and finally Close) while a single thread feeds SqlBulkCopy via the IEnumerable<T> API:

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Acts as a container for concurrent read/write flushing (for example, parsing a
/// file while concurrently uploading the contents); supports any number of concurrent
/// writers and readers, but note that each item will only be returned once (and once
/// fetched, is discarded). It is necessary to Close() the bucket after adding the last
/// of the data, otherwise any iterators will never finish
/// </summary>
class ThreadSafeBucket<T> : IEnumerable<T>
{
    private readonly Queue<T> queue = new Queue<T>();

    public void Add(T value)
    {
        lock (queue)
        {
            if (closed) // no more data once closed
                throw new InvalidOperationException("The bucket has been marked as closed");

            queue.Enqueue(value);
            if (queue.Count == 1)
            { // someone may be waiting for data
                Monitor.PulseAll(queue);
            }
        }
    }

    public void Close()
    {
        lock (queue)
        {
            closed = true;
            Monitor.PulseAll(queue);
        }
    }
    private bool closed;

    public IEnumerator<T> GetEnumerator()
    {
        while (true)
        {
            T value;
            lock (queue)
            {
                if (queue.Count == 0)
                {
                    // no data; should we expect any?
                    if (closed) yield break; // nothing more ever coming

                    // else wait to be woken, and redo from start
                    Monitor.Wait(queue);
                    continue;
                }
                value = queue.Dequeue();
            }
            // yield it **outside** of the lock
            yield return value;
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

static class Program
{
    static void Main()
    {
        var bucket = new ThreadSafeBucket<int>();
        int expectedTotal = 0;
        // single consumer (e.g. the thread feeding SqlBulkCopy)
        Task consumer = Task.Run(() =>
        {
            int count = 0, sum = 0;
            foreach (var item in bucket)
            {
                count++;
                sum += item;
                if ((count % 100) == 0)
                    Console.WriteLine("After {0}: {1}", count, sum);
            }
            Console.WriteLine("Total over {0}: {1}", count, sum);
        });
        // several producers adding concurrently
        Parallel.For(0, 5000,
            new ParallelOptions { MaxDegreeOfParallelism = 3 },
            i =>
            {
                bucket.Add(i);
                Interlocked.Add(ref expectedTotal, i);
            }
        );
        Console.WriteLine("all data added; closing bucket");
        bucket.Close();
        consumer.Wait(); // wait for the consumer to drain the bucket, rather than sleeping and hoping
        Console.WriteLine("expecting total: {0}",
            Interlocked.CompareExchange(ref expectedTotal, 0, 0));
        Console.ReadLine();
    }
}
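For what it's worth, the BCL (since .NET 4) ships a type with much the same semantics: BlockingCollection<T>. Its CompleteAdding() plays the role of Close(). Its GetConsumingEnumerable() yields each item once and finishes after completion. The sketch below builds the same producer/consumer shape with it instead of the hand-written bucket. The bounded capacity of 1000 is an assumed value that caps memory use.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

static class BlockingCollectionDemo
{
    public static long Run(int itemCount)
    {
        // boundedCapacity (an assumed value) makes producers block when the
        // consumer falls behind, so memory use stays capped.
        using (var bucket = new BlockingCollection<int>(boundedCapacity: 1000))
        {
            // Single consumer: in the scenario above this would feed SqlBulkCopy.
            Task<long> consumer = Task.Run(() =>
            {
                long sum = 0;
                foreach (int item in bucket.GetConsumingEnumerable())
                {
                    sum += item;
                }
                return sum;
            });

            // Multiple producers, like the parsing threads.
            Parallel.For(0, itemCount,
                new ParallelOptions { MaxDegreeOfParallelism = 3 },
                i => bucket.Add(i));

            // Equivalent of Close(): the consuming enumerable ends once drained.
            bucket.CompleteAdding();
            return consumer.Result;
        }
    }
}
```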

Best way to convert thread safe collection to DataTable?

This is a good candidate for PLINQ (or Rx - I'll focus on PLINQ since it's part of the Base Class Library).

IEnumerable<FinalObject> bag = allData
    .AsParallel()
    .WithDegreeOfParallelism(Environment.ProcessorCount)
    .Select(dataObj =>
    {
        FinalObject theData = Process(dataObj);

        Thread.Sleep(100);

        return theData;
    });

DataTable table = CreateTable();

foreach (FinalObject moveObj in bag)
{
    table.Rows.Add(moveObj.x);
}

Realistically, instead of throttling the loop via Thread.Sleep, you should be limiting the maximum degree of parallelism further until you get the CPU usage down to the desired level.
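The sketch below applies that suggestion and drops Thread.Sleep entirely. The CPU budget becomes a lower degree of parallelism; half the cores is an arbitrary choice here. AsOrdered() keeps the results in input order. `Process` and the single int column are hypothetical stand-ins. The foreach runs on the calling thread, so only one thread ever touches the DataTable.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;

static class ThrottledPlinq
{
    static int Process(int x) => x * 2; // hypothetical stand-in for the real work

    public static DataTable Build(IEnumerable<int> allData)
    {
        // Throttle by using fewer cores rather than by sleeping.
        int dop = Math.Max(1, Environment.ProcessorCount / 2);

        IEnumerable<int> results = allData
            .AsParallel()
            .AsOrdered()                  // preserve input order in the output
            .WithDegreeOfParallelism(dop)
            .Select(x => Process(x));

        var table = new DataTable();
        table.Columns.Add("x", typeof(int));
        foreach (int x in results)       // consumed on this thread only
        {
            table.Rows.Add(x);
        }
        return table;
    }
}
```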

Disclaimer: all of the below is meant for entertainment only, although it does actually work.

Of course you can always kick it up a notch and produce a full-on async Parallel.ForEach implementation that allows you to process input in parallel and do your throttling asynchronously, without blocking any thread pool threads.

async Task ParallelForEachAsync<TInput, TResult>(IEnumerable<TInput> input,
                                                 int maxDegreeOfParallelism,
                                                 Func<TInput, Task<TResult>> body,
                                                 Action<TResult> onCompleted)
{
    Queue<TInput> queue = new Queue<TInput>(input);

    if (queue.Count == 0) {
        return;
    }

    List<Task<TResult>> tasksInFlight = new List<Task<TResult>>(maxDegreeOfParallelism);

    do
    {
        while (tasksInFlight.Count < maxDegreeOfParallelism && queue.Count != 0)
        {
            TInput item = queue.Dequeue();
            Task<TResult> task = body(item);

            tasksInFlight.Add(task);
        }

        Task<TResult> completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);

        tasksInFlight.Remove(completedTask);

        TResult result = completedTask.GetAwaiter().GetResult(); // We know the task has completed. No need for await.

        onCompleted(result);
    }
    while (queue.Count != 0 || tasksInFlight.Count != 0);
}

Usage:

async Task<DataTable> ProcessAllAsync(IEnumerable<InputObject> allData)
{
    DataTable table = CreateTable();
    int maxDegreeOfParallelism = Environment.ProcessorCount;

    await ParallelForEachAsync(
        allData,
        maxDegreeOfParallelism,
        // Loop body: these Tasks will run in parallel, up to {maxDegreeOfParallelism} at any given time.
        async dataObj =>
        {
            FinalObject o = await Task.Run(() => Process(dataObj)).ConfigureAwait(false); // Thread pool processing.

            await Task.Delay(100).ConfigureAwait(false); // Artificial throttling.

            return o;
        },
        // Completion handler: these will be executed one at a time, and can safely mutate shared state.
        moveObj => table.Rows.Add(moveObj.x)
    );

    return table;
}

struct InputObject
{
    public int x;
}

struct FinalObject
{
    public int x;
}

FinalObject Process(InputObject o)
{
    // Simulate synchronous work.
    Thread.Sleep(100);

    return new FinalObject { x = o.x };
}

Same behaviour, but without Thread.Sleep for throttling and without ConcurrentBag<T>.
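A more conventional alternative to the hand-rolled loop above is to throttle with SemaphoreSlim. This is a swapped-in technique, not the method shown above. You await all the tasks and only then touch the DataTable, on one thread. `Process` and the single-column table are hypothetical stand-ins that mirror the usage example. Unlike ParallelForEachAsync, results are added only after everything has completed, so this trades streaming for simplicity.

```csharp
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class SemaphoreThrottle
{
    static int Process(int x) => x + 1; // hypothetical synchronous work

    public static async Task<DataTable> ProcessAllAsync(IEnumerable<int> allData, int maxDegreeOfParallelism)
    {
        using (var gate = new SemaphoreSlim(maxDegreeOfParallelism))
        {
            Task<int>[] tasks = allData.Select(async item =>
            {
                await gate.WaitAsync().ConfigureAwait(false); // at most N bodies in flight
                try
                {
                    return await Task.Run(() => Process(item)).ConfigureAwait(false);
                }
                finally
                {
                    gate.Release();
                }
            }).ToArray();

            // WhenAll returns results in the same order as the tasks.
            int[] results = await Task.WhenAll(tasks).ConfigureAwait(false);

            // Single-threaded mutation once all work is done.
            var table = new DataTable();
            table.Columns.Add("x", typeof(int));
            foreach (int r in results)
            {
                table.Rows.Add(r);
            }
            return table;
        }
    }
}
```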

DataTable and thread safety

If you are exposing the DataTable via data-binding, then forget it; you cannot make that thread-safe. Even if you wrap the DataView somehow (in a custom ITypedList), that doesn't do enough. Data-binding makes assumptions about the data, in particular about the IList implementation. For example, it assumes the list isn't going to change length at random, in a thread-contended way, in the middle of iterating the data or adding a row on the UI thread.

There is provision for changes made on the same thread (via events), but not for cross-thread changes.

Thread-safe DataSet

No, it is not safe. You should change your code to:

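public sealed class MyDataSet
{
    // note: any other code touching ds must also take _lock
    public static DataSet ds = new DataSet();

    // one static lock object shared by every caller
    private static readonly object _lock = new object();

    public static void UpdateRow(object key, object data)
    {
        lock (_lock)
        {
            // the Find is inside the lock as well
            DataRow dr = ds.Tables[0].Rows.Find(key);
            if (dr == null) return; // no row with that key
            dr.AcceptChanges();
            dr.BeginEdit();
            dr["col"] = data;
            dr.EndEdit();
        }
    }
}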

Your _lock object should be a single static object shared by all callers for it to be a good lock, and your Find call should be inside the locked region.

ADO.NET DataTable/DataRow Thread Safety

Allen,

I could not find any specific problems with your approach, though my testing was not exhaustive. Here are some ideas that we stick with (all of our applications are thread-centric):

Whenever possible:

[1] Make all data access completely atomic; data sharing in multi-threaded applications is an excellent place for all kinds of unforeseen thread interactions.

[2] Avoid locking on a type. If the type is not known to be thread safe, write a wrapper.

[3] Include structures that allow for the fast identification of threads that are accessing a shared resource. If system performance allows, log this information above the debug level and below usual operation log levels.

[4] Any code, including System.* et al., that is not explicitly documented internally as Thread Safe Tested is not thread safe. Hearsay and the verbal word of others do not count. Test it and write it down.

Hope this is of some value.
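Point [2] ("write a wrapper") and the "trivially synchronized (reader/writer)" option from the first answer both boil down to a small wrapper that owns its lock. The sketch below assumes a simple key/value store in place of a DataTable; the `SyncedStore` name is hypothetical. ReaderWriterLockSlim lets concurrent readers proceed without blocking each other, while writers get exclusive access. The lock object is private, so outside code can never lock on it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

sealed class SyncedStore<TKey, TValue>
{
    private readonly Dictionary<TKey, TValue> data = new Dictionary<TKey, TValue>();
    // Private lock: callers cannot lock on it (or on the type) from outside.
    private readonly ReaderWriterLockSlim rw = new ReaderWriterLockSlim();

    public bool TryGet(TKey key, out TValue value)
    {
        rw.EnterReadLock(); // many readers may hold this at once
        try { return data.TryGetValue(key, out value); }
        finally { rw.ExitReadLock(); }
    }

    public void Set(TKey key, TValue value)
    {
        rw.EnterWriteLock(); // exclusive: waits for readers to leave
        try { data[key] = value; }
        finally { rw.ExitWriteLock(); }
    }

    // Read-modify-write under one write lock, so the update is atomic.
    public TValue AddOrUpdate(TKey key, TValue addValue, Func<TValue, TValue> update)
    {
        rw.EnterWriteLock();
        try
        {
            TValue result = data.TryGetValue(key, out TValue existing) ? update(existing) : addValue;
            data[key] = result;
            return result;
        }
        finally { rw.ExitWriteLock(); }
    }
}
```

For brevity, the sketch never disposes the ReaderWriterLockSlim. A long-lived real wrapper would probably implement IDisposable.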

Parallel.ForEach and DataTable - Isn't DataTable.NewRow() a thread safe read operation?

No, NewRow is not a "read" operation and is not thread safe.

Instead of using NewRow and populating the row, you could just place your values in an array or list of objects. Then, when you've collected all of your data, you can add it all to the DataTable:

var newRow = table.NewRow();
newRow.ItemArray = values; // array of values
table.Rows.Add(newRow);

That way you can parallelize the creation of your data without running into issues when you add it to the DataTable.
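The sketch below assumes the input is an array of strings and uses a hypothetical `Parse` that returns one row's values. Each parallel iteration writes only its own slot of a pre-sized array, so no locking is needed and the original order is kept. The DataTable is then filled on one thread with Rows.Add(object[]).

```csharp
using System.Data;
using System.Threading.Tasks;

static class ParallelThenAdd
{
    // Hypothetical parser: "key=value" -> { key, value }.
    static object[] Parse(string s)
    {
        int eq = s.IndexOf('=');
        return new object[] { s.Substring(0, eq), int.Parse(s.Substring(eq + 1)) };
    }

    public static DataTable Build(string[] input)
    {
        // Each index is written by exactly one iteration: no shared mutable state.
        var rows = new object[input.Length][];
        Parallel.For(0, input.Length, i => rows[i] = Parse(input[i]));

        var table = new DataTable();
        table.Columns.Add("Key", typeof(string));
        table.Columns.Add("Value", typeof(int));
        foreach (object[] values in rows)
        {
            table.Rows.Add(values); // single thread, and no NewRow() involved
        }
        return table;
    }
}
```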


Looking at the source code for DataTable:

It contains various fields:

private readonly DataRowBuilder rowBuilder;
internal readonly RecordManager recordManager;

NewRow() calls NewRow(-1), and NewRow(int) modifies the state of those fields:

internal DataRow NewRow(int record) {
    if (-1 == record) {
        record = NewRecord(-1);
    }

    rowBuilder._record = record; // here
    DataRow row = NewRowFromBuilder( rowBuilder );
    recordManager[record] = row; // here

    if (dataSet != null)
        DataSet.OnDataRowCreated( row );

    return row;
}

...and there's much more that I haven't followed. But what's clear is that NewRow() does more than just return a new row - it modifies the state of the DataTable instance all over the place.

The documentation never said it was thread safe, but I would have guessed that because you still have to add the row to the table, NewRow didn't modify the DataTable. I would have been wrong: it's definitely not thread safe.

Another red flag is in the documentation for NewRow:

After creating a DataRow, you can add it to the DataRowCollection, through the DataTable object's Rows property. When you use NewRow to create new rows, the rows must be added to or deleted from the data table before you call Clear.

It doesn't say what will happen if you call Clear() without adding or deleting a row created with NewRow(). An exception? Will I die? So I tried. I'm still here, but calling Clear() replaced all of the values in each row with DBNull.Value, further underscoring that the rows are not "disembodied" until they are added to the DataTable. They are part of its state.
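The sketch below shows why NewRow() looks like a harmless read from the outside, and why it isn't. The new row is not counted in Rows, yet it already belongs to the table: its RowState is Detached and its Table property points back at the table that created it. The column name and value are arbitrary.

```csharp
using System.Data;

static class NewRowIsNotDisembodied
{
    public static (int rowCount, DataRowState state, bool ownedByTable) Inspect()
    {
        var table = new DataTable();
        table.Columns.Add("x", typeof(int));

        DataRow row = table.NewRow(); // allocates a record inside the table's storage
        row["x"] = 42;

        return (table.Rows.Count,     // still 0: not added yet
                row.RowState,         // Detached
                row.Table == table);  // but already tied to this table
    }
}
```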


