C++11 Thread-Safe Queue

C++11 thread-safe queue

According to the standard condition_variables are allowed to wakeup spuriously, even if the event hasn't occured. In case of a spurious wakeup it will return cv_status::no_timeout (since it woke up instead of timing out), even though it hasn't been notified. The correct solution for this is of course to check if the wakeup was actually legit before proceding.

The details are specified in the standard §30.5.1 [thread.condition.condvar]:

—The function will unblock when signaled by a call to notify_one(), a call to notify_all(), expiration of the absolute timeout (30.2.4) specified by abs_time, or spuriously.

...

Returns: cv_status::timeout if the absolute timeout (30.2.4) specifiedby abs_time expired, other-ise cv_status::no_timeout.

Thread safe queue with front() + pop()

The usual solution is to provide a combined front & pop that accepts a reference into which to store the popped value, and returns a bool that is true if a value was popped:

bool pop(std::string& t) {
lock_guard<mutex> lock(d_mutex);

if (std::queue<std::string>::empty()) {
return false;
}

t = std::move(std::queue<std::string>::front());
std::queue<std::string>::pop();
return true;
}

Any exceptions thrown by the move assignment happen before the queue is modified, maintaining the exception guarantee provided by the value type's move assignment operator.

Why is std::queue not thread-safe?

Imagine you check for !queue.empty(), enter the next block and before getting to access queue.first(), another thread would remove (pop) the one and only element, so you query an empty queue.

Using a synchronized queue like the following

#pragma once

#include <queue>
#include <mutex>
#include <condition_variable>

template <typename T>
class SharedQueue
{
public:
SharedQueue();
~SharedQueue();

T& front();
void pop_front();

void push_back(const T& item);
void push_back(T&& item);

int size();
bool empty();

private:
std::deque<T> queue_;
std::mutex mutex_;
std::condition_variable cond_;
};

template <typename T>
SharedQueue<T>::SharedQueue(){}

template <typename T>
SharedQueue<T>::~SharedQueue(){}

template <typename T>
T& SharedQueue<T>::front()
{
std::unique_lock<std::mutex> mlock(mutex_);
while (queue_.empty())
{
cond_.wait(mlock);
}
return queue_.front();
}

template <typename T>
void SharedQueue<T>::pop_front()
{
std::unique_lock<std::mutex> mlock(mutex_);
while (queue_.empty())
{
cond_.wait(mlock);
}
queue_.pop_front();
}

template <typename T>
void SharedQueue<T>::push_back(const T& item)
{
std::unique_lock<std::mutex> mlock(mutex_);
queue_.push_back(item);
mlock.unlock(); // unlock before notificiation to minimize mutex con
cond_.notify_one(); // notify one waiting thread

}

template <typename T>
void SharedQueue<T>::push_back(T&& item)
{
std::unique_lock<std::mutex> mlock(mutex_);
queue_.push_back(std::move(item));
mlock.unlock(); // unlock before notificiation to minimize mutex con
cond_.notify_one(); // notify one waiting thread

}

template <typename T>
int SharedQueue<T>::size()
{
std::unique_lock<std::mutex> mlock(mutex_);
int size = queue_.size();
mlock.unlock();
return size;
}

The call to front() waits until it has an element and locks the underlying queue so only one thread may access it at a time.

ThreadSafe Queue c++

It doesn't mean the program isn't thread safe. It doesn't mean it's ill-defined and can crash.

It just means your program's logic is not written to add the items to the queue in any particular order.

If you want those two items to be added in a specific order, push both from one thread.

Thread safety doesn't mean your application runs as if it only had one thread.

Your program is working fine.

Thread safe queues and spurious wakes

Spurious wakes just mean you need to check that the condition for the wake remains valid when you are woken. Since the wait function is passed:

  1. A lock, for mutual exclusion, and
  2. A predicate to determine if the wait has been satisfied

the behavior when one thread is notified "normally", and another is notified spuriously is that one of them (doesn't matter which, whichever races faster) acquires the lock and confirms the queue is non-empty, then pops off the top element and releases the lock; the one that lost the race for the lock doesn't acquire the lock until the faster thread releases the lock, so it sees the already emptied queue and decides it was a spurious wakeup, going back to sleep.

Importantly, it doesn't really matter whether the spuriously woken thread won the race for the lock (and the queued item) or not; one of the threads behaved as if woken normally (it found the condition true and worked as expected), one as if woken spuriously (it found the condition false and went back to waiting, as expected), and the code as a whole behaved correctly.



Related Topics



Leave a reply



Submit