Using Condition Variable in a Producer-Consumer Situation

Using condition variable in a producer-consumer situation

You have to use the same mutex to guard the queue as you use in the condition variable.

This should be all you need:

void consume()
{
while( !bStop )
{
boost::scoped_lock lock( mutexQ);
// Process data
while( messageQ.empty() ) // while - to guard agains spurious wakeups
{
condQ.wait( lock );

}
string s = messageQ.front();
messageQ.pop();
}
}

void produce()
{
int i = 0;

while(( !bStop ) && ( i < MESSAGE ))
{
stringstream out;
out << i;
string s = out.str();

boost::mutex::scoped_lock lock( mutexQ );
messageQ.push( s );
i++;
condQ.notify_one();
}
}

Is it possible to create the producer consumer problem with condition variables and signal instead of broadcast?

I am trying to run the producer-consumer problem while
using pthread_cond_signal() instead of pthread_cond_broadcast(),
however, I attempted a lot of things and can't seem to do it (if I
choose n producers and one consumer then the consumer finishes but not
all producers finish, some are stuck on full queue so the problem
never finishes executing.

Well that sounds eminently plausible. If you have multiple threads blocked on a CV, then one signal will wake one of them. The rest will remain blocked.

I am generally inclined to go the other way. If you use your CV correctly, then it is safe to always broadcast to it instead of signaling it, but doing the opposite exposes more area for possible bugs, especially when more than two threads are involved.

For the shutdown scenario in particular, I would recommend just using a broadcast. You need to wake potentially multiple threads, and that's exactly what pthread_cond_broadcast() is for. You could have the main thread do that instead of either consumer or producer if you wish. But if you insist on using only pthread_cond_signal() then you must be sure to call that function enough times to wake all threads that may be blocked on the CV. Again, some or all of those calls could be performed by the main thread.

Update

Notwithstanding my above recommendation to broadcast, a relatively good way to obtain clean shutdown with signaling only would be for each producer to signal the notFull CV before terminating. There are a couple of places you could put that, but I would probably do this, myself:

    if (*total_produced >= WORK_MAX) {
pthread_mutex_unlock(fifo->mutex); //if total work reached then exit
pthread_cond_signal(fifo->notFull); // other producers should wake up and exit, too
break;
}

Note that the mutex does not need to be currently locked by the thread that sends a broadcast or signal.

Note also that if you go this route then the consumer wants analogous treatment.

Finally, note that for this particular code, the transformation you are trying to perform is a bad idea from a functional perspective, especially for cases where the numbers of producers and consumers differ. No matter how many of each you start with, it will tend to reduce to having the same number of active producers and consumers, and probably, over a longer time frame, towards having at most one of each at any given time. These consequences arise from situations where multiple consumers or multiple producers are blocked on the CV at the same time.

C++ Producer-Consumer with condition variable

The problem is clearly fairness, as demonstrated by adding a line like

std::this_thread::sleep_for(0.01s);

just before the producer acquires the mutex.

Most operating systems will not give you any fairness guarantees on mutexes.

There are many ways of working around the issue of fairness. If you know how to handle this, you can stop here.

In your case, assuming your producers and consumers don't actually take half a second or more to put or take a job into the queue, and that the producer doesn't actually have an infinite amount of jobs, then you don't need to worry about this at all. If the operating system favours producers over consumers in a moment of high contention, the queue simply fills up (or all pending jobs eventually get put into the queue), forcing producers to wait, release the mutex, and allow consumers to take their turn.

Note that in a producer/consumer scenario with a maximum queue size you actually need two condition variables - for full and empty conditions.

Producer & consumer threads using condition variables

while (true) {

std::vector<std::string> responses;
responses.push_back(std::to_string(i));

qFull.notify_one();
}

You don't share the responses. How will the consumer ever see the responses vector? It will be empty at the start of each loop iteration.

Also

  • don't gratuitously use platform dependent code
  • don't gratuitously use dynamic allocation (you're leaking resources)
  • don't mix global variables with class local responsibilities

Next up: your conditions are such that after posting 1 workitems (Record) you will wait until the consumer has consumed 4000. I don't see how that works.

Why don't you just accumulate 4000 items on the producer side and then hand them to the consumer? That way, you could actually start benefitting from threading. Think about this: you had two threads: one is waiting for 4000 jobs to be created, the other is waiting for the first to completely empty the queue. What you have here is glorified sequential code with a lot of noise and unnecessary lock contention.

Bonus

Using my crystal ball here's a version that fixes most of the above (including lack of locking, accounting for spurious wake-up etc.).

You still need to fix the unsafe use of strcpy !!!

#include <string>
#include <iostream>
#include <deque>
#include <chrono>
#include <thread>

#include <boost/scoped_ptr.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>

static constexpr auto MIN_QUEUE_WORK = 10;
static constexpr auto MAX_QUEUE_WORK = 40; // 4000;

class A {
private:
boost::thread myProducerThread;
boost::thread myConsumerThread;

boost::mutex mutex_;
boost::condition_variable pushed_, popped_;

struct Record {
char response[128];

Record(const char *response) {
memset(this->response, 0, sizeof(this->response));
strcpy(this->response, response);
}

~Record() {}

Record &operator=(const Record &cmd) {
if (this == &cmd) // Same object?
{
return *this;
}

memset(this->response, 0, sizeof(this->response));
strcpy(this->response, cmd.response);
return *this;
}
};

typedef std::deque<Record> RecordsQueue;
RecordsQueue queue_;

public:
void RunThreads();
void RunProducer();
void RunConsumer();
};

void A::RunThreads() {
myProducerThread = boost::thread(&A::RunProducer, this);
myConsumerThread = boost::thread(&A::RunConsumer, this);

myProducerThread.join();
myConsumerThread.join();
}

void A::RunProducer() {
int i = 0;

while (i<1000) {
boost::mutex::scoped_lock lock(mutex_);
popped_.wait(lock, [this] { return queue_.size()<MAX_QUEUE_WORK; });

queue_.push_back(Record { std::to_string(i).c_str() });
std::cout << "Added: " << std::to_string(i) << " size: " << queue_.size() << std::endl;
i++;
pushed_.notify_one();
}
}

void A::RunConsumer() {
while (true) {
boost::mutex::scoped_lock lock(mutex_);
pushed_.wait(lock, [this]{return queue_.size()>MIN_QUEUE_WORK;});

Record res = queue_.front();
std::cout << "Processed: " << res.response << std::endl;
queue_.pop_front();

popped_.notify_one();
}
}

int main() {
A a;
a.RunThreads();
}

C++ Producer Consumer Problem with condition variable + mutex + pthreads

The same exact bug exists in both the consumer and the producer function. I'll explain one of them, and the same bug must also be fixed in the other one.

unsigned static int contador_pratos_consumidos = 0;
while (contador_pratos_consumidos < numero_pratos)
{

This static counter gets accessed and modified by multiple execution threads.

Any non-atomic object that's used by multiple execution threads must be properly sequenced (accessed only when holding an appropriate mutex).

If you focus your attention on the above two lines it should be obvious that this counter is accessed without the protection of any mutex. Once you realize that, the bug is obvious: at some point contador_pratos_consumidos will be exactly one less than numero_pratos. When that happens you can have multiple execution threads evaluating the while condition, at the same time, and all of them will happily conclude that it's true.

Multiple execution threads then enter the while loop. One will succeed in acquiring the mutex and consuming the "product", and finish. The remaining execution threads will wait forever, for another "product" that will never arrive. No more products will ever be produced. No soup for them.

The same bug also exists in the producer, except that the effects of the bug will be rather subtle: more products will end up being produced than there should be.

Of course, pedantically all of this is undefined behavior, so anything can really happen, but these are the typical, usual consequences this kind of undefined behavior. Both bugs must be fixed in order for this algorithm to work correctly.

Will consumer thread receive condition_variable notify signal if spuriously woken up

If the code that calls wait() isn't written right, it could, indeed, miss a wake up. But that would be a bit perverse. The usual idiom is:

lock the mutex
while the condition is not satisfied
wait on the condition variable

the signaling thread should lock the mutex before signaling:

lock the mutex
signal the condition variable

Waiting on the condition variable unlocks the mutex for the duration of the wait. When the wait call returns, though, the mutex will be locked. So on a spurious wake up, the waiting thread will hold the mutex until it resumes waiting. While the waiting thread holds the mutex, the signaling thread can't signal the condition variable. When the waiting thread actually waits, the mutex gets unlocked and the signaling thread can go ahead and signal; the waiting thread will get the signal, and once the signaling thread releases the mutex, the waiting thread will resume execution.

So, no, proper code won't miss a signal. But if the waiting thread releases the mutex and reacquires it in the course of checking its condition, the signal could occur before the waiting thread calls wait, and the signal will be lost. Don't do that. <g>



Related Topics



Leave a reply



Submit