How to Use a Boost Condition Variable to Wait for a Thread to Complete Processing

How do I use a boost condition variable to wait for a thread to complete processing?

Yes, you are misusing the condition variable. "Condition variables" are really just the signaling mechanism. You also need to be testing a condition. In your case what might be happening is that the thread that is calling notify_one() actually completes before the thread that calls wait() even starts. (Or at least, the notify_one() call is happening before the wait() call.) This is called a "missed wakeup."

The solution is to actually have a variable which contains the condition you care about:

bool worker_is_done=false;

boost::mutex::scoped_lock lock(m_mutex);
while (!worker_is_done) m_condition.wait(lock);

and

boost::mutex::scoped_lock lock(m_mutex);
worker_is_done = true;
m_condition.notify_one();

If worker_is_done==true before the other thread even starts waiting then you'll just fall right through the while loop without ever calling wait().

This pattern is so common that I'd almost go so far as to say that if you don't have a while loop wrapping your condition_variable.wait() then you always have a bug. In fact, when C++11 adopted something similar to the boost::condtion_variable they added a new kind of wait() that takes a predicate lambda expression (essentially it does the while loop for you):

std::condition_variable cv;
std::mutex m;
bool worker_is_done=false;

std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{return worker_is_done;});

dead-lock with condition_variable

The problem is that if timed_wait completes before notify_all is called it will then have to wait for the thread to release the mutex (i.e. after it has called notify_all) before it resumes then will call timed_wait again, the thread has finished so timed_wait will never succeed. There are two scenarios where this can happen, if your thread takes more than a millisecond to start (should be unlikely but the scheduling vagaries of your OS mean it could happen, especially if the CPU is busy) the other is spurious wakeups.

Both scenarios can be guarded against by setting a flag when calling notify_all which the waiting thread can check to ensure notify has been called:

#include <iostream>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>

static boost::mutex m_mutex;
static boost::condition_variable m_cond;

void threadFunc(bool& notified)
{
std::cout << "LOCKING MUTEX" << std::endl;
boost::mutex::scoped_lock lock(m_mutex);
std::cout << "LOCKED, NOTIFYING CONDITION" << std::endl;
notified = true;
m_cond.notify_all();
std::cout << "NOTIFIED" << std::endl;
}

int main(int argc, char* argv[])
{
while (true)
{
std::cout << "TESTING!!!" << std::endl;

boost::mutex::scoped_lock lock(m_mutex);

bool notified = false;

boost::thread thrd(&threadFunc, boost::ref(notified));

//m_cond.wait( lock );
std::cout << "WAITING..." << std::endl;
while (!m_cond.timed_wait(lock, boost::posix_time::milliseconds(1), [&] { return notified; }))
{
std::cout << "WAITING..." << std::endl;
}

static int pos = 0;
std::cout << "DONE!!! " << pos++ << std::endl;

thrd.join();
}

return 0;
}

Boost: Wait until worker thread waits on a condition variable

There's no reason to do that. When the worker thread is ready, it will do the work. It won't need to be triggered by a condition variable because it won't ever wait.

Waiting for something that has already happened is a coding error. If you might ever even think of doing that, you fundamentally don't understand condition variables.

Before a thread waits on a condition variable, it must make sure that there is something it needs to wait for. That thing must be protected by the mutex associated with the condition variable. And when the thread returns from the condition variable wait, it usually must re-test to see if it needs to wait again.

Resetting the conditional variable (boost)

Yes. The condition_variable is reset any time wait() is called. wait() blocks the current thread until the condition_variable is woken up so to speak.

You appear to be using the condition_variable incorrectly, however. Instead of saying

while (wait_for_conditional_variable_execute)

You really wanna say

while (thread_should_run)
{
// wait_for_conditional_variable_execute
cv.wait();
}

This would give you something to the following effect:

void processDataThread()
{
while (processData)
{
// Wait to be given data to process
cv.wait();
// Finished waiting, so retrieve data to process
int n = getData();
// Process data:
total += n;
}
}

Then in your main thread you'd have:

addData(16);
cv.notify_all();

Your thread will process the data, re-enter the while loop then wait for the condition_variable to be triggered. Once triggered (i.e. notify() is called) the thread will process the data, then wait again.

Deadlock with boost::condition_variable

There's a few things to point out on your code:

  1. You may wish to join your thread after calling shutdown, to ensure that your main thread doesn't finish before your other thread.
  2. m_queue.clear(); on shutdown is done outside of your m_mtxEvents mutex lock, meaning it's not as thread safe as you think it is.
  3. your 'thread safe processing' of the queue should be just taking an item off and then releasing the lock while you go off to process the event. You've not shown that explicitly, but failure to do so will result in the lock preventing items from being added.

The good news about a thread blocking like this, is that you can trivially break and inspect what the other threads are doing, and locate the one that is holding the lock. It might be that as per my comment #3 you're just taking a long time to process an event. On the other hand it may be that you've got a dead lock. In any case, what you need is to use your debugger to establish exactly what you've done wrong, since your sample doesn't have enough in it to demonstrate your problem.

Interupt boost thread that is already in condition variable wait call

I prefer to wait with timeouts, then check the return code of the wait call to see if it timed out or not. In fact I have a thread pattern I like to use that resolves this situation (and other common problems with threads in c++).

http://blog.chrisd.info/how-to-run-threads/

The main point for you is to not block infinitely in a thread, so your thread would look like this:

while (_running == true)
{
if (shared_mat_header->new_data_condition.wait_for(lock, boost::chrono::milliseconds(1)) == boost::cv_status::no_timeout)
{
// process data
}
}

Then in your destructor you set _running = false; and join the thread(s).

Using boost condition variables

As it has been pointed out, you must either make mStop atomic or guard all its accesses with the mutex. Forget about volatile, it's not relevant to your purposes.

Furthermore, when waiting on a condition variable a call to wait may return even if no notification functions were called (those are so-called spurious wake-ups). As such, calls to wait need to be guarded.

void LogWriter::stopThread()
{
{
boost::mutex::scoped_lock lock(mMutex);
mStop = true;
mCond.notify_one();
}
mThread->join();

}

void LogWriter::processLogEntry()
{
for(;;) {
boost::mutex::scoped_lock lock(mMutex);
// We wait as long as we're not told to stop and
// we don't have items to process
while(!mStop && q.empty()) mCond.wait(lock);

// Invariant: if we get here then
// mStop || !q.empty() holds

while(!q.empty())
{
// process begins
}

if(mStop) return;
}
}

How to make sure all slave threads are waited for conditional variable?

You have race condition - setting flag and notifying slave threads is not atomic. So you just have to lock data_ready_mutex before you are modifying data_ready flag in main thread. This will eliminate race condition, slave thread either will see data_ready false and go to wait on condition variable and will be notified, or it will acquire mutex lock only after data_ready is set to true and so it will not wait at all.

Boost condition deadlock using wait() in producer-consumer code

I did something similar recently even though mine uses the STL queue. See if you can pick out from my implementation. As wilx says, you need to wait on the condition. My implementation has maximum limit on the elements in the queue and I use that to wait for the mutex/guard to be freed.

I originally did this on Windows with ability to use Mutex or Critical sections in mind hence the template parameter which you can remove and use boost::mutex directly if it simplifies it for you.

#include <queue>
#include "Message.h"
#include <boost/thread/locks.hpp>
#include <boost/thread/condition.hpp>

template <typename T> class Queue : private boost::noncopyable
{
public:
// constructor binds the condition object to the Q mutex
Queue(T & mutex, size_t max_size) : m_max_size(max_size), m_mutex(mutex){}

// writes messages to end of Q
void put(const Message & msg)
{
// Lock mutex to ensure exclusive access to Q
boost::unique_lock<T> guard(m_mutex);

// while Q is full, sleep waiting until something is taken off of it
while (m_queue.size() == m_max_size)
{
cond.wait(guard);
}

// ok, room on the queue.
// Add the message to the queue
m_queue.push(msg);

// Indicate so data can be ready from Q
cond.notify_one();
}

// Read message from front of Q. Message is removed from the Q
Message get(void)
{
// Lock mutex to ensure exclusive access to Q
boost::unique_lock<T> guard(m_mutex);

// If Q is empty, sleep waiting for something to be put onto it
while (m_queue.empty())
{
cond.wait(guard);
}

// Q not empty anymore, read the value
Message msg = m_queue.front();

// Remove it from the queue
m_queue.pop();

// Signal so more data can be added to Q
cond.notify_one();

return msg;
}

size_t max_size(void) const
{
return m_max_size;
}

private:
const size_t m_max_size;
T & m_mutex;
std::queue<Message> m_queue;
boost::condition_variable_any cond;
};

This way, you can share the queue across the producer/consumer. Example usage

boost::mutex mutex;

Queue<boost::mutex> q(mutex, 100);

boost::thread_group threads;

threads.create_thread(Producer<boost::mutex>(q));
threads.create_thread(Consumer<boost::mutex>(q));

threads.join_all();

With Producer/Consumer defined as below

template <typename T> class Producer
{
public:
// Queue passed in
explicit Producer(Queue<T> &q) : m_queue(q) {}

void operator()()
{
}
}

boost asio asynchronously waiting on a condition variable

If I understand the intent correctly, you want to launch an event handler, when some condition variable is signaled, in context of asio thread pool? I think it would be sufficient to wait on the condition variable in the beginning of the handler, and io_service::post() itself back in the pool in the end, something of this sort:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
boost::asio::io_service io;
boost::mutex mx;
boost::condition_variable cv;
void handler()
{
boost::unique_lock<boost::mutex> lk(mx);
cv.wait(lk);
std::cout << "handler awakened\n";
io.post(handler);
}
void buzzer()
{
for(;;)
{
boost::this_thread::sleep(boost::posix_time::seconds(1));
boost::lock_guard<boost::mutex> lk(mx);
cv.notify_all();
}
}
int main()
{
io.post(handler);
boost::thread bt(buzzer);
io.run();
}


Related Topics



Leave a reply



Submit