C++11: Std::Thread Pooled

C++11: std::thread pooled?

Generally, std::thread should be a minimal wrapper around underlying system primitive. For example, if you're on pthread platform, you can test with the following program that no matter how many threads you create, they are all created with unique pthread_t ids (which implies they're created on the fly and not borrowed from a thread pool):

#include <assert.h>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

#include <pthread.h>

int main() {
std::vector<std::thread> workers;
std::set<long long> thread_ids;
std::mutex m;
const int n = 1024;

for (int i = 0; i < n; ++i) {
workers.push_back(std::thread([&] {
std::lock_guard<std::mutex> lock(m);
thread_ids.insert(pthread_self());
}));
}
for (auto& worker : workers) {
worker.join();
}
assert(thread_ids.size() == n);

return 0;
}

So thread pools still make perfect sense. That said, I've seen a video where C++ committee members discussed thread pools with regard to std::async (IIRC), but I can't find it right now.

Implementing a simple, generic thread pool in C++11

So the hard part is that packaged_task<R()> is move-only, otherwise you could just toss it into a std::function<void()>, and run those in your threads.

There are a few ways around this.

First, ridiculously, use a packaged_task<void()> to store a packaged_task<R()>. I'd advise against this, but it does work. ;) (what is the signature of operator() on packaged_task<R()>? What is the required signature for the objects you pass to packaged_task<void()>?)

Second, wrap your packaged_task<R()> in a shared_ptr, capture that in a lambda with signature void(), store that in a std::function<void()>, and done. This has overhead costs, but probably less than the first solution.

Finally, write your own move-only function wrapper. For the signature void() it is short:

struct task {
template<class F,
class dF=std::decay_t<F>,
class=decltype( std::declval<dF&>()() )
>
task( F&& f ):
ptr(
new dF(std::forward<F>(f)),
[](void* ptr){ delete static_cast<dF*>(ptr); }
),
invoke([](void*ptr){
(*static_cast<dF*>(ptr))();
})
{}
void operator()()const{
invoke( ptr.get() );
}
task(task&&)=default;
task&operator=(task&&)=default;
task()=default;
~task()=default;
explicit operator bool()const{return static_cast<bool>(ptr);}
private:
std::unique_ptr<void, void(*)(void*)> ptr;
void(*invoke)(void*) = nullptr;
};

and simple. The above can store packaged_task<R()> for any type R, and invoke them later.

This has relatively minimal overhead -- it should be cheaper than std::function, at least the implementations I've seen -- except it does not do SBO (small buffer optimization) where it stores small function objects internally instead of on the heap.

You can improve the unique_ptr<> ptr container with a small buffer optimization if you want.

C++11 Dynamic Threadpool

  1. Start with maximum number of threads a system can support:

    int Num_Threads =  thread::hardware_concurrency();
  2. For an efficient threadpool implementation, once threads are created according to Num_Threads, it's better not to create new ones, or destroy old ones (by joining). There will be performance penalty, might even make your application goes slower than the serial version.

    Each C++11 thread should be running in their function with an infinite loop, constantly waiting for new tasks to grab and run.

    Here is how to attach such function to the thread pool:

    int Num_Threads = thread::hardware_concurrency();
    vector<thread> Pool;
    for(int ii = 0; ii < Num_Threads; ii++)
    { Pool.push_back(thread(Infinite_loop_function));}
  3. The Infinite_loop_function

    This is a "while(true)" loop waiting for the task queue

    void The_Pool:: Infinite_loop_function()
    {
    while(true)
    {
    {
    unique_lock<mutex> lock(Queue_Mutex);

    condition.wait(lock, []{return !Queue.empty()});
    Job = Queue.front();
    Queue.pop();
    }
    Job(); // function<void()> type
    }
    };
  4. Make a function to add job to your Queue

    void The_Pool:: Add_Job(function<void()> New_Job)
    {
    {
    unique_lock<mutex> lock(Queue_Mutex);
    Queue.push(New_Job);
    }
    condition.notify_one();
    }
  5. Bind an arbitrary function to your Queue

    Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object));

Once you integrate these ingredients, you have your own dynamic threading pool. These threads always run, waiting for jobs to do.

Cannot correctly stop thread pool

First, a correct threadsafe queue.

template<class T>
struct threadsafe_queue {
[[nodiscard]] std::optional<T> pop() {
auto l = lock();
cv.wait(l, [&]{ return is_aborted() || !data.empty(); });
if (is_aborted())
return {};
auto r = std::move(data.front());
data.pop_front();
cv.notify_all(); // for wait_until_empty
return r; // might need std::move here, depending on compiler version
}
bool push(T t) {
auto l = lock();
if (is_aborted()) return false;
data.push_back(std::move(t));
cv.notify_one();
return true;
}
void set_abort_flag() {
auto l = lock(); // still need this
aborted = true;
data.clear();
cv.notify_all();
}
[[nodiscard]] bool is_aborted() const { return aborted; }
void wait_until_empty() {
auto l = lock();
cv.wait(l, [&]{ return data.empty(); });
}
private:
std::unique_lock<std::mutex> lock() {
return std::unique_lock<std::mutex>(m);
}
std::condition_variable cv;
std::mutex m;
std::atomic<bool> aborted{false};
std::deque<T> data;
};

this handles abort and the like internally.

Our threadpool then becomes:

struct threadpool {
explicit threadpool(std::size_t count)
{
for (std::size_t i = 0; i < count; ++i) {
threads.emplace_back([&]{
// abort handled by empty pop:
while( auto f = queue.pop() ) {
(*f)();
}
});
}
}
void set_abort_flag() {
queue.set_abort_flag();
}
[[nodiscard]] bool is_aborted() const {
return queue.is_aborted();
}
~threadpool() {
queue.wait_until_empty();
queue.set_abort_flag(); // get threads to leave the queue
for (std::thread& t:threads)
t.join();
}
template<class F,
class R=typename std::result_of<F()>::type
>
std::future<R> push_task( F f ) {
std::packaged_task<R()> task( std::move(f) );
auto ret = task.get_future();
if (queue.push( std::packaged_task<void()>(std::move(task)) )) // wait, this works? Yes it does.
return ret;
else
return {}; // cannot push, already aborted
}
private:
// yes, void. This is evil but it works
threadsafe_queue<std::packaged_task<void()>> queue;
std::vector<std::thread> threads;
};

in c++11 you can swap the std::optional for std::unique_ptr. More runtime overhead.

The trick here is that a std::packaged_task<void()> can store a std::packaged_task<R()>. And we don't need the return value in the queue. So one thread pool can handle any number of different return values in tasks -- it doesn't care.

I only join the threads on thread_pool destruction. I could do it after an abort as well.

Destroying a thread_pool waits until all tasks are complete. Note that aborting a thread_pool may not abort tasks in progress. One thing that you probably want to add is the option of passing an abort API/flag to the tasks, so they can abort early if asked.

Getting this industrial scale is hard, because ideally all blocking in a task would also pay attention to the abort possibility.

Live example.

You could add a 2nd cv to notify after pops, which only wait_until_empty waits on. That might safe you some spurious wakeups.



Related Topics



Leave a reply



Submit