C++ work queues with blocking
Well, that's really quite simple: you're rejecting the tasks posted!
template <typename Task>
void run_task(Task task) {
    boost::unique_lock<boost::mutex> lock(mutex_);
    if (0 < available_) {
        --available_;
        io_service_.post(boost::bind(&tpool::wrap_task, this, boost::function<void()>(task)));
    }
    // no else branch: when available_ is 0 the task is silently dropped
}
Note that the lock only "waits" until the mutex is not owned by another thread. That might already be the case right away, possibly while available_ is already 0. Now the line
    if (0 < available_) {
is simply a condition. It isn't "magical" just because you're holding mutex_ locked. (The program doesn't even know that a relation exists between mutex_ and available_.) So, if available_ <= 0, you will just skip posting the job.
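To make the difference between checking under a lock and actually waiting concrete, here is a minimal sketch. It uses the standard-library equivalents of the Boost types (an assumption; the original code uses Boost), and slot_counter, try_acquire, acquire, release and slot_counter_demo are hypothetical names that do not appear in the original pool.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: counts free worker slots, like available_ above.
class slot_counter {
public:
    explicit slot_counter(int slots) : available_(slots) {}

    // Mirrors the original check: holding the lock does not make the
    // condition wait, so the caller is simply told "no" when nothing is free.
    bool try_acquire() {
        std::lock_guard<std::mutex> lk(mx_);
        if (available_ == 0) return false;
        --available_;
        return true;
    }

    // Blocking alternative: sleeps on the condition variable until
    // release() makes a slot available.
    void acquire() {
        std::unique_lock<std::mutex> lk(mx_);
        cv_.wait(lk, [this] { return available_ > 0; });
        --available_;
    }

    void release() {
        {
            std::lock_guard<std::mutex> lk(mx_);
            ++available_;
        }
        cv_.notify_one();
    }

private:
    std::mutex mx_;
    std::condition_variable cv_;
    int available_;
};

int slot_counter_demo() {
    slot_counter slots(1);
    int result = 0;
    if (slots.try_acquire()) result += 1;    // takes the only slot
    if (!slots.try_acquire()) result += 10;  // rejected immediately, not blocked
    std::thread releaser([&slots] { slots.release(); });
    slots.acquire();                         // blocks until the releaser frees the slot
    releaser.join();
    result += 100;
    assert(result == 111);
    return result;
}
```

Calling slot_counter_demo() from main exercises both paths: the second try_acquire is the equivalent of the dropped task, while acquire is what a blocking run_task would have to do.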
Solution #1
You should use the io_service to queue for you. This is likely what you wanted to achieve in the first place. Instead of you keeping track of "available" threads, io_service does the work for you. You control how many threads it may use by running the io_service on that many threads. Simple.
Since io_service is already thread-safe, you can do without the lock.
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iostream>
// tpool class
// It's always closed.
#ifndef POOL_H   // identifiers with a double underscore are reserved
#define POOL_H
class tpool {
public:
    tpool(std::size_t tpool_size);
    ~tpool();
    template <typename Task>
    void run_task(Task task) {
        // io_service queues the task; no lock and no "available" counter needed
        io_service_.post(task);
    }
private:
    // note the order of destruction of members
    boost::asio::io_service io_service_;
    boost::asio::io_service::work work_;
    boost::thread_group threads_;
};
extern tpool dbpool;
#endif
#include <boost/asio/io_service.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
//#include "pool.h"
tpool::tpool(std::size_t tpool_size) : work_(io_service_) {
    // each thread that calls run() becomes one worker of the pool
    for (std::size_t i = 0; i < tpool_size; ++i)
    {
        threads_.create_thread(
            boost::bind(&boost::asio::io_service::run, &io_service_)
        );
    }
}
tpool::~tpool() {
    io_service_.stop();
    try {
        threads_.join_all();
    }
    catch (...) {}
}
void foo() { std::cout << __PRETTY_FUNCTION__ << "\n"; }
void bar() { std::cout << __PRETTY_FUNCTION__ << "\n"; }
int main() {
    tpool dbpool(50);
    dbpool.run_task(foo);
    dbpool.run_task(bar);
    // give the workers time to run the tasks before the destructor calls stop()
    boost::this_thread::sleep_for(boost::chrono::seconds(1));
}
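For a graceful shutdown, you will want a way to "clear" (destroy) the io_service::work object: while it exists, io_service::run() never returns on its own, so the pool can only exit through stop() as above, which may abandon tasks that are still queued.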
Solution #2
Don't use io_service; instead, roll your own queue implementation with a condition variable to notify a worker thread of new work being posted. Again, the number of workers is determined by the number of threads in the group.
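#include <boost/thread.hpp>
#include <boost/atomic.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>
#include <deque>
#include <iostream>
using namespace boost;
using namespace boost::phoenix::arg_names;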
class thread_pool
{
private:
    mutex mx;
    condition_variable cv;
    typedef function<void()> job_t;
    std::deque<job_t> _queue;
    thread_group pool;
    boost::atomic_bool shutdown;
    static void worker_thread(thread_pool& q)
    {
        // dequeue() returns none once shutdown is set and the queue is drained
        while (auto job = q.dequeue())
            (*job)();
    }
public:
    thread_pool() : shutdown(false) {
        for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
            pool.create_thread(bind(worker_thread, ref(*this)));
    }
    void enqueue(job_t job)
    {
        lock_guard<mutex> lk(mx);
        _queue.push_back(std::move(job));
        cv.notify_one();
    }
    optional<job_t> dequeue()
    {
        unique_lock<mutex> lk(mx);
        namespace phx = boost::phoenix;
        // block until there is work or the pool is shutting down
        cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));
        if (_queue.empty())
            return none;
        auto job = std::move(_queue.front());
        _queue.pop_front();
        return std::move(job);
    }
    ~thread_pool()
    {
        shutdown = true;
        {
            lock_guard<mutex> lk(mx);
            cv.notify_all();
        }
        pool.join_all();
    }
};
void the_work(int id)
{
    std::cout << "worker " << id << " entered\n";
    // no more synchronization; the pool size determines max concurrency
    std::cout << "worker " << id << " start work\n";
    this_thread::sleep_for(chrono::seconds(2));
    std::cout << "worker " << id << " done\n";
}
int main()
{
    thread_pool pool; // uses 1 thread per core
    for (int i = 0; i < 10; ++i)
        pool.enqueue(bind(the_work, i));
}
Non-busy blocking Queue Implementation in C
As requested, here is my solution.
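#include <semaphore.h>
#include <string.h>
#define QUEUESIZE 50
typedef struct
{
    char q[QUEUESIZE][150];
    int first;
    int last;
    int count;
    sem_t *full;   /* counts filled slots; dequeue waits on it */
    sem_t *empty;  /* counts free slots; enqueue waits on it */
    sem_t *excl;   /* binary semaphore protecting first/last/count */
} Queue;
void init_queue(Queue *q, sem_t *f, sem_t *e, sem_t *ee)
{
    q->first = 0;
    q->last = QUEUESIZE-1;
    q->count = 0;
    q->full = f;
    q->empty = e;
    q->excl = ee;
}
/* x must be a NUL-terminated string of at most 149 characters */
void enqueue(Queue *q, const char *x)
{
    sem_wait(q->empty);   /* sleep (no busy loop) while the queue is full */
    sem_wait(q->excl);
    q->last = (q->last+1) % QUEUESIZE;
    strcpy(q->q[ q->last ],x);
    q->count = q->count + 1;
    sem_post(q->excl);
    sem_post(q->full);
}
/* ptr must point to a buffer of at least 150 bytes */
void dequeue(Queue *q,char *ptr)
{
    sem_wait(q->full);    /* sleep (no busy loop) while the queue is empty */
    sem_wait(q->excl);
    strcpy(ptr,q->q[ q->first]);
    q->first = (q->first+1) % QUEUESIZE;
    q->count = q->count - 1;
    sem_post(q->excl);
    sem_post(q->empty);
}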
I initialize the semaphores as follows:
sem_init(full, 1, 0);            /* no filled slots yet */
sem_init(empty, 1, QUEUESIZE);   /* every slot is free */
sem_init(excl, 1, 1);            /* the Queue above uses a single excl semaphore */
How to improve performance of a blocking queue written in C?
I'm having a hard time implementing a queue that satisfies the last point while keeping acceptable performance. I'm using pthreads to do the synchronization and, in order to satisfy the FIFO requirement, I am relying on the pthread_cond_wait and pthread_cond_broadcast functions.
In this case, when a thread adds to the queue it does a pthread_cond_broadcast() and wakes up all threads that were blocked waiting to fetch data from the empty queue. If lots of threads were blocked waiting, this wastes a lot of CPU time on thread switches and scheduler overhead, because each waiting thread unblocks, tries to acquire the mutex (and probably blocks and unblocks again while trying to get it), checks whether it is next, and blocks again if it isn't.
To fix that, each thread needs its own separate condition variable. When a thread starts waiting for data from an empty queue, it puts its condition variable on a "queue of waiting readers"; when a thread adds data to the queue, it takes the first condition variable from the "queue of waiting readers" and (if there is a waiter) does one pthread_cond_signal() (not a broadcast), so that only one waiting thread is unblocked.
Note that the "queue of waiting readers' condition variables" can be a linked list of "struct waiter { struct waiter * next; pthread_cond_t condVar; }" structures; these structures can be created and initialized when a thread is created and then continually recycled (until the thread terminates).
For "multiple writers" it's essentially the same problem with the same solution (and it can re-use the same "struct waiter" created when the thread was created). When a thread needs to wait to add data to the queue, it adds its condition variable to a "linked list of waiting writers", and when a thread finishes removing data from the queue, it does one pthread_cond_signal() to unblock the next waiting writer.
Note that this should significantly improve performance when it's under high contention (lots of waiting readers or lots of waiting writers), but the extra overhead of managing "queues of waiters" may also reduce performance under low contention (the worst case is when there's regularly only one waiting thread, which is the best case for your current approach using pthread_cond_broadcast).
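The reader side of this scheme could be sketched roughly as follows. This is an illustration under several assumptions, not the pthreads code from the question: it uses std::condition_variable in place of pthread_cond_t, the waiter lives on the reader's stack instead of being recycled per thread, the queue is unbounded (so the "waiting writers" list is left out), and T must be default-constructible. fifo_wake_queue and fifo_wake_demo are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

template <typename T>
class fifo_wake_queue {
    // One of these per blocked reader, each with its own condition variable.
    struct waiter {
        std::condition_variable cv;
        bool has_value = false;
        T value{};
    };

public:
    void push(T item) {
        std::lock_guard<std::mutex> lk(mx_);
        if (!waiting_readers_.empty()) {
            // Hand the item to the longest-waiting reader and wake only
            // that reader (a signal, not a broadcast).
            waiter* w = waiting_readers_.front();
            waiting_readers_.pop_front();
            w->value = std::move(item);
            w->has_value = true;
            w->cv.notify_one();
        } else {
            items_.push_back(std::move(item));
        }
    }

    T pop() {
        std::unique_lock<std::mutex> lk(mx_);
        if (!items_.empty()) {
            T item = std::move(items_.front());
            items_.pop_front();
            return item;
        }
        // Nothing queued: register at the tail of the waiting readers and
        // sleep on our own condition variable until push() fills us in.
        waiter self;
        waiting_readers_.push_back(&self);
        self.cv.wait(lk, [&self] { return self.has_value; });
        return std::move(self.value);
    }

private:
    std::mutex mx_;
    std::deque<T> items_;
    std::deque<waiter*> waiting_readers_;
};

int fifo_wake_demo() {
    fifo_wake_queue<int> q;
    std::vector<int> got(4, 0);
    std::vector<std::thread> readers;
    for (int i = 0; i < 4; ++i)
        readers.emplace_back([&q, &got, i] { got[i] = q.pop(); });
    for (int v = 1; v <= 4; ++v) q.push(v);
    for (auto& t : readers) t.join();
    int sum = got[0] + got[1] + got[2] + got[3];
    assert(sum == 10);  // every pushed value was delivered exactly once
    return sum;
}
```

Passing the item directly to the waiter, rather than just signalling it, is a design choice of this sketch: it keeps a reader that arrives later from taking an item that was meant for one already waiting.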
Creating a Blocking Queue
See here: "What do I get from front() of empty std container?"
Bad things happen if you call .front() on an empty container; better check .empty() first.
Try:
T pop() {
    this->mutex_.lock();
    T value{};  // value-initialized; returned as-is if the queue is empty
    if( !this->queue_.empty() )
    {
        // front() on an empty queue_ is undefined behavior
        // (may segfault, may throw, etc.), hence the check above
        value = this->queue_.front();
        this->queue_.pop();
    }
    this->mutex_.unlock();
    return value;
}
Note: Since atomic operations are important on this kind of queue, I'd recommend API changes:
    bool pop(T &t); // returns false if there was nothing to read.
Better yet, if you're actually using this where it matters, you probably want to mark items in use before deleting in case of failure.
    bool peekAndMark(T &t); // allows one "marked" item per thread
    void deleteMarked(); // if an item is marked correctly, pops it.
    void unmark(); // abandons the mark. (rollback)
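As a rough illustration of the suggested bool pop(T &t) signature, here is a minimal sketch built on std::mutex and std::condition_variable (an assumption; the original does not say which mutex type mutex_ is). The blocking wait_and_pop variant and the names blocking_queue and blocking_queue_demo are hypothetical additions, and the mark/unmark API is not covered.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

template <typename T>
class blocking_queue {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lk(mutex_);
            queue_.push(std::move(value));
        }
        not_empty_.notify_one();
    }

    // Non-blocking: returns false and leaves `out` untouched if the queue is empty.
    bool pop(T& out) {
        std::lock_guard<std::mutex> lk(mutex_);
        if (queue_.empty()) return false;
        out = std::move(queue_.front());
        queue_.pop();
        return true;
    }

    // Blocking: sleeps until an element is available, so front() is
    // never called on an empty queue.
    T wait_and_pop() {
        std::unique_lock<std::mutex> lk(mutex_);
        not_empty_.wait(lk, [this] { return !queue_.empty(); });
        T value = std::move(queue_.front());
        queue_.pop();
        return value;
    }

private:
    std::mutex mutex_;
    std::condition_variable not_empty_;
    std::queue<T> queue_;
};

int blocking_queue_demo() {
    blocking_queue<int> q;
    int out = -1;
    bool had_item = q.pop(out);        // empty queue: false, out stays -1
    assert(!had_item && out == -1);
    std::thread producer([&q] { q.push(42); });
    int value = q.wait_and_pop();      // blocks until the producer pushes
    producer.join();
    return value;
}
```

With this shape the emptiness check and the removal happen under the same lock, which is the atomicity the note above is asking for.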
How to create a blocking queue in C between different threads on Linux platform
You can use pthread_mutex_t. The receiving thread should call pthread_mutex_lock(&shared_mutex) on a shared (global) instance of pthread_mutex_t. Provided the sending thread already holds the mutex, this will block the receiving thread. When the sending thread calls pthread_mutex_unlock(&shared_mutex), the receiving thread will unblock automatically.
Here's sample code.
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
pthread_mutex_t shared_mutex = PTHREAD_MUTEX_INITIALIZER;
int flag = 0;
int thread_receive (void);
int thread_send (void)
{
    pthread_mutex_lock( &shared_mutex ); // make sure to lock before creating thread_receive
    create_thread( thread_receive ); /* use pthread_create() in the real code */
    sleep (10);
    flag = 1;
    pthread_mutex_unlock( &shared_mutex ); // thread_receive unblocks here
    return 0;
}
int thread_receive (void)
{
    pthread_mutex_lock(&shared_mutex); // blocks until thread_send unlocks
    if(flag == 1)
    {
        printf ("the flag is set to 1\n");
    }
    pthread_mutex_unlock( &shared_mutex );
    return 0;
}
How to achieve lock-free, but blocking behavior?
If you're on Linux, look into using a futex. In the uncontended case it gives you the performance of a non-locking implementation, because it uses atomic operations in user space rather than a kernel call on every operation. Only when a thread actually has to wait because some condition is not true (i.e., lock contention) does it make the appropriate kernel calls to put the thread to sleep and wake it back up at a future event. It's basically like a very fast semaphore.
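A minimal, Linux-only sketch of that idea is a one-shot event built on the raw futex system call. futex_call, futex_event and futex_event_demo are hypothetical names, and the sketch assumes that std::atomic<int> has the same size and layout as a plain int, which holds on common Linux toolchains but is not guaranteed by the standard.

```cpp
#include <atomic>
#include <cassert>
#include <climits>
#include <thread>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

static_assert(sizeof(std::atomic<int>) == sizeof(int),
              "sketch assumes atomic<int> is layout-compatible with int");

// glibc provides no futex() wrapper, so go through syscall().
static long futex_call(std::atomic<int>* addr, int op, int val) {
    return syscall(SYS_futex, reinterpret_cast<int*>(addr), op, val,
                   nullptr, nullptr, 0);
}

class futex_event {
public:
    void wait() {
        // Fast path: a single atomic load, no kernel entry if already set.
        while (flag_.load(std::memory_order_acquire) == 0) {
            // Slow path: the kernel puts us to sleep only if flag_ is
            // still 0 at the moment it checks, so a wake-up cannot be missed.
            futex_call(&flag_, FUTEX_WAIT_PRIVATE, 0);
        }
    }

    void set() {
        flag_.store(1, std::memory_order_release);
        // This sketch always issues the wake call; a real lock would track
        // whether anyone is sleeping and skip it otherwise.
        futex_call(&flag_, FUTEX_WAKE_PRIVATE, INT_MAX);
    }

private:
    std::atomic<int> flag_{0};
};

int futex_event_demo() {
    futex_event ready;
    int payload = 0;
    std::thread producer([&payload, &ready] {
        payload = 7;   // published by the release store in set()
        ready.set();
    });
    ready.wait();      // sleeps in the kernel only if set() has not run yet
    int seen = payload;
    producer.join();
    return seen;
}
```

The loop around the wait call handles spurious returns from the kernel, which is why the flag is re-checked after every wake-up.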
What is the best way to cancel a task that is in a blocking state?
Could the following work in your scenario?
Instead of spawning multiple threads and having them wait on the queue, I would have a single thread in an infinite polling loop and have that one spawn a new thread when a new piece of work comes in. You can add a semaphore to limit the number of threads you create. Check the sample code below; I've used a BlockingCollection instead of RabbitMQ.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public class QueueManager
{
    public BlockingCollection<Work> blockingCollection = new BlockingCollection<Work>();
    private const int _maxRunningTasks = 3;
    static SemaphoreSlim _sem = new SemaphoreSlim(_maxRunningTasks);
    public void Queue()
    {
        blockingCollection.Add(new Work());
    }
    public void Consume()
    {
        while (true)
        {
            Work work = blockingCollection.Take(); // blocks until work arrives
            _sem.Wait();                           // blocks while _maxRunningTasks are running
            Task t = Task.Factory.StartNew(work.DoWork);
        }
    }
    public class Work
    {
        public void DoWork()
        {
            Thread.Sleep(5000);
            _sem.Release();
            Console.WriteLine("Finished work");
        }
    }
}
and my testing class
class Test
{
    static void Main(string[] args)
    {
        QueueManager c = new QueueManager();
        Task t = Task.Factory.StartNew(c.Consume);
        c.Queue();
        c.Queue();
        c.Queue();
        c.Queue();
        c.Queue();
        Thread.Sleep(1000);
        Console.ReadLine();
    }
}
Is there an awaitable queue in C++?
This is basically your standard thread-safe queue implementation, but instead of a condition_variable, you will have to use futures to coordinate the different threads. You can then co_await the future returned by pop until it becomes ready (this assumes a future type that is awaitable; plain std::future is not awaitable out of the box in standard C++).
The queue's implementation will need to keep a list of the promises that correspond to the outstanding pop calls. In case the queue is not empty when popping, you can return a ready future immediately. You can use plain old std::mutex to synchronize concurrent access to the underlying data structures.
I don't know of any implementation that already does this, but it shouldn't be too hard to pull off. Note though that managing all the futures will introduce some additional overhead, so your queue will probably be slightly less efficient than the classic condition_variable-based approach.
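The promise bookkeeping described above might look roughly like this. It is only a sketch of the queue side, using std::promise and std::future; it does not show the co_await integration, and future_queue and future_queue_demo are hypothetical names.

```cpp
#include <cassert>
#include <deque>
#include <future>
#include <mutex>
#include <utility>

template <typename T>
class future_queue {
public:
    void push(T value) {
        std::promise<T> waiter;
        {
            std::lock_guard<std::mutex> lk(mx_);
            if (waiters_.empty()) {
                // Nobody is waiting: store the item for a later pop().
                items_.push_back(std::move(value));
                return;
            }
            // Serve the oldest outstanding pop() call.
            waiter = std::move(waiters_.front());
            waiters_.pop_front();
        }
        // Fulfil the promise outside the lock.
        waiter.set_value(std::move(value));
    }

    std::future<T> pop() {
        std::lock_guard<std::mutex> lk(mx_);
        std::promise<T> p;
        std::future<T> f = p.get_future();
        if (!items_.empty()) {
            // Item available: the returned future is ready immediately.
            p.set_value(std::move(items_.front()));
            items_.pop_front();
        } else {
            // Queue empty: park the promise until a later push().
            waiters_.push_back(std::move(p));
        }
        return f;
    }

private:
    std::mutex mx_;
    std::deque<T> items_;
    std::deque<std::promise<T>> waiters_;
};

int future_queue_demo() {
    future_queue<int> q;
    q.push(1);
    std::future<int> ready = q.pop();    // item was queued: ready at once
    std::future<int> pending = q.pop();  // queue empty: promise is parked
    q.push(2);                           // fulfils the parked promise
    int result = ready.get() * 10 + pending.get();
    assert(result == 12);
    return result;
}
```

At any time at most one of the two deques is non-empty: items pile up only while no pop is outstanding, and promises pile up only while the queue is empty.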