Process Queue with Multithreading or Tasks

Process queue with multithreading or tasks

If you can use .Net 4.5, I'd suggest looking at Dataflow from the the Task Parallel Library (TPL).

That page leads to a lot of example walkthroughs such as How to: Implement a Producer-Consumer Dataflow Pattern and Walkthrough: Using Dataflow in a Windows Forms Application.

Have a look at that documentation to see if it would help you. It's quite a lot to take in, but I think it would probably be your best approach.

Alternatively, you could look into using a BlockingCollection along with its GetConsumingEnumerable() method to access items in the queue.

What you do is to split up the work into objects that you want to process in some way, and use a BlockingCollection to manage the queue.

Some sample code using ints rather than objects as the work items will help to demonstrate this:

When a worker thread has finished with it's current item, it will remove a new item from the work queue, process that item, then add it to the output queue.

A separate consumer thread removes completed items from the output queue and does something with them.

At the end we must wait for all the workers to finish (Task.WaitAll(workers)) before we can mark the output queue as completed (outputQueue.CompleteAdding()).

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
class Program
{
static void Main(string[] args)
{
new Program().run();
}

void run()
{
int threadCount = 4;
Task[] workers = new Task[threadCount];

Task.Factory.StartNew(consumer);

for (int i = 0; i < threadCount; ++i)
{
int workerId = i;
Task task = new Task(() => worker(workerId));
workers[i] = task;
task.Start();
}

for (int i = 0; i < 100; ++i)
{
Console.WriteLine("Queueing work item {0}", i);
inputQueue.Add(i);
Thread.Sleep(50);
}

Console.WriteLine("Stopping adding.");
inputQueue.CompleteAdding();
Task.WaitAll(workers);
outputQueue.CompleteAdding();
Console.WriteLine("Done.");

Console.ReadLine();
}

void worker(int workerId)
{
Console.WriteLine("Worker {0} is starting.", workerId);

foreach (var workItem in inputQueue.GetConsumingEnumerable())
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, workItem);
Thread.Sleep(100); // Simulate work.
outputQueue.Add(workItem); // Output completed item.
}

Console.WriteLine("Worker {0} is stopping.", workerId);
}

void consumer()
{
Console.WriteLine("Consumer is starting.");

foreach (var workItem in outputQueue.GetConsumingEnumerable())
{
Console.WriteLine("Consumer is using item {0}", workItem);
Thread.Sleep(25);
}

Console.WriteLine("Consumer is finished.");
}

BlockingCollection<int> inputQueue = new BlockingCollection<int>();
BlockingCollection<int> outputQueue = new BlockingCollection<int>();
}
}

What is the safest way to queue multiple threads originating in a loop?

Queues are one way to do it. The way to use them is to put function parameters on a queue, and use threads to get them and do the processing.

The queue size doesn't matter too much in this case because reading the next line is fast. In another case, a more optimized solution would be to set the queue size to at least twice the number of threads. That way if all threads finish processing an item from the queue at the same time, they will all have the next item in the queue ready to be processed.

To avoid complicating the code threads can be set as daemonic so that they don't stop the program from finishing after the processing is done. They will be terminated when the main process finishes.

The alternative is to put a special item on the queue (like None) for each thread and make the threads exit after getting it from the queue and then join the threads.

For the examples bellow the number of worker threads is set using the workers variable.

Here is an example of a solution using a queue.

from queue import Queue
from threading import Thread

queue = Queue(workers * 2)
def work():
while True:
myFunction(*queue.get())
queue.task_done()

for _ in range(workers):
Thread(target=work, daemon=True).start()

with open(targets, 'r') as listfile:
for line in listfile:
queue.put((line, param))
queue.join()

A simpler solution might be using ThreadPoolExecutor. It is especially simple in this case because the function being called doesn't return anything that needs to be used in the main thread.

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=workers) as executor:
with open(targets, 'r') as listfile:
for line in listfile:
executor.submit(myFunction, line, param)

Also, if it's not a problem to have all lines stored in memory, there is a solution which doesn't use anything other than threads. The work is split in such a way that the threads read some lines from a list and ignore other lines. A simple example with two threads is where one thread reads odd lines and the other reads even lines.

from threading import Thread

with open(targets, 'r') as listfile:
lines = listfile.readlines()

def work_split(n):
for line in lines[n::workers]:
myFunction(line, param)

threads = []
for n in range(workers):
t = Thread(target=work_split, args=(n,))
t.start()
threads.append(t)

for t in threads:
t.join()

I have done a quick benchmark and the Queue is slightly faster than the ThreadPoolExecutor, but the solution with the split work is faster than both.

ConcurrentQueue with multithreading

There are a couple of issues with your implementation. The first and obvious one is that the worker method only dequeues zero or one item and then stops:

    if(iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
}

It should be:

    while(iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
}

That however won't be enough to make your program work properly. If your workers are dequeueing faster than the main thread is enqueueing, they will stop while the main task is still enqueueing. You need to signal the workers that they can stop. You can define a boolean variable that will be set to true once enqueueing is done:

for (int i = 0; i < 100; i++)
{
iQ.Enqueue("Item" + i);
}
Volatile.Write(ref doneEnqueueing, true);

The workers will check the value:

void worker(int workerId)
{
Console.WriteLine("Worker {0} is starting.", workerId);
do {
string op;
while(iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
}
SpinWait.SpinUntil(() => Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0));
}
while (!Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0))
Console.WriteLine("Worker {0} is stopping.", workerId);
}

Is there a way to change/add queue contents dynamically while processing tasks using threading in python

Update

With all your comments, it now appears that what you have is 10 independent sets of values creating 10 chains of dependencies:

Chain 1: [1, 11, 21, 31, ...]
Chain 2: [2, 12, 22, 32, ...]
...
Chain 10: [10, 20, 30, 40, ...]

You can run the first values from each chain as concurrent tasks in a thread pool (i.e. 1, 2, ... 10) and if a task completes successfully, then you can run the next value in the chain, else you are through with that chain since each successive value in a chain is only run on the successful completion of the previous value.

This becomes very simple once you have come up with your method of expressing these chains of dependencies:

from multiprocessing.pool import ThreadPool as Pool

def process_x_value(x):
"""
Process current x value.
Note that this is invoked by a simple call from run_dependency_chain,
which is already threaded.
This function must not be CPU-intensive or else you will not achieve any
level of concurrency using multithreading.
"""
import time
time.sleep(.1) # simulate some I/O
# return success or failure
return True # success

def run_dependency_chain(x):
"""
Process value x, if sucessful process next x value that was dependent
on successful completion.
Repeat until there is no next x value (end of dependency chain).
"""
while True:
result = process_x_value(x)
if not result: # failure
return
results[x] = True # just store successful results
x = next_x.get(x)
if x is None:
return

# we will be running 10 concurrent dependency chains:
# if task 1 completes successfully, next task to run is 11
# if task 2 completes successfully, next task to run is 12
# ...
# if task 10 completes successfully, next task to run is 20
"""
Thus the successor task can be computed by adding 10 to the current task,
but we will assume in general a more complicated relationship is possible. So we will
use a quasi-linked list of dependencies implemented using a dictionary, next_x,
where next_x[x] gives the successor x to be run on successful completion
of task x.
"""
# at most 2000 successful tasks:
next_x = {x: x + 10 for x in range(1, 1991)}

# to hold results, if you are interested:
results = {}
pool = Pool(10)
pool.map(run_dependency_chain, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
print(len(results)) # number of succesful results

Prints:

2000

If process_x_value is sufficiently I/O bound, multithreading should reduce your running time by a factor of almost 10.

Most efficient way to process a queue with threads

That looks reasonable. I've found BlockingCollection to be quite fast. I use it to process tens of thousands of requests per second.

If your application is processor bound, then you probably don't want to create more workers than you have cores. Certainly you don't want to create a lot more workers than cores. On a quad core machine, if you expect most of the time to be spent doing the FFTs, then four workers will eat all the CPU. More workers just means more that you have thread context switches to deal with. The TPL will typically balance that for you, but there's no reason to create, say, 100 workers when you can't handle more than a handful.

I would suggest that you run tests with 3, 4, 5, 6, 7, and 8 workers. See which one gives you the best throughput.

Multi-threaded queue consumer and task processing

You should generally* avoid direct thread programming in favor of the Task Parallel Library and concurrent collections built into .NET 4.0 and higher. Fortunately, the producer/consumer problem you described is common and Microsoft has a general-purpose tool for this: the BlockingCollection. This article has a good summary of its features. You may also refer to this white paper for performance analysis of the BlockingCollection<T> (among other things).

However, before pursuing the BlockingCollection<T> or an equivalent, given the scenario you described, why not go for the simple solution of using the Tasks. The TPL gives you the asynchronous execution of tasks with a lot of extras like cancellation and continuation. If, however, you need more advanced lifecycle management, then go for something like a BlockingCollection<T>.


* By "generally", I'm insinuating that the generic solution will not necessarily perform the best for your specific case as it's almost certain that a properly designed custom solution will be better. As with every decision, perform the cost/benefit analysis.



Related Topics



Leave a reply



Submit