Is There a Synchronization Class That Guarantees FIFO Order in C#?

Is there a synchronization class that guarantees FIFO order in C#?

You'll need to write your own class to do this. I found this example (pasted here because it looks as though the site's domain has lapsed):

using System.Threading;

public sealed class QueuedLock
{
    private object innerLock;
    private volatile int ticketsCount = 0;
    private volatile int ticketToRide = 1;

    public QueuedLock()
    {
        innerLock = new Object();
    }

    public void Enter()
    {
        // Take a ticket first; tickets are handed out in arrival order.
        int myTicket = Interlocked.Increment(ref ticketsCount);
        Monitor.Enter(innerLock);
        while (true)
        {
            if (myTicket == ticketToRide)
            {
                // Our turn: return while still holding the monitor.
                return;
            }
            else
            {
                // Not our turn yet: release the monitor and wait for a pulse.
                Monitor.Wait(innerLock);
            }
        }
    }

    public void Exit()
    {
        // Admit the next ticket holder and wake all waiters to re-check.
        Interlocked.Increment(ref ticketToRide);
        Monitor.PulseAll(innerLock);
        Monitor.Exit(innerLock);
    }
}

Example of usage:

QueuedLock queuedLock = new QueuedLock();

// Enter() is called before the try block: if it were inside the try and
// threw, the finally would call Exit() without the lock being held.
queuedLock.Enter();
try
{
    // here code which needs to be synchronized
    // in correct order
}
finally
{
    queuedLock.Exit();
}

Source via archive.org

Threads synchronization with FIFO order
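As a small addition of my own (not part of the pasted class): wrapping Enter/Exit in an IDisposable guard makes the acquire/release pattern above harder to get wrong:

using System;

public readonly struct QueuedLockGuard : IDisposable
{
    private readonly QueuedLock _lock;

    public QueuedLockGuard(QueuedLock queuedLock)
    {
        _lock = queuedLock;
        _lock.Enter();   // acquired in ticket (FIFO) order
    }

    public void Dispose() => _lock.Exit();
}

// Usage:
// using (new QueuedLockGuard(queuedLock))
// {
//     // code which needs to be synchronized, in correct order
// }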

You can also review the TPL Dataflow BufferBlock class for this purpose:

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hand-off through a BufferBlock<T>
private static BufferBlock<int> m_buffer = new BufferBlock<int>();

// Producer (Produce() is a placeholder for your own item source)
private static void Producer()
{
    while (true)
    {
        int item = Produce();
        // store the message in the FIFO queue
        m_buffer.Post(item);
    }
}

// Consumer (Process() is a placeholder for your own item handler)
private static async Task Consumer()
{
    while (true)
    {
        int item = await m_buffer.ReceiveAsync();
        Process(item);
    }
}

// Main
public static void Main()
{
    var p = Task.Factory.StartNew(Producer);
    var c = Consumer();
    Task.WaitAll(p, c);
}
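As a side note (my own addition, not from the original answer): if the consumer only ever applies one function, ActionBlock<T> from the same TPL Dataflow library is an even shorter route to FIFO processing, since its default options run a single consumer:

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ActionBlockDemo
{
    public static async Task Main()
    {
        // Default options: MaxDegreeOfParallelism = 1, so items are
        // processed strictly in the order they were posted.
        var block = new ActionBlock<int>(item => Console.WriteLine("Processed " + item));

        for (int i = 0; i < 10; i++)
            block.Post(i);        // enqueued and handled in posting order

        block.Complete();         // no more items
        await block.Completion;   // wait for the queue to drain
    }
}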

Section of code with up to N threads executing in FIFO order

'If I combine a Semaphore and a ConcurrentQueue as follows, a thread may come back with a response to a request brought by another thread, which would require significant changes in other parts of the code.'

I hate to say it, but I would suggest 'changes in other parts of code', even though I don't know how much 'significance' this would have.

Typically, such a requirement is met as you suggested: by queueing messages that contain a reference to the originating class instance, so that responses can be 'returned' to the object that requested them. If the originators are all descended from some 'messagehandler' class, that makes it easier on the thread that will call the function (which should be a member of messagehandler). Once the thread/s have performed the function, they can call an 'onCompletion' method of the messagehandler. 'onCompletion' could either signal an event that the originator is waiting on (synchronous), or queue something to a private P-C queue of the originator (asynchronous).

So, a BlockingCollection, one consumer thread and judicious use of C++/C# inheritance/polymorphism should do the job.
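A minimal C# sketch of that shape, assuming nothing beyond what is described above (MessageHandler, Submit, OnCompletion and SyncRequest are invented names):

using System;
using System.Collections.Concurrent;
using System.Threading;

// Requests carry a reference back to their originator; one consumer
// thread runs them in FIFO order and OnCompletion hands the result back.
public abstract class MessageHandler
{
    private static readonly BlockingCollection<MessageHandler> WorkQueue =
        new BlockingCollection<MessageHandler>();

    static MessageHandler()
    {
        // The single consumer thread: executes requests strictly in order.
        new Thread(() =>
        {
            foreach (var request in WorkQueue.GetConsumingEnumerable())
            {
                request.Execute();       // polymorphic: subclass decides the work
                request.OnCompletion();  // signal or queue back to the originator
            }
        }) { IsBackground = true }.Start();
    }

    public void Submit() => WorkQueue.Add(this);

    protected abstract void Execute();
    protected abstract void OnCompletion();
}

// A synchronous originator: waits on an event until the consumer is done.
public sealed class SyncRequest : MessageHandler
{
    private readonly ManualResetEventSlim _done = new ManualResetEventSlim();
    private readonly Action _work;

    public SyncRequest(Action work) { _work = work; }

    protected override void Execute() => _work();
    protected override void OnCompletion() => _done.Set();

    public void SubmitAndWait() { Submit(); _done.Wait(); }
}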

Strangely, this is almost exactly what I am being forced into with my current embedded ARM project. The command-line interface thread used for config/debug/log is now so large that it needs a massive 600 words of stack, even in 'Thumb, Optimize for size' mode. It can no longer be permitted to call the SD filesystem directly and must now queue itself to the thread that runs the SD card, (which has the largest stack in the system to run FAT32), and wait on a semaphore for the SD thread to call its methods and signal the semaphore when done.

This is the classic way of ensuring that the calls are made sequentially, and it will certainly work. It's basically a thread pool with only one thread.

Like the other posters have written, any other approach is likely to be, err.. 'brave'.

Trying to test a FIFO Mutex - it doesn't work if I start the testing threads in one loop, but it does work if I start them 1 ms apart

It seems (if I understood the issue correctly) that you assume the threads will actually start (i.e. DoWork will begin executing) and grab the mutex in the same order you called Thread.Start on them. However, this is not (necessarily) the case.

Say you have 10 threads (with "ids" from 1 to 10), and you then call Thread.Start on them in order - that does not mean they will actually start in that order. You call Start on thread 1, then you call Start on thread 2, and then it's possible that DoWork of thread 2 (not 1) executes first. You can observe this by changing your test code this way:

public class FifoMutexTestUser
{
    private readonly static FifoMutex fifoMutex = new FifoMutex();
    private readonly int _id;

    public FifoMutexTestUser(int id)
    {
        _id = id;
    }

    public void DoWork(object sleepTime)
    {
        Console.WriteLine("Thread started: " + _id);
        try
        {
            fifoMutex.Enter();
            Thread.Sleep((int)sleepTime);
            FifoMutexTests.Queue.Enqueue(Thread.CurrentThread.Name);
        }
        finally
        {
            fifoMutex.Exit();
        }
    }
}

And then passing the loop variable there (which is correlated with your threadNames array, on which you perform the assert):

for (int i = 0; i < noOfThreads; i++)
{
    FifoMutexTestUser user = new FifoMutexTestUser(i);
    Thread newThread = new Thread(user.DoWork);
    newThread.Name = threadNames[i];
    newThread.Start(threadSleepTimes[i]);
}

And you can see something like this (results may vary of course):

Thread started: 9
Thread started: 1
Thread started: 0
Thread started: 2
Thread started: 3
Thread started: 4
Thread started: 5
Thread started: 6
Thread started: 7
Thread started: 8

So in this run, the thread you called Thread.Start on last actually started first. But there is more: even if a thread started first (where "started" means DoWork began executing), that doesn't mean it will grab the mutex first. The threads execute in parallel, and the code outside of fifoMutex.Enter and fifoMutex.Exit (and inside those functions, before and after the mutex is actually acquired/released) is not protected by any synchronization construct - any of the threads can grab the mutex first.

Adding a delay sometimes (not always) gives an advantage to the previous thread in the loop, so it has a better chance of actually grabbing the mutex first. If you were lucky and the threads attempted to grab the mutex in the right order, your FifoMutex ensures they will then unblock in that order. But the order in which they grab the mutex is nondeterministic in your test code.
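For example, one way to make the acquisition order deterministic (a sketch, assuming the test can reach the shared FifoMutex and that it exposed a hypothetical EnteredCount property, e.g. its internal ticket counter incremented at the top of Enter()):

// Hypothetical: EnteredCount is an invented property counting threads
// that have begun FifoMutex.Enter - it is not part of the original class.
for (int i = 0; i < noOfThreads; i++)
{
    FifoMutexTestUser user = new FifoMutexTestUser(i);
    Thread newThread = new Thread(user.DoWork);
    newThread.Name = threadNames[i];
    newThread.Start(threadSleepTimes[i]);

    // Don't start the next thread until this one has taken its place in
    // the mutex queue; only then is its FIFO position actually fixed.
    while (fifoMutex.EnteredCount < i + 1)
        Thread.Yield();
}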

Does lock() guarantee acquired in order requested?

IIRC, it's highly likely to be in that order, but it's not guaranteed. I believe there are at least theoretical cases where a thread will be woken spuriously, note that it still doesn't have the lock, and go to the back of the queue. It's possible that's only for Wait/Notify, but I have a sneaking suspicion it's for locking as well.

I definitely wouldn't rely on it - if you need things to occur in a sequence, build up a Queue<T> or something similar.

EDIT: I've just found this within Joe Duffy's Concurrent Programming on Windows which basically agrees:

Because monitors use kernel objects internally, they exhibit the same roughly-FIFO behavior that the OS synchronization mechanisms also exhibit (described in the previous chapter). Monitors are unfair, so if another thread tries to acquire the lock before an awakened waiting thread tries to acquire the lock, the sneaky thread is permitted to acquire a lock.

The "roughly-FIFO" bit is what I was thinking of before, and the "sneaky thread" bit is further evidence that you shouldn't make assumptions about FIFO ordering.

Thread order synchronization

It sounds like your queue potentially shouldn't contain individual tasks, but rather queues of tasks, where each subqueue holds "all the tasks which share a sync-lock".

Your processor would therefore:

  • Take a subqueue off the main queue
  • Dequeue the first task off the subqueue and process it
  • When it's finished, put the subqueue back at the end of the main queue (or anywhere, actually - work out how you want the scheduling to work)

This will ensure that only one task per subqueue is ever executed at a time.

You'll probably need a map from lock to subqueue, so that anything creating work can add it to the right subqueue. You'd need to atomically work out when to remove a subqueue from the map (and not put it back on the main queue), assuming you require that functionality at all.
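A sketch of how that could look (all type and member names here are invented for illustration):

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public sealed class KeyedWorkQueue
{
    private sealed class SubQueue
    {
        public object Key;
        public readonly Queue<Action> Tasks = new Queue<Action>();
    }

    private readonly BlockingCollection<SubQueue> _mainQueue =
        new BlockingCollection<SubQueue>();
    private readonly Dictionary<object, SubQueue> _byKey =
        new Dictionary<object, SubQueue>();
    private readonly object _gate = new object();

    public void Enqueue(object key, Action task)
    {
        lock (_gate)
        {
            if (!_byKey.TryGetValue(key, out var sub))
            {
                // First task for this key: create the subqueue and publish it.
                sub = new SubQueue { Key = key };
                _byKey[key] = sub;
                sub.Tasks.Enqueue(task);
                _mainQueue.Add(sub);
            }
            else
            {
                // Subqueue is live (queued or being processed); just append.
                sub.Tasks.Enqueue(task);
            }
        }
    }

    // Any number of consumer threads may run this loop; tasks sharing a
    // key still execute one at a time, because a subqueue is never in the
    // main queue (or held by a consumer) more than once.
    public void ConsumeLoop()
    {
        foreach (var sub in _mainQueue.GetConsumingEnumerable())
        {
            Action task;
            lock (_gate) { task = sub.Tasks.Dequeue(); }

            task();  // run outside the lock

            lock (_gate)
            {
                if (sub.Tasks.Count > 0)
                    _mainQueue.Add(sub);      // more work: requeue at the back
                else
                    _byKey.Remove(sub.Key);   // atomically retire the subqueue
            }
        }
    }
}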

EDIT: As an optimization for the above, you could put the subqueue itself into whatever you're using as the shared sync lock. It could have a reference to either "the single task to next execute" or "a queue of tasks" - only creating the queue lazily. You'd then put the sync lock (which wouldn't actually need to be used as a lock any more) on the queue, and each consumer would just ask it for the next task to execute. If only a single task is available, it's returned (and the "next task" variable set to null). If there are multiple tasks available, the first is dequeued.

When a producer adds a new task, either the "first task" variable is set to the task to execute if it was previously null, or a queue is created if there wasn't a queue but there was already a task, or the queue is just added to if one already existed. That solves the inefficiency of unnecessary queue creation.

Again, the tricky part will be working out how to atomically throw away the shared resource lock - because you only want to do so after processing the last item, but equally you don't want to miss a task because you happened to add it at the wrong time. It shouldn't be too bad, but equally you'll need to think about it carefully.

TPL Producer Consumer in a FIFO order C#

OK, after the edit: instead of adding the results to the BlockingCollection, add the Tasks to the blocking collection. This way the items are consumed in order AND there is a maximum degree of parallelism, which prevents too many threads from kicking off and eating up all your memory.

https://dotnetfiddle.net/lUbSqB

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;

public class Program
{
    private static BlockingCollection<Task<int>> BlockingCollection { get; set; }

    public static void Producer(int numTasks)
    {
        Random r = new Random(7);
        for (int i = 0; i < numTasks; i++)
        {
            int closured = i;
            Task<int> task = new Task<int>(() =>
            {
                Thread.Sleep(r.Next(100));
                Console.WriteLine("Produced: " + closured);
                return closured;
            });
            // Add blocks once maxParallelism tasks are in flight,
            // so at most that many tasks run at a time.
            BlockingCollection.Add(task);
            task.Start();
        }
        BlockingCollection.CompleteAdding();
    }

    public static void Main()
    {
        int numTasks = 20;
        int maxParallelism = 3;

        BlockingCollection = new BlockingCollection<Task<int>>(maxParallelism);

        Task.Factory.StartNew(() => Producer(numTasks));

        // Consume in FIFO order; Wait preserves the ordering even when
        // tasks finish out of order.
        foreach (var task in BlockingCollection.GetConsumingEnumerable())
        {
            task.Wait();
            Console.WriteLine(" Consumed: " + task.Result);
            task.Dispose();
        }
    }
}

And the results:

Produced: 0
Consumed: 0
Produced: 1
Consumed: 1
Produced: 3
Produced: 2
Consumed: 2
Consumed: 3
Produced: 4
Consumed: 4
Produced: 6
Produced: 5
Consumed: 5
Consumed: 6
Produced: 7
Consumed: 7
Produced: 8
Consumed: 8
Produced: 10
Produced: 9
Consumed: 9
Consumed: 10
Produced: 12
Produced: 13
Produced: 11
Consumed: 11
Consumed: 12
Consumed: 13
Produced: 15
Produced: 14
Consumed: 14
Consumed: 15
Produced: 17
Produced: 16
Produced: 18
Consumed: 16
Consumed: 17
Consumed: 18
Produced: 19
Consumed: 19

Guaranteed order of processing in EventHandler subscriber

If multiple events are raised (rapidly), will the subscriber's handler always be called in the order the events were raised?

No. First, each call to BeginInvoke queues a work item to the thread pool; each work item executes on a different thread, and these threads are in a race. Second, even if your subscribers were invoked in the correct order, these invocations are still in a race; and if you take a lock inside a subscriber, the order in which the lock is granted is not defined.

If I implement a queued lock like the one mentioned above, can I rely on the Enter() method being called in the right order? (i.e. is there still a risk that "event 2" will reach the queuedLock.Enter() in my subscriber before "event 1", even if "event 2" fired after "event 1"?)

No, for the same reasons mentioned above.

Given the EventHandler needs to be async (to prevent subscribers from blocking the thread), is this just not possible/reasonable with an EventHandler and do I need to implement some sort of separate async event queue?

Yes. Since you have to process events in order, using a queue is preferred over multi-threading. There is no point in spawning multiple threads just to make them all wait to acquire a single lock.

Using a queue

When using a queue, the producer only enqueues an event without blocking. On the consumer side, there is a single thread that dequeues and handles events one by one. This is the thread that invokes the subscribers per each event. Note that the consumer thread will be blocked while the queue is empty.

You can still parallelize the processing

For example, suppose an event belongs to (let's say) a Customer: events from the same Customer must be processed in order, while two events from two different Customers can be processed independently.

In such a case, you can separate events belonging to different Customers into multiple queues, and have a separate consumer thread per queue. For this to work, you must ensure that events from the same Customer are mapped to the same queue.

For example, if you have N queues, you can map an event to a queue by calculating hash(Customer) modulo N.
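A sketch of that routing (the class and member names are invented; the hash routing follows the formula above):

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public sealed class PartitionedEventProcessor<TEvent>
{
    private readonly BlockingCollection<TEvent>[] _queues;
    private readonly Func<TEvent, object> _customerOf;

    public PartitionedEventProcessor(int n, Func<TEvent, object> customerOf, Action<TEvent> handle)
    {
        _customerOf = customerOf;
        _queues = new BlockingCollection<TEvent>[n];
        for (int i = 0; i < n; i++)
        {
            var q = _queues[i] = new BlockingCollection<TEvent>();
            // One long-running consumer per queue preserves per-queue FIFO order.
            Task.Factory.StartNew(() =>
            {
                foreach (var e in q.GetConsumingEnumerable())
                    handle(e);
            }, TaskCreationOptions.LongRunning);
        }
    }

    public void Publish(TEvent e)
    {
        // hash(Customer) modulo N: the same customer always lands on the
        // same queue, so its events are processed in order.
        int index = (_customerOf(e).GetHashCode() & int.MaxValue) % _queues.Length;
        _queues[index].Add(e);
    }
}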

Existing producer-consumer queues

.NET provides a couple of specialized queues, out of the box:

  • BlockingCollection
  • ConcurrentQueue

You may also take a look at:

  • TPL Dataflow, which provides more performant alternatives.
  • LMAX Disruptor if you need an ultra-performant backend.

Are threads waiting on a lock FIFO?

They are queued, but the order is not guaranteed to be FIFO.

Check out this link: http://www.albahari.com/threading/part2.aspx

Good approach for running Tasks synchronously with FIFO?

After playing with various things, I just found a simple solution which should be sufficient for me, and which is somewhat similar to the solution of Matías Fidemraizer:

// Requires: using System.Linq; using System.Collections.Concurrent;
// using System.Threading.Tasks; (and the methods must live in a static class)

private static ConcurrentQueue<Task> Tasks { get; } = new ConcurrentQueue<Task>();

public async static Task RunAlone(this Task task)
{
    Tasks.Enqueue(task);

    do
    {
        // Peek at the head of the queue without removing it.
        var nextTask = Tasks.First();

        if (nextTask == task)
        {
            // Our turn: start the (cold) task, await it, then dequeue it.
            nextTask.Start();
            await nextTask;
            Task deletingTask;
            Tasks.TryDequeue(out deletingTask);
            break;
        }
        else
        {
            // Not our turn yet: block until the current head completes.
            nextTask.Wait();
        }
    } while (Tasks.Any());
}

public async static Task<TResult> RunAlone<TResult>(this Task<TResult> task)
{
    TResult result = default(TResult);
    Tasks.Enqueue(task);

    do
    {
        var nextTask = Tasks.First();

        if (nextTask == task)
        {
            nextTask.Start();
            result = await (Task<TResult>)nextTask;
            Task deletingTask;
            Tasks.TryDequeue(out deletingTask);
            break;
        }
        else
        {
            nextTask.Wait();
        }
    } while (Tasks.Any());

    return result;
}
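A usage sketch of my own (not from the original answer): the tasks must be created "cold" (unstarted), because RunAlone calls Start() itself once the task reaches the head of the queue.

using System;
using System.Threading.Tasks;

// Both tasks are created unstarted; RunAlone starts them one at a time,
// in enqueue order, even when awaited concurrently. Note that callers
// behind the head of the queue block synchronously in nextTask.Wait().
var first = new Task(() => Console.WriteLine("runs first"));
var second = new Task<int>(() => 42);

Task a = first.RunAlone();
Task<int> b = second.RunAlone();
await Task.WhenAll(a, b);      // (inside an async method)
Console.WriteLine(b.Result);   // 42, printed after "runs first"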

