C++ Equivalent to Java's BlockingQueue

It isn't fixed-size and it doesn't support timeouts, but here is a simple implementation of a queue I posted recently, using C++11 constructs:

#include <mutex>
#include <condition_variable>
#include <deque>

template <typename T>
class queue
{
private:
    std::mutex              d_mutex;
    std::condition_variable d_condition;
    std::deque<T>           d_queue;

public:
    void push(T const& value) {
        {
            std::unique_lock<std::mutex> lock(this->d_mutex);
            d_queue.push_front(value);
        }
        this->d_condition.notify_one();
    }

    T pop() {
        std::unique_lock<std::mutex> lock(this->d_mutex);
        this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); });
        T rc(std::move(this->d_queue.back()));
        this->d_queue.pop_back();
        return rc;
    }
};

It should be trivial to extend and use a timed wait for popping. The main reason I haven't done it is that I'm not happy with the interface choices I have thought of so far.
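A timed pop could be sketched roughly like this (a minimal sketch of my own: the class name `timed_queue`, the method name `try_pop_for`, and the bool-plus-out-parameter interface are choices made here for illustration, not the original author's):

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>

template <typename T>
class timed_queue
{
private:
    std::mutex              d_mutex;
    std::condition_variable d_condition;
    std::deque<T>           d_queue;

public:
    void push(T const& value) {
        {
            std::unique_lock<std::mutex> lock(d_mutex);
            d_queue.push_front(value);
        }
        d_condition.notify_one();
    }

    // Wait at most `timeout` for an element; returns false on timeout.
    template <typename Rep, typename Period>
    bool try_pop_for(T& out, std::chrono::duration<Rep, Period> timeout) {
        std::unique_lock<std::mutex> lock(d_mutex);
        if (!d_condition.wait_for(lock, timeout,
                                  [this]{ return !d_queue.empty(); }))
            return false;                 // timed out, queue still empty
        out = std::move(d_queue.back());
        d_queue.pop_back();
        return true;
    }
};
```

The out-parameter avoids requiring `std::optional`; whether that is the right interface is exactly the kind of choice the author says he is still unhappy with.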

Objective-C equivalent of Java's BlockingQueue?

You can simply spin off an NSOperation and post a notification when the data has come back (finished loading). Take a look at Dave Dribin's blog post on concurrency with NSOperation that shows how to encapsulate an NSURLConnection session:

http://www.dribin.org/dave/blog/archives/2009/05/05/concurrent_operations/

If you are not talking about accessing a web service or site where NSURLConnection is appropriate, you can instead use CocoaAsyncSocket for straight TCP/IP or UDP:

http://code.google.com/p/cocoaasyncsocket/


C++ Equivalent of java.util.concurrent.ArrayBlockingQueue

Check out tbb::concurrent_bounded_queue from the Intel Threading Building Blocks (TBB).

(Disclaimer: I haven't actually had a chance to use it in a project yet, but I've been following TBB).

C#: need a blocking FIFO queue similar to Java's LinkedBlockingQueue

BlockingCollection can be used with any number of different types of collections. If you don't manually pass in a specific type of concurrent collection, it will use a ConcurrentQueue, meaning it will do exactly what you want. You can use a concurrent stack type, or a concurrent priority queue if you want, which is why it uses a general name such as BlockingCollection and not BlockingConcurrentQueue.

All of this is listed on the MSDN page for BlockingCollection if you don't want to take my word for it.

How to improve performance of a blocking queue written in C?

I'm having a hard time implementing a queue that satisfies the last point while keeping acceptable performance. I'm using pthreads to do the synchronization and, in order to satisfy the FIFO requirement, I am relying on the pthread_cond_wait and pthread_cond_broadcast functions.

In this case, when a thread adds to the queue it does a pthread_cond_broadcast() and wakes up all threads that were blocked waiting to fetch data from the empty queue. If many threads were blocked waiting, this wastes a lot of CPU time on thread switches and scheduler overhead: each waiting thread unblocks, tries to acquire the mutex (and probably blocks and unblocks again while trying to get it), checks whether it is next, and then blocks again if it isn't.

To fix that, each thread needs its own separate condition variable. When a thread starts waiting for data from an empty queue, it puts its condition variable on a "queue of waiting readers"; and when a thread adds data to the queue, it takes the first condition variable from the "queue of waiting readers" and (if there is a waiter) does one pthread_cond_signal() (not a broadcast), so that only one waiting thread is unblocked.

Note that the "queue of waiting readers' condition variables" can be a linked list of "struct waiter { struct waiter *next; pthread_cond_t condVar; }" structures; these structures can be created and initialized when a thread is created and then continually recycled (until the thread terminates).

For "multiple writers" it's essentially the same problem with the same solution (and can re-use the same "struct waiter" created when the thread was created). When a thread needs to wait to add data to the queue it adds its condition variable to a "linked list of waiting writers" and when a thread finishes removing data from the queue it does one pthread_cond_signal() to unblock the next waiting writer.

Note that this should significantly improve performance when it's under high contention (lots of waiting readers or lots of waiting writers); but the extra overhead of managing the "queues of waiters" may also reduce performance under low contention (the worst case is when there's regularly only one waiting thread, which is the best case for your current approach using pthread_cond_broadcast).

Which Java blocking queue is most efficient for single-producer single-consumer scenarios

Well, there really aren't too many options. Let me go through the listed subclasses:

DelayQueue, LinkedBlockingDeque, PriorityBlockingQueue, and SynchronousQueue are all made for special cases requiring extra functionality; they don't make sense in this scenario.

That leaves only ArrayBlockingQueue and LinkedBlockingQueue. If you know how to tell whether you need an ArrayList or a LinkedList, you can probably answer this one yourself.

Note that in LinkedBlockingQueue, "linked nodes are dynamically created upon each insertion"; this might tend to push you toward ArrayBlockingQueue.

Objective C to Java : NSOperationQueue?

The NSOperationQueue implements basically the same functionality as you can get from a ThreadPoolExecutor plus a BlockingQueue in Java.

That is, it takes a collection of NSOperations (roughly equivalent to Runnables in Java) and executes them asynchronously.

With the added bonus that the operation queue tunes itself to the current run-time conditions.

Deadlock with blocking queue and barrier in C++

I ran your code and I see the problem. The problem is with the "-2" option. By the time the two threads reach this point, your main thread has already pushed more values onto the queue. So if the queue grows between the moment your threads pop the "-2" value and the moment they handle it, your code gets stuck:
Thread 1: get -2.
Thread 2: get -2.
Main thread: push -1.
Main thread: push -1.
Thread 1: wait until the whole queue is empty.
Thread 2: wait until the whole queue is empty.

queue:
-1
-1

^ this is the case when dim equals 1. In your code dim equals 8, and you don't want to see what that looks like..
To solve this, all I did was disable the following loop:

for(int i = 0; i < nt; i++){
    queue.push(-2);
}

With this part disabled, the code runs perfectly.
This is how I checked it:

std::mutex guarder;

// function executed by each thread
void f(int i, Queue<int> &q){
    while(1){
        // take a message from the blocking queue
        guarder.lock();
        int j = q.pop();
        guarder.unlock();
        // if it is end of stream then exit
        if (j == -1) break;
        // if it is a barrier, wait for other threads to reach it
        if (j == -2){
            // active wait! BAD, but anyway...
            while(q.size() > 0){
                ;
            }
        }
        else{
            // random stuff
            int x = 0;
            for(int i = 0; i < j; i++)
                x += 4;
            guarder.lock();
            cout << x << std::endl;
            guarder.unlock();
        }
    }
}

int main(){
    Queue<int> queue;    // blocking queue
    vector<thread> tids; // thread pool
    int nt = 2;          // number of threads
    int dim = 8;         // controls the number of operations

    // create thread pool, passing thread id and queue
    for(int i = 0; i < nt; i++)
        tids.push_back(thread(f, i, std::ref(queue)));

    for(int dist = 1; dist <= dim; dist++){ // without this outer loop the program works fine

        // push random numbers
        for(int j = 0; j < dist; j++){
            queue.push(dist);
        }

        /*// push barrier code
        for(int i = 0; i < nt; i++){
            queue.push(-2);
        }*/

        // active wait! BAD, but anyway...
        while (queue.size() > 0){
            ;
        }
    }
    // push end of stream
    for(int i = 0; i < nt; i++)
        queue.push(-1);
    // join thread pool
    for(int i = 0; i < nt; i++){
        tids[i].join();
    }
    return 0;
}

The result:

4
8
8
12
12
12
16
16
16
20
20
16
20
20
20
24
24
24
24
24
24
28
28
28
28
28
28
28
32
32
32
32
32
32
32
32

By the way, the deadlock was not caused by your "active wait" part. That is not good either, but it usually causes other problems (like slowing down your system).


