Thread Pooling in C++11

C++11 Dynamic Threadpool

  1. Start with maximum number of threads a system can support:

    int Num_Threads =  thread::hardware_concurrency();
  2. For an efficient threadpool implementation, once threads are created according to Num_Threads, it's better not to create new ones, or destroy old ones (by joining). There will be performance penalty, might even make your application goes slower than the serial version.

    Each C++11 thread should be running in their function with an infinite loop, constantly waiting for new tasks to grab and run.

    Here is how to attach such function to the thread pool:

    int Num_Threads = thread::hardware_concurrency();
    vector<thread> Pool;
    for(int ii = 0; ii < Num_Threads; ii++)
    { Pool.push_back(thread(Infinite_loop_function));}
  3. The Infinite_loop_function

    This is a "while(true)" loop waiting for the task queue

    void The_Pool:: Infinite_loop_function()
    {
    while(true)
    {
    {
    unique_lock<mutex> lock(Queue_Mutex);

    condition.wait(lock, []{return !Queue.empty()});
    Job = Queue.front();
    Queue.pop();
    }
    Job(); // function<void()> type
    }
    };
  4. Make a function to add job to your Queue

    void The_Pool:: Add_Job(function<void()> New_Job)
    {
    {
    unique_lock<mutex> lock(Queue_Mutex);
    Queue.push(New_Job);
    }
    condition.notify_one();
    }
  5. Bind an arbitrary function to your Queue

    Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object));

Once you integrate these ingredients, you have your own dynamic threading pool. These threads always run, waiting for jobs to do.

C++11: std::thread pooled?

Generally, std::thread should be a minimal wrapper around underlying system primitive. For example, if you're on pthread platform, you can test with the following program that no matter how many threads you create, they are all created with unique pthread_t ids (which implies they're created on the fly and not borrowed from a thread pool):

#include <assert.h>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

#include <pthread.h>

int main() {
std::vector<std::thread> workers;
std::set<long long> thread_ids;
std::mutex m;
const int n = 1024;

for (int i = 0; i < n; ++i) {
workers.push_back(std::thread([&] {
std::lock_guard<std::mutex> lock(m);
thread_ids.insert(pthread_self());
}));
}
for (auto& worker : workers) {
worker.join();
}
assert(thread_ids.size() == n);

return 0;
}

So thread pools still make perfect sense. That said, I've seen a video where C++ committee members discussed thread pools with regard to std::async (IIRC), but I can't find it right now.

Implementing a simple, generic thread pool in C++11

So the hard part is that packaged_task<R()> is move-only, otherwise you could just toss it into a std::function<void()>, and run those in your threads.

There are a few ways around this.

First, ridiculously, use a packaged_task<void()> to store a packaged_task<R()>. I'd advise against this, but it does work. ;) (what is the signature of operator() on packaged_task<R()>? What is the required signature for the objects you pass to packaged_task<void()>?)

Second, wrap your packaged_task<R()> in a shared_ptr, capture that in a lambda with signature void(), store that in a std::function<void()>, and done. This has overhead costs, but probably less than the first solution.

Finally, write your own move-only function wrapper. For the signature void() it is short:

struct task {
template<class F,
class dF=std::decay_t<F>,
class=decltype( std::declval<dF&>()() )
>
task( F&& f ):
ptr(
new dF(std::forward<F>(f)),
[](void* ptr){ delete static_cast<dF*>(ptr); }
),
invoke([](void*ptr){
(*static_cast<dF*>(ptr))();
})
{}
void operator()()const{
invoke( ptr.get() );
}
task(task&&)=default;
task&operator=(task&&)=default;
task()=default;
~task()=default;
explicit operator bool()const{return static_cast<bool>(ptr);}
private:
std::unique_ptr<void, void(*)(void*)> ptr;
void(*invoke)(void*) = nullptr;
};

and simple. The above can store packaged_task<R()> for any type R, and invoke them later.

This has relatively minimal overhead -- it should be cheaper than std::function, at least the implementations I've seen -- except it does not do SBO (small buffer optimization) where it stores small function objects internally instead of on the heap.

You can improve the unique_ptr<> ptr container with a small buffer optimization if you want.



Related Topics



Leave a reply



Submit