Producer Consumer Implementation in a Block Device Driver

Producer/Consumer in the kernel space - Linux

You can use Work Queues. Work Queues are simple, once you set up up your work queue, you use something like the following:

DECLARE_WORK(name, void (*function)(void *), void *data);

Your function call will be scheduled and called later, take a look at this article.

I also highly recommend you this book: Linux Device Drivers

edit: I just saw you already linked an SO post where they use work queues. Have you tried it out? You run into some issues? I suggest you start with an really simple example, just to try out if it's working. Implement your core functionality later.

Update:
From the official Documentation:

Some users depend on the strict execution ordering of ST wq. The
combination of @max_active of 1 and WQ_UNBOUND is used to achieve this
behavior. Work items on such wq are always queued to the unbound
worker-pools and only one work item can be active at any given time
thus achieving the same ordering property as ST wq.

That way you will have a guaranteed FIFO execution of your workers. But be aware that the work may be executed on different CPUs. You have to use memory barriers to ensure visibility (eg. wmb()).

Update:

As @user2009594 mentioned, a single threaded wq can be created using the following macro defined in linux/workqueue.h:

#define create_singlethread_workqueue(name) \
alloc_workqueue("%s", WQ_UNBOUND | WQ_MEM_RECLAIM, 1, (name)))

Implementing producer consumer in Java

Try replacing:

if(SharedQueue.queue.size() >= 5)

with:

while(SharedQueue.queue.size() >= 5)

and this:

if(SharedQueue.queue.size() == 0)

with:

while(SharedQueue.queue.size() == 0)

Just to re-check the condition after calling notify().

Producer-consumer based multi-threading for image processing

The problem mentioned in my question was that the image displayed by the Consumer thread was not containing complete data. The image displayed by the Consumer thread contains several patches which suggest that it could not get the full data produced by Producer thread.

ANSWER The reason behind it is the declaration of Mat image inside the while loop of Consumer thread. The Mat instance created inside the while loop gets deleted once the second round of while loop starts and therefore the Producer thread was never able to access the data of Mat image created in the Consumer thread.

SOLUTION: I should have done it something like this

struct ThreadSafeContainer
{
queue<Mat> safeContainer;

};

struct Producer
{
Producer(std::shared_ptr<ThreadSafeContainer> c) : container(c)
{

}

void run()
{
while(true)
{
// grab image from camera
// store image in container
Mat image(400, 400, CV_8UC3, Scalar(10, 100,180) );

mu.lock();
container->safeContainer.push(Mat);
mu.unlock();
}
}

std::shared_ptr<ThreadSafeContainer> container;
};

struct Consumer
{
Consumer(std::shared_ptr<ThreadSafeContainer> c) : container(c)
{

}
~Consumer()
{

}

void run()
{
while(true)
{
// read next image from container
mu.lock();
if (!container->safeContainer.empty())
{

Mat image= container->safeContainer.front(); //The front of the queue contain the image
container->safeContainer.pop();

imshow("consumer image", image);
waitKey(33);
}
mu.unlock();
}
}

std::shared_ptr<ThreadSafeContainer> container;
};

Synchronous task producer/consumer using ThreadPoolExecutor

I found another option than the one proposed by @Carlitos Way. It consists in directly adding tasks on the queue using BlockingQueue.offer. The only reason I did not manage to make it work at first and I had to post this question is that I did not know that the default behaviour of a ThreadPoolExecutor is to start without any thread. The threads will be created lazily using a thread factory and may be deleted/repopulated depending on the core and max sizes of the pool and the number of tasks being submitted concurrently.

Since the thread creation was lazy, my attempts to block on the call to offer failed because SynchronousQueue.offer immediately exits if nobody is waiting to get an element from the queue. Conversely, SynchronousQueue.put blocks until someone asks to take an item from the queue, which will never happen if the thread pool is empty.

Therefore, the workaround is to force the thread pool to create the core threads eagerly using ThreadPoolExecutor.prestartAllCoreThreads. My problem then becomes fairly trivial. I made a simplified version of my real use-case:

import java.util.Random;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

public class SimplifiedBuildScheduler {
private static final int MAX_POOL_SIZE = 10;

private static final Random random = new Random();
private static final AtomicLong nextTaskId = new AtomicLong(0);

public static void main(String[] args) throws InterruptedException {
SynchronousQueue<Runnable> queue = new SynchronousQueue<>();

// this is a soft requirement in my system, not a real-time guarantee. See the complete semantics in my question.
long maxBuildTimeInMillis = 50;
// this timeout must be small compared to maxBuildTimeInMillis in order to accurately match the maximum build time
long taskSubmissionTimeoutInMillis = 1;

ThreadPoolExecutor pool = new ThreadPoolExecutor(MAX_POOL_SIZE, MAX_POOL_SIZE, 0, SECONDS, queue);
pool.prestartAllCoreThreads();

Runnable nextTask = makeTask(maxBuildTimeInMillis);

long millisAtStart = System.currentTimeMillis();
while (maxBuildTimeInMillis > System.currentTimeMillis() - millisAtStart) {
boolean submitted = queue.offer(nextTask, taskSubmissionTimeoutInMillis, MILLISECONDS);
if (submitted) {
nextTask = makeTask(maxBuildTimeInMillis);
} else {
System.out.println("Task " + nextTaskId.get() + " was not submitted. " + "It will be rescheduled unless " +
"the max build time has expired");
}
}

System.out.println("Max build time has expired. Stop submitting new tasks and running existing tasks to completion");

pool.shutdown();
pool.awaitTermination(9999999, SECONDS);
}

private static Runnable makeTask(long maxBuildTimeInMillis) {
long sleepTimeInMillis = randomSleepTime(maxBuildTimeInMillis);
long taskId = nextTaskId.getAndIncrement();
return () -> {
try {
System.out.println("Task " + taskId + " sleeping for " + sleepTimeInMillis + " ms");
Thread.sleep(sleepTimeInMillis);
System.out.println("Task " + taskId + " completed !");
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
};
}

private static int randomSleepTime(long maxBuildTimeInMillis) {
// voluntarily make it possible that a task finishes after the max build time is expired
return 1 + random.nextInt(2 * Math.toIntExact(maxBuildTimeInMillis));
}
}

An example of output is the following:

Task 1 was not submitted. It will be rescheduled unless the max build time has expired
Task 0 sleeping for 23 ms
Task 1 sleeping for 26 ms
Task 2 sleeping for 6 ms
Task 3 sleeping for 9 ms
Task 4 sleeping for 75 ms
Task 5 sleeping for 35 ms
Task 6 sleeping for 81 ms
Task 8 was not submitted. It will be rescheduled unless the max build time has expired
Task 8 was not submitted. It will be rescheduled unless the max build time has expired
Task 7 sleeping for 86 ms
Task 8 sleeping for 47 ms
Task 9 sleeping for 40 ms
Task 11 was not submitted. It will be rescheduled unless the max build time has expired
Task 2 completed !
Task 10 sleeping for 76 ms
Task 12 was not submitted. It will be rescheduled unless the max build time has expired
Task 3 completed !
Task 11 sleeping for 31 ms
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 0 completed !
Task 12 sleeping for 7 ms
Task 14 was not submitted. It will be rescheduled unless the max build time has expired
Task 14 was not submitted. It will be rescheduled unless the max build time has expired
Task 1 completed !
Task 13 sleeping for 40 ms
Task 15 was not submitted. It will be rescheduled unless the max build time has expired
Task 12 completed !
Task 14 sleeping for 93 ms
Task 16 was not submitted. It will be rescheduled unless the max build time has expired
Task 16 was not submitted. It will be rescheduled unless the max build time has expired
Task 16 was not submitted. It will be rescheduled unless the max build time has expired
Task 5 completed !
Task 15 sleeping for 20 ms
Task 17 was not submitted. It will be rescheduled unless the max build time has expired
Task 17 was not submitted. It will be rescheduled unless the max build time has expired
Task 11 completed !
Task 16 sleeping for 27 ms
Task 18 was not submitted. It will be rescheduled unless the max build time has expired
Task 18 was not submitted. It will be rescheduled unless the max build time has expired
Task 9 completed !
Task 17 sleeping for 95 ms
Task 19 was not submitted. It will be rescheduled unless the max build time has expired
Max build time has expired. Stop submitting new tasks and running existing tasks to completion
Task 8 completed !
Task 15 completed !
Task 13 completed !
Task 16 completed !
Task 4 completed !
Task 6 completed !
Task 10 completed !
Task 7 completed !
Task 14 completed !
Task 17 completed !

You'll notice, for example, that task 19 was not rescheduled because the max build time expired before the scheduler can attempt to offer it to the queue a second time. You can also see than all the ongoing tasks that started before the max build time expired do run to completion.

Note: As noted in my comments in the code, the max build time is a soft requirement, which means that it might not be met exactly, and my solution indeed allows for a task to be submitted even after the max build time is expired. This can happen if the call to offer starts just before the max build time expires and finishes after. To reduce the odds of it happening, it is important that the timeout used in the call to offer is much smaller than the max build time. In the real system, the thread pool is usually busy with no idle thread, therefore the probability of this race condition to occur is extremely small, and it has no bad consequence on the system when it does happen, since the max build time is a best effort attempt to meet an overall running time, not an exact and rigid constraint.

Producer Consumer bounded buffer using Semaphore

For a general-purpose, bounded, multi-producer/consumer blocking queue with semaphores, you need three of them. One to count the number of free spaces in the queue, (initialized to the LIMIT of the queue), one to count the number of items in the queue, (initialized to zero), and another to protect the queue from multiple access, (initialized to 1, to act as a mutex).

Pseudocode:

Producer: wait(free); wait(mutex); queue.push(newItem); send(mutex); send(items);

Consumer: wait(items); wait(mutex); result=(queue.pop); send(mutex); send(free); return result;

Boost: Block until queue has another item

You're looking to do an interesting operation: you're looking to do a blocking operation on a lockfree queue, which is kinda the opposite of what you have a lockfree queue for.

Use a normal blocking queue using a mutex and a condition variable. It's easy, and its a more standard way to do it.

You actually pay a performance penalty for lockfree in many cases, because you're guaranteeing that the queue does not hold any locks, even in the worst cases.

This question covers many of the pros and cons of both approaches.



Related Topics



Leave a reply



Submit