How to Create a Thread Pool Using Boost in C++

How to create a thread pool using boost in C++?

The process is pretty simple. First create an asio::io_service and a thread_group. Fill the thread_group with threads linked to the io_service. Assign tasks to the threads using the boost::bind function.

To stop the threads (usually when you are exiting your program) just stop the io_service and join all threads.

You should only need these headers:

#include <boost/asio/io_service.hpp>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>

here is an example:

/*
* Create an asio::io_service and a thread_group (through pool in essence)
*/
boost::asio::io_service ioService;
boost::thread_group threadpool;


/*
* This will start the ioService processing loop. All tasks
* assigned with ioService.post() will start executing.
*/
boost::asio::io_service::work work(ioService);

/*
* This will add 2 threads to the thread pool. (You could just put it in a for loop)
*/
threadpool.create_thread(
boost::bind(&boost::asio::io_service::run, &ioService)
);
threadpool.create_thread(
boost::bind(&boost::asio::io_service::run, &ioService)
);

/*
* This will assign tasks to the thread pool.
* More about boost::bind: "http://www.boost.org/doc/libs/1_54_0/libs/bind/bind.html#with_functions"
*/
ioService.post(boost::bind(myTask, "Hello World!"));
ioService.post(boost::bind(clearCache, "./cache"));
ioService.post(boost::bind(getSocialUpdates, "twitter,gmail,facebook,tumblr,reddit"));

/*
* This will stop the ioService processing loop. Any tasks
* you add behind this point will not execute.
*/
ioService.stop();

/*
* Will wait till all the threads in the thread pool are finished with
* their assigned tasks and 'join' them. Just assume the threads inside
* the threadpool will be destroyed by this method.
*/
threadpool.join_all();

Source: Recipes < Asio

C++ thread pool using boost::asio::thread_pool, why can't I reuse my threads?

You join the pool after posting the first task. So, the pool stops before you even accept a second task. That explains why you're not seeing more.

This fixes that:

for (size_t i = 0; i != 50; ++i) {
post(g_pool, boost::bind(f, 10 * i));
}
g_pool.join();

Addendum #1

In response to the comments. In case you want to wait for the outcome of a specific task, consider a future:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <future>

boost::asio::thread_pool g_pool(10);

int f(int i) {
std::cout << '(' + std::to_string(i) + ')';
return i * i;
}

int main() {
std::cout << std::unitbuf;
std::future<int> answer;

for (size_t i = 0; i != 50; ++i) {
auto task = boost::bind(f, 10 * i);
if (i == 42) {
answer = post(g_pool, std::packaged_task<int()>(task));
} else
{
post(g_pool, task);
}
}

answer.wait(); // optionally make sure it is ready before blocking get()
std::cout << "\n[Answer to #42: " + std::to_string(answer.get()) + "]\n";

// wait for remaining tasks
g_pool.join();
}

With one possible output:

(0)(50)(30)(90)(110)(100)(120)(130)(140)(150)(160)(170)(180)(190)(40)(200)(210)(220)(240)(250)(70)(260)(20)(230)(10)(290)(80)(270)(300)(340)(350)(310)(360)(370)(380)(330)(400)(410)(430)(60)(420)(470)(440)(490)(480)(320)(460)(450)(390)
[Answer to #42: 176400]
(280)

Addendum #2: Serializing tasks

If you want to serialize specific tasks, you can use a strand. E.g. to serialize all the request based on the remainder of the parameter modulo 3:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <future>

boost::asio::thread_pool g_pool(10);

int f(int i) {
std::cout << '(' + std::to_string(i) + ')';
return i * i;
}

int main() {
std::cout << std::unitbuf;

std::array strands{make_strand(g_pool.get_executor()),
make_strand(g_pool.get_executor()),
make_strand(g_pool.get_executor())};

for (size_t i = 0; i != 50; ++i) {
post(strands.at(i % 3), boost::bind(f, i));
}

g_pool.join();
}

With a possible output:

(0)(3)(6)(2)(9)(1)(5)(8)(11)(4)(7)(10)(13)(16)(19)(22)(25)(28)(31)(34)(37)(40)(43)(46)(49)(12)(15)(14)(18)(21)(24)(27)(30)(33)(36)(39)(42)(45)(48)(17)(20)(23)(26)(29)(32)(35)(38)(41)(44)(47)

Note that all work is done on any thread, but tasks on a strand happen in the order in which they were posted. So,

  • 0, 3, 6, 9, 12...
  • 1, 4, 7, 10, 13...
  • 2, 5, 8, 11, 14...

happen strictly serially, though

  • 4 and 7 don't need to happen on the same physical thread
  • 11 might happen before 4, because they're not on the same strand

Even More

In case you need more "barrier-like" synchronization, or what's known as fork-join semantics, see Boost asio thread_pool join does not wait for tasks to be finished (where I posted two answers, one after I discovered the fork-join executor example).

Creating a thread pool using boost

There is an unofficial (yet) threadpool in boost.
But it's not a problem to implement one yourself especially if great genericity is not a primary goal. Idea: your threadpool can be parametrized with TaskType type and the number of workers. The TP must be given the handler function which takes TaskType. TP contains a queue of added tasks. The real thread function just takes a task from the queue and calls the passed handler. Something like that.

Use Futures with Boost Thread Pool

The operations posted to the pool end without the threads ending. That's the whole purpose of pooling the threads.

void send_file(std::string const& file_path){
post(pool_, [this, &file_path] {
handle_send_file(file_path);
});
// DO SOMETHING WHEN handle_send_file ENDS
}

This has several issues. The largest one is that you should not capture file_path by reference, as the argument is soon out of scope, and the handle_send_file call will run at an unspecified time in another thread. That's a race condition and dangling reference. Undefined Behaviour results.

Then the

    // DO SOMETHING WHEN handle_send_file ENDS

is on a line which has no sequence relation with handle_send_file. In fact, it will probably run before that operation ever has a chance to start.

Simplifying

Here's a simplified version:

#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <fstream>
#include <iostream>
namespace asio = boost::asio;
using asio::ip::tcp;

static asio::thread_pool pool_;

struct X {
std::unique_ptr<tcp::socket> socket_;

explicit X(unsigned short port) : socket_(new tcp::socket{ pool_ }) {
socket_->connect({ {}, port });
}

asio::thread_pool pool_;
std::unique_ptr<tcp::socket> socket_{ new tcp::socket{ pool_ } };

void send_file(std::string file_path) {
post(pool_, [=, this] {
send_file_implementation(file_path);
// DO SOMETHING WHEN send_file_implementation ENDS
});
}

// throws system_error exception
void send_file_implementation(std::string file_path) {
std::ifstream source_file(file_path,
std::ios_base::binary | std::ios_base::ate);
size_t file_size = source_file.tellg();
source_file.seekg(0);

write(*socket_,
asio::buffer(file_path + "\n" + std::to_string(file_size) + "\n\n"));

boost::array<char, 1024> buf{};
while (source_file.read(buf.c_array(), buf.size()) ||
source_file.gcount() > 0)
{
int n = source_file.gcount();

if (n <= 0) {
using namespace boost::system;
throw system_error(errc::io_error, system_category());
}

write(*socket_, asio::buffer(buf), asio::transfer_exactly(n));
}
}
};

Now, you can indeed run several of these operations in parallel (assuming several instances of X, so you have separate socket_ connections).

To do something at the end, just put code where I moved the comment:

// DO SOMETHING WHEN send_file_implementation ENDS

If you don't know what to do there and you wish to make a future ready at that point, you can:

std::future<void> send_file(std::string file_path) {
std::packaged_task<void()> task([=, this] {
send_file_implementation(file_path);
});

return post(pool_, std::move(task));
}

This overload of post magically¹ returns the future from the packaged task. That packaged task will set the internal promise with either the (void) return value or the exception thrown.

See it in action: Live On Coliru

int main() {
// send two files simultaneously to different connections
X clientA(6868);
X clientB(6969);

std::future<void> futures[] = {
clientA.send_file("main.cpp"),
clientB.send_file("main.cpp"),
};

for (auto& fut : futures) try {
fut.get();
std::cout << "Everything completed without error\n";
} catch(std::exception const& e) {
std::cout << "Error occurred: " << e.what() << "\n";
};

pool_.join();
}

I tested this while running two netcats to listen on 6868/6969:

nc -l -p 6868 | head& nc -l -p 6969 | md5sum&
./a.out
wait

The server prints:

Everything completed without error
Everything completed without error

The netcats print their filtered output:

main.cpp
1907

#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <fstream>
#include <iostream>
#include <future>
namespace asio = boost::asio;
using asio::ip::tcp;
7ecb71992bcbc22bda44d78ad3e2a5ef -

¹ not magic: see https://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/async_result.html

Using boost::asio thread pool for general purpose tasks

Boost.Asio is not solely for network programming, see the reference documentation. It has extensive support for things like

  • time based operations (deadline_timer)
  • signal handling
  • platform specific operations such as posix streams and Windows handles

I've used it for other purposes in several applications as well. One example being a thread pool to service potentially long running blocking database operations while providing an asynchronous interface for the application. Boost.Asio really is a very powerful library. Using it for a general purpose thread pool as you propose can work just fine.

Thread pool using boost asio

In short, you need to wrap the user's provided task with another function that will:

  • Invoke the user function or callable object.
  • Lock the mutex and decrement the counter.

I may not be understanding all the requirements for this thread pool. Thus, for clarity, here is an explicit list as to what I believe are the requirements:

  • The pool manages the lifetime of the threads. The user should not be able to delete threads that reside within the pool.
  • The user can assign a task to the pool in a non-intrusive way.
  • When a task is being assigned, if all threads in the pool are currently running other tasks, then the task is discarded.

Before I provide an implementation, there are a few key points I would like to stress:

  • Once a thread has been launched, it will run until completion, cancellation, or termination. The function the thread is executing cannot be reassigned. To allow for a single thread to execute multiple functions over the course of its life, the thread will want to launch with a function that will read from a queue, such as io_service::run(), and callable types are posted into the event queue, such as from io_service::post().
  • io_service::run() returns if there is no work pending in the io_service, the io_service is stopped, or an exception is thrown from a handler that the thread was running. To prevent io_serivce::run() from returning when there is no unfinished work, the io_service::work class can be used.
  • Defining the task's type requirements (i.e. the task's type must be callable by object() syntax) instead of requiring a type (i.e. task must inherit from process), provides more flexibility to the user. It allows the user to supply a task as a function pointer or a type providing a nullary operator().

Implementation using boost::asio:

#include <boost/asio.hpp>
#include <boost/thread.hpp>

class thread_pool
{
private:
boost::asio::io_service io_service_;
boost::asio::io_service::work work_;
boost::thread_group threads_;
std::size_t available_;
boost::mutex mutex_;
public:

/// @brief Constructor.
thread_pool( std::size_t pool_size )
: work_( io_service_ ),
available_( pool_size )
{
for ( std::size_t i = 0; i < pool_size; ++i )
{
threads_.create_thread( boost::bind( &boost::asio::io_service::run,
&io_service_ ) );
}
}

/// @brief Destructor.
~thread_pool()
{
// Force all threads to return from io_service::run().
io_service_.stop();

// Suppress all exceptions.
try
{
threads_.join_all();
}
catch ( const std::exception& ) {}
}

/// @brief Adds a task to the thread pool if a thread is currently available.
template < typename Task >
void run_task( Task task )
{
boost::unique_lock< boost::mutex > lock( mutex_ );

// If no threads are available, then return.
if ( 0 == available_ ) return;

// Decrement count, indicating thread is no longer available.
--available_;

// Post a wrapped task into the queue.
io_service_.post( boost::bind( &thread_pool::wrap_task, this,
boost::function< void() >( task ) ) );
}

private:
/// @brief Wrap a task so that the available count can be increased once
/// the user provided task has completed.
void wrap_task( boost::function< void() > task )
{
// Run the user supplied task.
try
{
task();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}

// Task has finished, so increment count of available threads.
boost::unique_lock< boost::mutex > lock( mutex_ );
++available_;
}
};

A few comments about the implementation:

  • Exception handling needs to occur around the user's task. If the user's function or callable object throws an exception that is not of type boost::thread_interrupted, then std::terminate() is called. This is the the result of Boost.Thread's exceptions in thread functions behavior. It is also worth reading Boost.Asio's effect of exceptions thrown from handlers.
  • If the user provides the task via boost::bind, then the nested boost::bind will fail to compile. One of the following options is required:

    • Not support task created by boost::bind.
    • Meta-programming to perform compile-time branching based on whether or not the user's type if the result of boost::bind so that boost::protect could be used, as boost::protect only functions properly on certain function objects.
    • Use another type to pass the task object indirectly. I opted to use boost::function for readability at the cost of losing the exact type. boost::tuple, while slightly less readable, could also be used to preserve the exact type, as seen in the Boost.Asio's serialization example.

Application code can now use the thread_pool type non-intrusively:

void work() {};

struct worker
{
void operator()() {};
};

void more_work( int ) {};

int main()
{
thread_pool pool( 2 );
pool.run_task( work ); // Function pointer.
pool.run_task( worker() ); // Callable object.
pool.run_task( boost::bind( more_work, 5 ) ); // Callable object.
}

The thread_pool could be created without Boost.Asio, and may be slightly easier for maintainers, as they no longer need to know about Boost.Asio behaviors, such as when does io_service::run() return, and what is the io_service::work object:

#include <queue>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

class thread_pool
{
private:
std::queue< boost::function< void() > > tasks_;
boost::thread_group threads_;
std::size_t available_;
boost::mutex mutex_;
boost::condition_variable condition_;
bool running_;
public:

/// @brief Constructor.
thread_pool( std::size_t pool_size )
: available_( pool_size ),
running_( true )
{
for ( std::size_t i = 0; i < pool_size; ++i )
{
threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ;
}
}

/// @brief Destructor.
~thread_pool()
{
// Set running flag to false then notify all threads.
{
boost::unique_lock< boost::mutex > lock( mutex_ );
running_ = false;
condition_.notify_all();
}

try
{
threads_.join_all();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
}

/// @brief Add task to the thread pool if a thread is currently available.
template < typename Task >
void run_task( Task task )
{
boost::unique_lock< boost::mutex > lock( mutex_ );

// If no threads are available, then return.
if ( 0 == available_ ) return;

// Decrement count, indicating thread is no longer available.
--available_;

// Set task and signal condition variable so that a worker thread will
// wake up andl use the task.
tasks_.push( boost::function< void() >( task ) );
condition_.notify_one();
}

private:
/// @brief Entry point for pool threads.
void pool_main()
{
while( running_ )
{
// Wait on condition variable while the task is empty and the pool is
// still running.
boost::unique_lock< boost::mutex > lock( mutex_ );
while ( tasks_.empty() && running_ )
{
condition_.wait( lock );
}
// If pool is no longer running, break out.
if ( !running_ ) break;

// Copy task locally and remove from the queue. This is done within
// its own scope so that the task object is destructed immediately
// after running the task. This is useful in the event that the
// function contains shared_ptr arguments bound via bind.
{
boost::function< void() > task = tasks_.front();
tasks_.pop();

lock.unlock();

// Run the task.
try
{
task();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
}

// Task has finished, so increment count of available threads.
lock.lock();
++available_;
} // while running_
}
};

Boost asio thread_pool join does not wait for tasks to be finished

I just ran into this advanced executor example which is hidden from the documentation:

I realized just now that Asio comes with a fork_executor example which does exactly this: you can "group" tasks and join the executor (which represents that group) instead of the pool. I've missed this for the longest time since none of the executor examples are listed in the HTML documentation – sehe 21 mins ago

So without further ado, here's that sample applied to your question:

Live On Coliru

#define BOOST_BIND_NO_PLACEHOLDERS
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/ts/executor.hpp>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

// A fixed-size thread pool used to implement fork/join semantics. Functions
// are scheduled using a simple FIFO queue. Implementing work stealing, or
// using a queue based on atomic operations, are left as tasks for the reader.
class fork_join_pool : public boost::asio::execution_context {
public:
// The constructor starts a thread pool with the specified number of
// threads. Note that the thread_count is not a fixed limit on the pool's
// concurrency. Additional threads may temporarily be added to the pool if
// they join a fork_executor.
explicit fork_join_pool(std::size_t thread_count = std::thread::hardware_concurrency()*2)
: use_count_(1), threads_(thread_count)
{
try {
// Ask each thread in the pool to dequeue and execute functions
// until it is time to shut down, i.e. the use count is zero.
for (thread_count_ = 0; thread_count_ < thread_count; ++thread_count_) {
boost::asio::dispatch(threads_, [&] {
std::unique_lock<std::mutex> lock(mutex_);
while (use_count_ > 0)
if (!execute_next(lock))
condition_.wait(lock);
});
}
} catch (...) {
stop_threads();
threads_.join();
throw;
}
}

// The destructor waits for the pool to finish executing functions.
~fork_join_pool() {
stop_threads();
threads_.join();
}

private:
friend class fork_executor;

// The base for all functions that are queued in the pool.
struct function_base {
std::shared_ptr<std::size_t> work_count_;
void (*execute_)(std::shared_ptr<function_base>& p);
};

// Execute the next function from the queue, if any. Returns true if a
// function was executed, and false if the queue was empty.
bool execute_next(std::unique_lock<std::mutex>& lock) {
if (queue_.empty())
return false;
auto p(queue_.front());
queue_.pop();
lock.unlock();
execute(lock, p);
return true;
}

// Execute a function and decrement the outstanding work.
void execute(std::unique_lock<std::mutex>& lock,
std::shared_ptr<function_base>& p) {
std::shared_ptr<std::size_t> work_count(std::move(p->work_count_));
try {
p->execute_(p);
lock.lock();
do_work_finished(work_count);
} catch (...) {
lock.lock();
do_work_finished(work_count);
throw;
}
}

// Increment outstanding work.
void
do_work_started(const std::shared_ptr<std::size_t>& work_count) noexcept {
if (++(*work_count) == 1)
++use_count_;
}

// Decrement outstanding work. Notify waiting threads if we run out.
void
do_work_finished(const std::shared_ptr<std::size_t>& work_count) noexcept {
if (--(*work_count) == 0) {
--use_count_;
condition_.notify_all();
}
}

// Dispatch a function, executing it immediately if the queue is already
// loaded. Otherwise adds the function to the queue and wakes a thread.
void do_dispatch(std::shared_ptr<function_base> p,
const std::shared_ptr<std::size_t>& work_count) {
std::unique_lock<std::mutex> lock(mutex_);
if (queue_.size() > thread_count_ * 16) {
do_work_started(work_count);
lock.unlock();
execute(lock, p);
} else {
queue_.push(p);
do_work_started(work_count);
condition_.notify_one();
}
}

// Add a function to the queue and wake a thread.
void do_post(std::shared_ptr<function_base> p,
const std::shared_ptr<std::size_t>& work_count) {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(p);
do_work_started(work_count);
condition_.notify_one();
}

// Ask all threads to shut down.
void stop_threads() {
std::lock_guard<std::mutex> lock(mutex_);
--use_count_;
condition_.notify_all();
}

std::mutex mutex_;
std::condition_variable condition_;
std::queue<std::shared_ptr<function_base>> queue_;
std::size_t use_count_;
std::size_t thread_count_;
boost::asio::thread_pool threads_;
};

// A class that satisfies the Executor requirements. Every function or piece of
// work associated with a fork_executor is part of a single, joinable group.
class fork_executor {
public:
fork_executor(fork_join_pool& ctx)
: context_(ctx), work_count_(std::make_shared<std::size_t>(0)) {}

fork_join_pool& context() const noexcept { return context_; }

void on_work_started() const noexcept {
std::lock_guard<std::mutex> lock(context_.mutex_);
context_.do_work_started(work_count_);
}

void on_work_finished() const noexcept {
std::lock_guard<std::mutex> lock(context_.mutex_);
context_.do_work_finished(work_count_);
}

template <class Func, class Alloc>
void dispatch(Func&& f, const Alloc& a) const {
auto p(std::allocate_shared<exFun<Func>>(
typename std::allocator_traits<Alloc>::template rebind_alloc<char>(a),
std::move(f), work_count_));
context_.do_dispatch(p, work_count_);
}

template <class Func, class Alloc> void post(Func f, const Alloc& a) const {
auto p(std::allocate_shared<exFun<Func>>(
typename std::allocator_traits<Alloc>::template rebind_alloc<char>(a),
std::move(f), work_count_));
context_.do_post(p, work_count_);
}

template <class Func, class Alloc>
void defer(Func&& f, const Alloc& a) const {
post(std::forward<Func>(f), a);
}

friend bool operator==(const fork_executor& a, const fork_executor& b) noexcept {
return a.work_count_ == b.work_count_;
}

friend bool operator!=(const fork_executor& a, const fork_executor& b) noexcept {
return a.work_count_ != b.work_count_;
}

// Block until all work associated with the executor is complete. While it
// is waiting, the thread may be borrowed to execute functions from the
// queue.
void join() const {
std::unique_lock<std::mutex> lock(context_.mutex_);
while (*work_count_ > 0)
if (!context_.execute_next(lock))
context_.condition_.wait(lock);
}

private:
template <class Func> struct exFun : fork_join_pool::function_base {
explicit exFun(Func f, const std::shared_ptr<std::size_t>& w)
: function_(std::move(f)) {
work_count_ = w;
execute_ = [](std::shared_ptr<fork_join_pool::function_base>& p) {
Func tmp(std::move(static_cast<exFun*>(p.get())->function_));
p.reset();
tmp();
};
}

Func function_;
};

fork_join_pool& context_;
std::shared_ptr<std::size_t> work_count_;
};

// Helper class to automatically join a fork_executor when exiting a scope.
class join_guard {
public:
explicit join_guard(const fork_executor& ex) : ex_(ex) {}
join_guard(const join_guard&) = delete;
join_guard(join_guard&&) = delete;
~join_guard() { ex_.join(); }

private:
fork_executor ex_;
};

//------------------------------------------------------------------------------

#include <algorithm>
#include <iostream>
#include <random>
#include <vector>
#include <boost/bind.hpp>

static void foo(const uint64_t begin, uint64_t *result)
{
uint64_t prev[] = {begin, 0};
for (uint64_t i = 0; i < 1000000000; ++i) {
const auto tmp = (prev[0] + prev[1]) % 1000;
prev[1] = prev[0];
prev[0] = tmp;
}
*result = prev[0];
}

void batch(fork_join_pool &pool, const uint64_t (&a)[2])
{
uint64_t r[] = {0, 0};
{
fork_executor fork(pool);
join_guard join(fork);
boost::asio::post(fork, boost::bind(foo, a[0], &r[0]));
boost::asio::post(fork, boost::bind(foo, a[1], &r[1]));
// fork.join(); // or let join_guard destructor run
}
std::cerr << "foo(" << a[0] << "): " << r[0] << " foo(" << a[1] << "): " << r[1] << std::endl;
}

int main() {
fork_join_pool pool;

batch(pool, {2, 4});
batch(pool, {3, 5});
batch(pool, {7, 9});
}

Prints:



Related Topics



Leave a reply



Submit