C++ Work Queues with Blocking

Well, that's really quite simple: you're rejecting the tasks posted!

template <typename Task>
void run_task(Task task) {
    boost::unique_lock<boost::mutex> lock(mutex_);
    if (0 < available_) {
        --available_;
        io_service_.post(boost::bind(&tpool::wrap_task, this,
                                     boost::function<void()>(task)));
    }
}

Note that the lock "waits" until the mutex is not owned by another thread. The mutex may already be free, possibly at a moment when available_ is already 0. Now consider the line

if(0 < available_) {

This line is simply a condition check. It's not "magical", because you're holding mutex_ locked. (The program doesn't even know that a relation exists between mutex_ and available_.) So, if available_ <= 0, you just skip posting the job.
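If instead you wanted run_task to block the caller until a worker becomes available, rather than silently dropping the task, a condition variable is the minimal fix. A hedged sketch, assuming a boost::condition_variable member cv_ (not in the original class) and that wrap_task increments available_ and calls cv_.notify_one() when a task finishes:

template <typename Task>
void run_task(Task task) {
    boost::unique_lock<boost::mutex> lock(mutex_);
    while (available_ == 0)           // block instead of dropping the task
        cv_.wait(lock);               // woken by wrap_task on completion
    --available_;
    io_service_.post(boost::bind(&tpool::wrap_task, this,
                                 boost::function<void()>(task)));
}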


Solution #1

You should use the io_service to queue for you. This is likely what you wanted to achieve in the first place. Instead of keeping track of "available" threads, io_service does the work for you. You control how many threads it may use by running the io_service on that many threads. Simple.

Since io_service is already thread-safe, you can do without the lock.

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iostream>

// tpool class
// It's always closed. :glasses:
#ifndef __POOL_H
#define __POOL_H
class tpool {
public:
    tpool(std::size_t tpool_size);
    ~tpool();

    template <typename Task>
    void run_task(Task task) {
        io_service_.post(task);
    }

private:
    // note the order of destruction of members
    boost::asio::io_service io_service_;
    boost::asio::io_service::work work_;

    boost::thread_group threads_;
};

extern tpool dbpool;
#endif

#include <boost/asio/io_service.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
//#include "pool.h"

tpool::tpool(std::size_t tpool_size) : work_(io_service_) {
    for (std::size_t i = 0; i < tpool_size; ++i) {
        threads_.create_thread(
            boost::bind(&boost::asio::io_service::run, &io_service_));
    }
}

tpool::~tpool() {
    io_service_.stop();

    try {
        threads_.join_all();
    } catch (...) {}
}

void foo() { std::cout << __PRETTY_FUNCTION__ << "\n"; }
void bar() { std::cout << __PRETTY_FUNCTION__ << "\n"; }

int main() {
    tpool dbpool(50);

    dbpool.run_task(foo);
    dbpool.run_task(bar);

    boost::this_thread::sleep_for(boost::chrono::seconds(1));
}

For shutdown purposes, you will want to enable "clearing" the io_service::work object, otherwise your pool will never exit.
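To make that concrete, a hedged sketch: hold the work object in a boost::optional (requires <boost/optional.hpp>) so it can be cleared, which lets io_service::run() return once the queued handlers drain. The shutdown() helper is illustrative, not part of the class above:

boost::optional<boost::asio::io_service::work> work_; // instead of the plain member

// the constructor's initializer list then becomes:
//     work_(boost::asio::io_service::work(io_service_))

void tpool::shutdown() {
    work_ = boost::none;   // no more artificial work: run() exits when idle
    threads_.join_all();   // graceful: already-queued tasks still complete
}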


Solution #2

Don't use io_service; instead, roll your own queue implementation with a condition variable to notify a worker thread of new work being posted. Again, the number of workers is determined by the number of threads in the group.

#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>
#include <boost/atomic.hpp>
#include <deque>
#include <iostream>

using namespace boost;
using namespace boost::phoenix::arg_names;

class thread_pool {
private:
    mutex mx;
    condition_variable cv;

    typedef function<void()> job_t;
    std::deque<job_t> _queue;

    thread_group pool;

    boost::atomic_bool shutdown;

    static void worker_thread(thread_pool& q) {
        while (auto job = q.dequeue())
            (*job)();
    }

public:
    thread_pool() : shutdown(false) {
        for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
            pool.create_thread(bind(worker_thread, ref(*this)));
    }

    void enqueue(job_t job) {
        lock_guard<mutex> lk(mx);
        _queue.push_back(std::move(job));

        cv.notify_one();
    }

    optional<job_t> dequeue() {
        unique_lock<mutex> lk(mx);
        namespace phx = boost::phoenix;

        // wait until shutdown is requested or a job is available
        cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));

        if (_queue.empty())
            return none;

        auto job = std::move(_queue.front());
        _queue.pop_front();

        return std::move(job);
    }

    ~thread_pool() {
        shutdown = true;
        {
            lock_guard<mutex> lk(mx);
            cv.notify_all();
        }

        pool.join_all();
    }
};

void the_work(int id) {
    std::cout << "worker " << id << " entered\n";

    // no more synchronization; the pool size determines max concurrency
    std::cout << "worker " << id << " start work\n";
    this_thread::sleep_for(chrono::seconds(2));
    std::cout << "worker " << id << " done\n";
}

int main() {
    thread_pool pool; // uses 1 thread per core

    for (int i = 0; i < 10; ++i)
        pool.enqueue(bind(the_work, i));
}

Non-busy blocking Queue Implementation in C

As requested, here is my solution.

#define QUEUESIZE 50

typedef struct
{
    char q[QUEUESIZE][150];
    int first;
    int last;
    int count;
    sem_t *full;
    sem_t *empty;
    sem_t *excl;
} Queue;

void init_queue(Queue *q, sem_t *f, sem_t *e, sem_t *ee)
{
    q->first = 0;
    q->last = QUEUESIZE - 1;
    q->count = 0;
    q->full = f;
    q->empty = e;
    q->excl = ee;
}

void enqueue(Queue *q, char x[150])
{
    sem_wait(q->empty);
    sem_wait(q->excl);

    q->last = (q->last + 1) % QUEUESIZE;
    strcpy(q->q[q->last], x);
    q->count = q->count + 1;

    sem_post(q->excl);
    sem_post(q->full);
}

void dequeue(Queue *q, char *ptr)
{
    sem_wait(q->full);
    sem_wait(q->excl);

    strcpy(ptr, q->q[q->first]);
    q->first = (q->first + 1) % QUEUESIZE;
    q->count = q->count - 1;

    sem_post(q->excl);
    sem_post(q->empty);
}

I initialize the semaphores as follows:

sem_init(full, 1, 0);
sem_init(empty, 1, 49);
sem_init(dequeue_excl, 1, 1);
sem_init(enqueue_excl, 1, 1);

(Note that the Queue struct above holds a single excl semaphore; with separate enqueue_excl and dequeue_excl you would store both, so that an enqueue and a dequeue can run concurrently.)
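For completeness, a hedged usage sketch wiring these up for threads within one process (so pshared = 0 rather than the 1 above; names like empty_slots are mine, and error checking is omitted):

#include <pthread.h>
#include <semaphore.h>
#include <string.h>

static Queue queue;
static sem_t full, empty_slots, excl;

static void *producer(void *arg) {
    char msg[150];
    strcpy(msg, "hello");
    enqueue(&queue, msg);                 // blocks while the queue is full
    return NULL;
}

static void *consumer(void *arg) {
    char msg[150];
    dequeue(&queue, msg);                 // blocks while the queue is empty
    return NULL;
}

int main(void) {
    pthread_t p, c;
    sem_init(&full, 0, 0);                // no items yet
    sem_init(&empty_slots, 0, QUEUESIZE - 1);
    sem_init(&excl, 0, 1);                // mutual exclusion on the buffer
    init_queue(&queue, &full, &empty_slots, &excl);

    pthread_create(&c, NULL, consumer, NULL);
    pthread_create(&p, NULL, producer, NULL);
    pthread_join(p, NULL);
    pthread_join(c, NULL);
    return 0;
}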

How to improve performance of a blocking queue written in C?

I'm having a hard time implementing a queue that satisfies the last point while keeping acceptable performance. I'm using pthreads for the synchronization and, in order to satisfy the FIFO requirement, I am relying on the pthread_cond_wait and pthread_cond_broadcast functions.

In this case, when a thread adds to the queue it does a pthread_cond_broadcast() and wakes up all threads that were blocked waiting to fetch data from the empty queue. If many threads were blocked, this wastes a lot of CPU time on thread switches and scheduler overhead: each waiting thread unblocks, tries to acquire the mutex (probably blocking and unblocking again while contending for it), checks whether it is next, and then blocks again if it isn't.

To fix that, each thread needs its own separate condition variable. When a thread starts waiting for data from an empty queue, it puts its condition variable on a "queue of waiting readers"; and when a thread adds data to the queue, it takes the first condition variable from the "queue of waiting readers" and (if there is a waiter) does one pthread_cond_signal() (not a broadcast) so that only one waiting thread is unblocked.

Note that the "queue of waiting reader's condition variables" can be a linked list of "struct waiter { struct waiter * next; pthread_cond_t condVar; }" structures; and these structures can be created and initialized when a thread is created and then continually recycled (until the thread terminates).

For "multiple writers" it's essentially the same problem with the same solution (and can re-use the same "struct waiter" created when the thread was created). When a thread needs to wait to add data to the queue it adds its condition variable to a "linked list of waiting writers" and when a thread finishes removing data from the queue it does one pthread_cond_signal() to unblock the next waiting writer.

Note that this should significantly improve performance under high contention (lots of waiting readers or lots of waiting writers); but the extra overhead of managing the "queues of waiters" may also reduce performance under low contention (the worst case is when there's regularly only one waiting thread, which is the best case for the current approach using pthread_cond_broadcast).

Creating a Blocking Queue

See here:

What do I get from front() of empty std container?

Bad things happen if you call .front() on an empty container; better to check .empty() first.

Try:

T pop() {
    this->mutex_.lock();
    T value;                          // requires T to be default-constructible
    if (!this->queue_.empty())
    {
        value = this->queue_.front(); // safe: guarded by the empty() check;
                                      // without it this would be undefined
                                      // behavior (segfault, garbage, ...)
        this->queue_.pop();
    }
    this->mutex_.unlock();
    return value;
}

Note: Since atomic operations are important on this kind of queue, I'd recommend API changes:

bool pop(T &t);  // returns false if there was nothing to read.
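A hedged sketch of that signature, assuming the same mutex_ and queue_ members as the pop() above:

bool pop(T &t) {
    this->mutex_.lock();
    if (this->queue_.empty()) {
        this->mutex_.unlock();
        return false;                 // nothing to read
    }
    t = this->queue_.front();
    this->queue_.pop();
    this->mutex_.unlock();
    return true;
}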

Better yet, if you're actually using this where it matters, you probably want to mark items as in use before deleting them, in case of failure.

bool peekAndMark(T &t);  // allows one "marked" item per thread
void deleteMarked(); // if an item is marked correctly, pops it.
void unmark(); // abandons the mark. (rollback)
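Here is a hedged sketch of what those could look like. For brevity this version allows a single outstanding mark at a time rather than one per thread (a per-thread variant would key marks by thread id), so treat it as an illustration of the commit/rollback idea only:

#include <boost/thread.hpp>
#include <queue>

template <typename T>
class marked_queue {
    boost::mutex mutex_;
    std::queue<T> queue_;
    bool marked_;

public:
    marked_queue() : marked_(false) {}

    bool peekAndMark(T &t) {
        boost::lock_guard<boost::mutex> lock(mutex_);
        if (marked_ || queue_.empty())
            return false;
        t = queue_.front();   // item stays in the queue, merely reserved
        marked_ = true;
        return true;
    }

    void deleteMarked() {     // commit: the reserved item really goes away
        boost::lock_guard<boost::mutex> lock(mutex_);
        if (marked_) {
            queue_.pop();
            marked_ = false;
        }
    }

    void unmark() {           // rollback: the item becomes visible again
        boost::lock_guard<boost::mutex> lock(mutex_);
        marked_ = false;
    }
};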

How to create a blocking queue in C between different threads on Linux platform

You can use a pthread_mutex_t. The receiving thread should call pthread_mutex_lock(&shared_mutex) on a shared (global) instance of pthread_mutex_t. This will block the receiving thread. When the sending thread calls pthread_mutex_unlock(&shared_mutex), the receiving thread will unblock automatically.

Here's sample code.

pthread_mutex_t shared_mutex = PTHREAD_MUTEX_INITIALIZER;
int flag = 0;

void thread_send(void)
{
    pthread_mutex_lock(&shared_mutex); // make sure to lock before creating thread_receive
    create_thread(thread_receive);     /* use pthread_create() in the real code */
    sleep(10);
    flag = 1;
    pthread_mutex_unlock(&shared_mutex);
}

void thread_receive(void)
{
    pthread_mutex_lock(&shared_mutex);
    if (flag == 1)
    {
        printf("the flag is set to 1\n");
    }
    pthread_mutex_unlock(&shared_mutex);
}

How to achieve lock-free, but blocking behavior?

If you're on Linux, look into using a Futex. It provides the performance of a non-locking implementation by using atomic operations rather than kernel calls like a mutex would, but should you need to put the thread to sleep because some condition is not true (i.e., lock contention), it will then make the appropriate kernel calls to put the thread to sleep and wake it back up at a future event. It's basically like a very fast semaphore.
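To make the fast path concrete, a hedged sketch of the core wait/wake pair (Linux only; the futex syscall has no glibc wrapper, so it goes through syscall(2), and the cast from std::atomic<int> relies on it having the same layout as int, which holds on mainstream platforms):

#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <atomic>

// Block the calling thread while *addr still equals 'expected'.
// Fast path: a plain atomic load, no kernel call at all; the kernel
// is only entered when we actually have to sleep.
static void futex_wait(std::atomic<int> *addr, int expected) {
    while (addr->load(std::memory_order_acquire) == expected) {
        // FUTEX_WAIT re-checks *addr == expected atomically in the kernel
        // and sleeps only if it still matches; otherwise it returns EAGAIN.
        syscall(SYS_futex, reinterpret_cast<int *>(addr),
                FUTEX_WAIT, expected, nullptr, nullptr, 0);
        // loop: wakeups may be spurious
    }
}

static void futex_wake_one(std::atomic<int> *addr) {
    addr->fetch_add(1, std::memory_order_release); // change the value...
    syscall(SYS_futex, reinterpret_cast<int *>(addr),
            FUTEX_WAKE, 1, nullptr, nullptr, 0);   // ...then wake one waiter
}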

What is the best way to cancel a task that is in a blocking state?

Could the following work in your scenario?

Instead of spawning multiple threads and having them wait on the queue, I would have a single thread in an infinite polling loop and have it spawn a new thread when a new piece of work comes in. You can add a semaphore to limit the number of threads you create. Check the sample code below; I've used a BlockingCollection instead of the RabbitMQ queue.

public class QueueManager
{
    public BlockingCollection<Work> blockingCollection = new BlockingCollection<Work>();
    private const int _maxRunningTasks = 3;

    static SemaphoreSlim _sem = new SemaphoreSlim(_maxRunningTasks);

    public void Queue()
    {
        blockingCollection.Add(new Work());
    }

    public void Consume()
    {
        while (true)
        {
            Work work = blockingCollection.Take();

            _sem.Wait();

            Task t = Task.Factory.StartNew(work.DoWork);
        }
    }

    public class Work
    {
        public void DoWork()
        {
            Thread.Sleep(5000);
            _sem.Release();
            Console.WriteLine("Finished work");
        }
    }
}

and my testing class

class Test
{
    static void Main(string[] args)
    {
        QueueManager c = new QueueManager();
        Task t = Task.Factory.StartNew(c.Consume);

        c.Queue();
        c.Queue();
        c.Queue();
        c.Queue();
        c.Queue();

        Thread.Sleep(1000);
        Console.ReadLine();
    }
}

Is there an awaitable queue in C++?

This is basically your standard thread-safe queue implementation, but instead of a condition_variable, you will have to use futures to coordinate the different threads. You can then co_await the future returned by pop until it becomes ready.

The queue's implementation will need to keep a list of the promises that correspond to the outstanding pop calls. In case the queue is still non-empty when popping, you can return a ready future immediately. You can use a plain old std::mutex to synchronize concurrent access to the underlying data structures.

I don't know of any implementation that already does this, but it shouldn't be too hard to pull off. Note though that managing all the futures will introduce some additional overhead, so your queue will probably be slightly less efficient than the classic condition_variable-based approach.
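Still, a hedged sketch of the bookkeeping using std::promise/std::future (all names are made up; a coroutine version would hand back a custom awaitable instead of a std::future, but the promise management is the same):

#include <deque>
#include <future>
#include <mutex>

template <typename T>
class awaitable_queue {
    std::mutex mx_;
    std::deque<T> items_;
    std::deque<std::promise<T>> waiters_; // promises for outstanding pop() calls

public:
    void push(T value) {
        std::promise<T> to_fulfill;
        {
            std::lock_guard<std::mutex> lk(mx_);
            if (waiters_.empty()) {       // nobody waiting: just store the item
                items_.push_back(std::move(value));
                return;
            }
            to_fulfill = std::move(waiters_.front());
            waiters_.pop_front();
        }
        to_fulfill.set_value(std::move(value)); // fulfill outside the lock
    }

    std::future<T> pop() {
        std::lock_guard<std::mutex> lk(mx_);
        if (!items_.empty()) {            // queue non-empty: ready future
            std::promise<T> p;
            p.set_value(std::move(items_.front()));
            items_.pop_front();
            return p.get_future();
        }
        waiters_.emplace_back();          // otherwise register a waiter
        return waiters_.back().get_future();
    }
};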


