Using Boost::Asio Thread Pool for General Purpose Tasks

Boost.Asio is not solely for network programming; see the reference documentation. It has extensive support for things like

  • time based operations (deadline_timer)
  • signal handling
  • platform specific operations such as posix streams and Windows handles

I've used it for other purposes in several applications as well. One example is a thread pool that services potentially long-running, blocking database operations while providing an asynchronous interface for the application. Boost.Asio really is a very powerful library. Using it for a general purpose thread pool as you propose can work just fine.

Thread pool using boost asio

In short, you need to wrap the user's provided task with another function that will:

  • Invoke the user function or callable object.
  • Lock the mutex and decrement the counter.

I may not be understanding all the requirements for this thread pool. Thus, for clarity, here is an explicit list as to what I believe are the requirements:

  • The pool manages the lifetime of the threads. The user should not be able to delete threads that reside within the pool.
  • The user can assign a task to the pool in a non-intrusive way.
  • When a task is being assigned, if all threads in the pool are currently running other tasks, then the task is discarded.

Before I provide an implementation, there are a few key points I would like to stress:

  • Once a thread has been launched, it will run until completion, cancellation, or termination. The function the thread is executing cannot be reassigned. To allow for a single thread to execute multiple functions over the course of its life, the thread will want to launch with a function that will read from a queue, such as io_service::run(), and callable types are posted into the event queue, such as from io_service::post().
  • io_service::run() returns if there is no work pending in the io_service, the io_service is stopped, or an exception is thrown from a handler that the thread was running. To prevent io_service::run() from returning when there is no unfinished work, the io_service::work class can be used.
  • Defining the task's type requirements (i.e. the task's type must be callable by object() syntax) instead of requiring a type (i.e. task must inherit from process), provides more flexibility to the user. It allows the user to supply a task as a function pointer or a type providing a nullary operator().
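The last point, constraining a task only by how it is called, can be sketched with the standard library alone. This is a minimal illustration rather than part of the implementation that follows: task_list, free_function, functor, and demo_task_list are hypothetical names, and std::function stands in for boost::function.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical container: the only requirement on Task is that task() compiles.
class task_list
{
public:
  template <typename Task>
  void add(Task task)
  {
    // Type erasure: store any nullary callable behind one common type.
    tasks_.push_back(std::function<void()>(task));
  }

  // Runs every stored task once and returns how many were run.
  std::size_t run_all()
  {
    std::size_t count = 0;
    for (std::function<void()>& task : tasks_)
    {
      task();
      ++count;
    }
    return count;
  }

private:
  std::vector<std::function<void()> > tasks_;
};

int g_calls = 0;                     // incremented by every sample task
void free_function() { ++g_calls; }  // plain function
struct functor
{
  void operator()() const { ++g_calls; }  // nullary operator()
};

// Usage: three unrelated callable types go through the same interface.
int demo_task_list()
{
  g_calls = 0;
  task_list tasks;
  tasks.add(free_function);      // function pointer
  tasks.add(functor());          // callable object
  tasks.add([] { ++g_calls; });  // lambda
  std::size_t ran = tasks.run_all();
  assert(ran == 3);
  return g_calls;
}
```

None of the three task types shares a base class; each only has to be callable with no arguments.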

Implementation using boost::asio:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>

class thread_pool
{
private:
boost::asio::io_service io_service_;
boost::asio::io_service::work work_;
boost::thread_group threads_;
std::size_t available_;
boost::mutex mutex_;
public:

/// @brief Constructor.
thread_pool( std::size_t pool_size )
: work_( io_service_ ),
available_( pool_size )
{
for ( std::size_t i = 0; i < pool_size; ++i )
{
threads_.create_thread( boost::bind( &boost::asio::io_service::run,
&io_service_ ) );
}
}

/// @brief Destructor.
~thread_pool()
{
// Force all threads to return from io_service::run().
io_service_.stop();

// Suppress all exceptions.
try
{
threads_.join_all();
}
catch ( const std::exception& ) {}
}

/// @brief Adds a task to the thread pool if a thread is currently available.
template < typename Task >
void run_task( Task task )
{
boost::unique_lock< boost::mutex > lock( mutex_ );

// If no threads are available, then return.
if ( 0 == available_ ) return;

// Decrement count, indicating thread is no longer available.
--available_;

// Post a wrapped task into the queue.
io_service_.post( boost::bind( &thread_pool::wrap_task, this,
boost::function< void() >( task ) ) );
}

private:
/// @brief Wrap a task so that the available count can be increased once
/// the user provided task has completed.
void wrap_task( boost::function< void() > task )
{
// Run the user supplied task.
try
{
task();
}
// Suppress exceptions derived from std::exception.
catch ( const std::exception& ) {}

// Task has finished, so increment count of available threads.
boost::unique_lock< boost::mutex > lock( mutex_ );
++available_;
}
};

A few comments about the implementation:

  • Exception handling needs to occur around the user's task. If the user's function or callable object throws an exception that is not of type boost::thread_interrupted, then std::terminate() is called. This is the result of Boost.Thread's exceptions in thread functions behavior. It is also worth reading Boost.Asio's effect of exceptions thrown from handlers.
  • If the user provides the task via boost::bind, then the nested boost::bind will fail to compile. One of the following options is required:

    • Not support task created by boost::bind.
    • Meta-programming to perform compile-time branching based on whether or not the user's type is the result of boost::bind, so that boost::protect could be used, as boost::protect only functions properly on certain function objects.
    • Use another type to pass the task object indirectly. I opted to use boost::function for readability at the cost of losing the exact type. boost::tuple, while slightly less readable, could also be used to preserve the exact type, as seen in the Boost.Asio's serialization example.
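The nested-bind problem and the type-erasure workaround can be reproduced with the standard library, where std::bind treats a nested bind expression as a sub-expression in the same way. The following is a sketch under that assumption: the free functions wrap_task and more_work below are simplified, hypothetical stand-ins for the pool's members, and std::function replaces boost::function.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical stand-in for thread_pool::wrap_task: run the task, then
// record that it completed.
void wrap_task(std::function<void()> task, int& completed)
{
  task();
  ++completed;
}

void more_work(int value, int& sum) { sum += value; }

std::pair<int, int> demo_wrapped_bind()
{
  int sum = 0;
  int completed = 0;

  // The user's task is itself a bind expression.
  auto user_task = std::bind(more_work, 5, std::ref(sum));

  // Passing user_task directly to the outer bind would make the outer bind
  // invoke it as a sub-expression and try to forward its void result.
  // Wrapping it in std::function hides the bind expression's type, so the
  // task is passed along as an ordinary argument.
  auto wrapped = std::bind(wrap_task, std::function<void()>(user_task),
                           std::ref(completed));
  wrapped();

  assert(completed == 1);
  return std::make_pair(sum, completed);
}
```

The cost is the one named above: the exact type of the user's callable is lost behind the function wrapper.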

Application code can now use the thread_pool type non-intrusively:

void work() {};

struct worker
{
void operator()() {};
};

void more_work( int ) {};

int main()
{
thread_pool pool( 2 );
pool.run_task( work ); // Function pointer.
pool.run_task( worker() ); // Callable object.
pool.run_task( boost::bind( more_work, 5 ) ); // Callable object.
}

The thread_pool could be created without Boost.Asio, which may be slightly easier for maintainers, as they no longer need to know about Boost.Asio behaviors, such as when io_service::run() returns and what the io_service::work object does:

#include <queue>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>

class thread_pool
{
private:
std::queue< boost::function< void() > > tasks_;
boost::thread_group threads_;
std::size_t available_;
boost::mutex mutex_;
boost::condition_variable condition_;
bool running_;
public:

/// @brief Constructor.
thread_pool( std::size_t pool_size )
: available_( pool_size ),
running_( true )
{
for ( std::size_t i = 0; i < pool_size; ++i )
{
threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ;
}
}

/// @brief Destructor.
~thread_pool()
{
// Set running flag to false then notify all threads.
{
boost::unique_lock< boost::mutex > lock( mutex_ );
running_ = false;
condition_.notify_all();
}

try
{
threads_.join_all();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
}

/// @brief Add task to the thread pool if a thread is currently available.
template < typename Task >
void run_task( Task task )
{
boost::unique_lock< boost::mutex > lock( mutex_ );

// If no threads are available, then return.
if ( 0 == available_ ) return;

// Decrement count, indicating thread is no longer available.
--available_;

// Set task and signal condition variable so that a worker thread will
// wake up and use the task.
tasks_.push( boost::function< void() >( task ) );
condition_.notify_one();
}

private:
/// @brief Entry point for pool threads.
void pool_main()
{
while( running_ )
{
// Wait on condition variable while the task is empty and the pool is
// still running.
boost::unique_lock< boost::mutex > lock( mutex_ );
while ( tasks_.empty() && running_ )
{
condition_.wait( lock );
}
// If pool is no longer running, break out.
if ( !running_ ) break;

// Copy task locally and remove from the queue. This is done within
// its own scope so that the task object is destructed immediately
// after running the task. This is useful in the event that the
// function contains shared_ptr arguments bound via bind.
{
boost::function< void() > task = tasks_.front();
tasks_.pop();

lock.unlock();

// Run the task.
try
{
task();
}
// Suppress exceptions derived from std::exception.
catch ( const std::exception& ) {}
}

// Task has finished, so increment count of available threads.
lock.lock();
++available_;
} // while running_
}
};

C++ thread pool using boost::asio::thread_pool, why can't I reuse my threads?

You join the pool after posting the first task. So, the pool stops before you even accept a second task. That explains why you're not seeing more.

This fixes that:

for (size_t i = 0; i != 50; ++i) {
post(g_pool, boost::bind(f, 10 * i));
}
g_pool.join();

Addendum #1

In response to the comments. In case you want to wait for the outcome of a specific task, consider a future:

#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <future>

boost::asio::thread_pool g_pool(10);

int f(int i) {
std::cout << '(' + std::to_string(i) + ')';
return i * i;
}

int main() {
std::cout << std::unitbuf;
std::future<int> answer;

for (size_t i = 0; i != 50; ++i) {
auto task = boost::bind(f, 10 * i);
if (i == 42) {
answer = post(g_pool, std::packaged_task<int()>(task));
} else
{
post(g_pool, task);
}
}

answer.wait(); // optionally make sure it is ready before blocking get()
std::cout << "\n[Answer to #42: " + std::to_string(answer.get()) + "]\n";

// wait for remaining tasks
g_pool.join();
}

With one possible output:

(0)(50)(30)(90)(110)(100)(120)(130)(140)(150)(160)(170)(180)(190)(40)(200)(210)(220)(240)(250)(70)(260)(20)(230)(10)(290)(80)(270)(300)(340)(350)(310)(360)(370)(380)(330)(400)(410)(430)(60)(420)(470)(440)(490)(480)(320)(460)(450)(390)
[Answer to #42: 176400]
(280)

Addendum #2: Serializing tasks

If you want to serialize specific tasks, you can use a strand. E.g. to serialize all the requests based on the remainder of the parameter modulo 3:

#include <array>
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <future>

boost::asio::thread_pool g_pool(10);

int f(int i) {
std::cout << '(' + std::to_string(i) + ')';
return i * i;
}

int main() {
std::cout << std::unitbuf;

std::array strands{make_strand(g_pool.get_executor()),
make_strand(g_pool.get_executor()),
make_strand(g_pool.get_executor())};

for (size_t i = 0; i != 50; ++i) {
post(strands.at(i % 3), boost::bind(f, i));
}

g_pool.join();
}

With a possible output:

(0)(3)(6)(2)(9)(1)(5)(8)(11)(4)(7)(10)(13)(16)(19)(22)(25)(28)(31)(34)(37)(40)(43)(46)(49)(12)(15)(14)(18)(21)(24)(27)(30)(33)(36)(39)(42)(45)(48)(17)(20)(23)(26)(29)(32)(35)(38)(41)(44)(47)

Note that a task may run on any thread in the pool, but tasks on the same strand run in the order in which they were posted. So,

  • 0, 3, 6, 9, 12...
  • 1, 4, 7, 10, 13...
  • 2, 5, 8, 11, 14...

happen strictly serially, though

  • 4 and 7 don't need to happen on the same physical thread
  • 11 might happen before 4, because they're not on the same strand

Even More

In case you need more "barrier-like" synchronization, or what's known as fork-join semantics, see Boost asio thread_pool join does not wait for tasks to be finished (where I posted two answers, one after I discovered the fork-join executor example).

boost:asio thread pool implementation for occasionally synchronized tasks

You may use futures for data processing and synchronize with them using boost::wait_for_all(). This will allow you to operate in terms of parts of work done, not threads.

int process_data() {...}

// Pending futures
std::vector<boost::unique_future<int>> pending_data;

for(int i = 0; i < numSmallTasks; ++i)
{
// Create task and corresponding future
// Using shared ptr and binding operator() trick because
// packaged_task is non-copyable, but asio::io_service::post requires argument to be copyable

// Boost 1.51 syntax.
// With Boost 1.53+ the template argument is a signature: boost::packaged_task<int()>
// (C++11 std::packaged_task<int()> takes a signature as well).
typedef boost::packaged_task<int> task_t;

boost::shared_ptr<task_t> task = boost::make_shared<task_t>(
boost::bind(&process_data, i, theTime));

boost::unique_future<int> fut = task->get_future();

pending_data.push_back(std::move(fut));
io_service.post(boost::bind(&task_t::operator(), task));
}

// After loop - wait until all futures are evaluated
boost::wait_for_all(pending_data.begin(), pending_data.end());
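For comparison, the same "collect futures, then wait for all of them" pattern can be sketched with C++11 facilities only. This is an illustration under assumptions, not a translation of the code above: process_chunk is a hypothetical work function, and std::async is used instead of posting to an io_service, so each task runs as if on its own thread rather than on a pool.

```cpp
#include <cassert>
#include <future>
#include <vector>

// Hypothetical work item standing in for process_data.
int process_chunk(int i) { return i * i; }

int demo_wait_for_all(int num_tasks)
{
  assert(num_tasks >= 0);

  // Pending futures, one per task.
  std::vector<std::future<int> > pending;
  for (int i = 0; i < num_tasks; ++i)
  {
    // std::launch::async runs the call as if on a new thread.
    pending.push_back(std::async(std::launch::async, process_chunk, i));
  }

  // Standard-library counterpart of boost::wait_for_all: block on each future.
  for (std::future<int>& f : pending)
    f.wait();

  // Every result is ready now, so get() does not block.
  int total = 0;
  for (std::future<int>& f : pending)
    total += f.get();
  return total;
}
```

As in the Boost version, the caller reasons about pieces of work that have finished, not about which thread ran them.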

How to create a thread pool using boost in C++?

The process is pretty simple. First create an asio::io_service and a thread_group. Fill the thread_group with threads that run the io_service. Then hand tasks to the pool with io_service::post(), using boost::bind to package each function together with its arguments.

To stop the threads (usually when you are exiting your program) just stop the io_service and join all threads.

You should only need these headers:

#include <boost/asio/io_service.hpp>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>

Here is an example:

/*
* Create an asio::io_service and a thread_group (the thread pool, in essence)
*/
boost::asio::io_service ioService;
boost::thread_group threadpool;

/*
* The work object keeps ioService.run() from returning while there are
* no tasks to execute. Tasks posted with ioService.post() start
* executing once threads are running ioService.run().
*/
boost::asio::io_service::work work(ioService);

/*
* This will add 2 threads to the thread pool. (You could just put it in a for loop)
*/
threadpool.create_thread(
boost::bind(&boost::asio::io_service::run, &ioService)
);
threadpool.create_thread(
boost::bind(&boost::asio::io_service::run, &ioService)
);

/*
* This will assign tasks to the thread pool.
* More about boost::bind: "http://www.boost.org/doc/libs/1_54_0/libs/bind/bind.html#with_functions"
*/
ioService.post(boost::bind(myTask, "Hello World!"));
ioService.post(boost::bind(clearCache, "./cache"));
ioService.post(boost::bind(getSocialUpdates, "twitter,gmail,facebook,tumblr,reddit"));

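/*
* This will stop the ioService processing loop. Tasks that are still
* queued when stop() is called may never run, and tasks you add after
* this point will not execute. To let queued tasks finish first,
* destroy the work object instead of calling stop().
*/
ioService.stop();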

/*
* Will wait till all the threads in the thread pool are finished with
* their assigned tasks and 'join' them. Just assume the threads inside
* the threadpool will be destroyed by this method.
*/
threadpool.join_all();

Source: Recipes < Asio
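The shutdown order matters: stopping the loop and letting queued tasks drain are different things. As a sketch of the draining variant, here is a small pool built only on the standard library. It is not the Asio recipe above; draining_pool and demo_draining_pool are hypothetical names, and a task that throws would terminate the program because no exception handling is included.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

class draining_pool
{
public:
  explicit draining_pool(std::size_t thread_count) : closing_(false)
  {
    for (std::size_t i = 0; i < thread_count; ++i)
      threads_.emplace_back([this] { worker(); });
  }

  ~draining_pool() { shutdown(); }

  void post(std::function<void()> task)
  {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(std::move(task));
    }
    condition_.notify_one();
  }

  // Lets every queued task run, then joins the worker threads.
  void shutdown()
  {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      closing_ = true;
    }
    condition_.notify_all();
    for (std::thread& t : threads_)
      if (t.joinable()) t.join();
  }

private:
  void worker()
  {
    for (;;)
    {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        condition_.wait(lock, [this] { return closing_ || !queue_.empty(); });
        if (queue_.empty()) return;  // closing and nothing left to run
        task = std::move(queue_.front());
        queue_.pop();
      }
      task();  // run outside the lock
    }
  }

  std::mutex mutex_;
  std::condition_variable condition_;
  std::queue<std::function<void()> > queue_;
  bool closing_;
  std::vector<std::thread> threads_;
};

int demo_draining_pool()
{
  std::atomic<int> done(0);
  draining_pool pool(2);
  for (int i = 0; i < 20; ++i)
    pool.post([&done] { ++done; });
  pool.shutdown();  // returns only after all 20 tasks have run
  assert(done == 20);
  return done;
}
```

A worker leaves its loop only when shutdown has been requested and the queue is empty, so no posted task is dropped.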

Using thread pool for simulation : boost-thread and boost-asio

Here are the results of my research!

The distributed simulation is based on a main class DistributedSimulation using two implementation classes: impl::m_io_service and impl::dispatcher.

The boost::asio thread pool is based on running the io_service::run() method on several threads.

The idea is to redefine this method and to include a mechanism to identify the current thread. The solution below is based on thread local storage boost::thread_specific_ptr of boost::uuid. After reading the comment of Tres, I think that identifying thread using boost::thread::id is a better solution (but equivalent and not too different).

Finally another class is used to dispatch the input data to instances of class Simulation. This class creates several instances of the same class Simulation and uses them to compute the results in each thread.

namespace impl {

// Create a derived class of io_service including thread specific data (a unique identifier of the thread)
struct m_io_service : public boost::asio::io_service
{
static boost::thread_specific_ptr<boost::uuids::uuid> ptrSpec_;

std::size_t run()
{
if(ptrSpec_.get() == 0)
ptrSpec_.reset(new boost::uuids::uuid(boost::uuids::random_generator()()) );

return boost::asio::io_service::run();
}
};

// Create a class that dispatches the input data over the N instances of the class Simulation
template <class Simulation>
class dispatcher
{
public:
static const std::size_t N = 6;

typedef typename Simulation::input_t input_t;
typedef typename Simulation::output_t output_t;

friend DistributedSimulation;

protected:
std::vector< boost::shared_ptr<Simulation> > simuInst;
std::vector< boost::uuids::uuid > map;

public:

// Constructor, creating the N instances of class Simulation
dispatcher( const Simulation& simuRef)
{
simuInst.resize(N);
for(std::size_t i=0; i<N; ++i)
simuInst[i].reset( simuRef.clone() );
}

// Record the unique identifiers and do the calculation using the right instance of class Simulation
void dispatch( const typename Simulation::input_t& in )
{
if( map.size() == 0 ) {
map.push_back(*m_io_service::ptrSpec_);
simuInst[0]->eval(in, *m_io_service::ptrSpec_);
}
else {
if( map.size() < N ) {
map.push_back(*m_io_service::ptrSpec_);
simuInst[map.size()-1]->eval(in, *m_io_service::ptrSpec_);
}
else {
for(size_t i=0; i<N;++i) {
if( map[i] == *m_io_service::ptrSpec_) {
simuInst[i]->eval(in, *m_io_service::ptrSpec_);
return;
}
}
}
}
}
};

boost::thread_specific_ptr<boost::uuids::uuid> m_io_service::ptrSpec_;
}

// Main class, create a distributed simulation based on a class Simulation
template <class Simulation>
class DistributedSimulation
{
public:
static const std::size_t N = impl::dispatcher<Simulation>::N;

protected:
impl::dispatcher<Simulation> _disp;

public:
DistributedSimulation() : _disp( Simulation() ) {}

DistributedSimulation(Simulation& simuRef)
: _disp( simuRef ) { }

// Simulation with a large (>>N) number of inputs
void eval( const std::vector< typename Simulation::input_t >& inputs, std::vector< typename Simulation::output_t >& outputs )
{

// Clear the results from a previous calculation (and stored in instances of class Simulation)
...

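// Creation of the pool using N threads. The work object is held through a
// boost::scoped_ptr (<boost/scoped_ptr.hpp>) so that it can be destroyed below.
impl::m_io_service io_service;
boost::scoped_ptr<boost::asio::io_service::work> work(
new boost::asio::io_service::work(io_service));
boost::thread_group threads;
for (std::size_t i = 0; i < N; ++i)
threads.create_thread(boost::bind(&impl::m_io_service::run, &io_service));

// Adding tasks
for( std::size_t i = 0, i_end = inputs.size(); i<i_end; ++i)
io_service.post( boost::bind(&impl::dispatcher<Simulation>::dispatch, &_disp, inputs[i]) );

// End of the tasks: destroy the work object so that run() returns once every
// queued task has finished. Calling io_service.stop() here instead could
// abandon tasks that have not started yet.
work.reset();
threads.join_all();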

// Gather the results iterating through instances of class simulation
...
}
};

Edit

The code below is an update of my previous solution, taking into account the comment of Tres. As I said before, it is simpler and much more readable!

template <class Simulation>
class DistributedSimulation
{
public:
typedef typename Simulation::input_t input_t;
typedef typename Simulation::output_t output_t;

typedef boost::shared_ptr<Simulation> SimulationSPtr_t;
typedef boost::thread::id id_t;
typedef std::map< id_t, std::size_t >::iterator IDMapIterator_t;

protected:
unsigned int _NThreads; // Number of threads
std::vector< SimulationSPtr_t > _simuInst; // Instances of class Simulation
std::map< id_t, std::size_t > _IDMap; // Map between thread id and instance index.

private:
boost::mutex _mutex;

public:

DistributedSimulation( ) {}

DistributedSimulation( const Simulation& simuRef, const unsigned int NThreads = boost::thread::hardware_concurrency() )
{ init(simuRef, NThreads); }

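DistributedSimulation(const DistributedSimulation& simuDistrib)
: _NThreads(0)
{
// Clone the simulation instances of the source object, if it was initialised.
if( !simuDistrib._simuInst.empty() )
init(*simuDistrib._simuInst[0], simuDistrib._NThreads);
}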

virtual ~DistributedSimulation() {}

void init(const Simulation& simuRef, const unsigned int NThreads = boost::thread::hardware_concurrency())
{
_NThreads = (NThreads == 0) ? 1 : NThreads;
_simuInst.resize(_NThreads);
for(std::size_t i=0; i<_NThreads; ++i)
_simuInst[i].reset( simuRef.clone() );
_IDMap.clear();
}

void dispatch( const input_t& input )
{
// Get current thread id
boost::thread::id id0 = boost::this_thread::get_id();

// Get the right instance
Simulation* sim = NULL;
{
boost::mutex::scoped_lock scoped_lock(_mutex);
IDMapIterator_t it = _IDMap.find(id0);
if( it != _IDMap.end() )
sim = _simuInst[it->second].get();
}

// Simulation
if( NULL != sim )
sim->eval(input);
}

// Distributed evaluation.
void eval( const std::vector< input_t >& inputs, std::vector< output_t >& outputs )
{
//--Initialisation
const std::size_t NInputs = inputs.size();

// Clear the outputs (contained in instances of class Simulation) from a previous run
...

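// Create thread pool and save ids. The work object is held through a
// boost::scoped_ptr (<boost/scoped_ptr.hpp>) so that it can be destroyed below.
boost::asio::io_service io_service;
boost::scoped_ptr<boost::asio::io_service::work> work(
new boost::asio::io_service::work(io_service));
boost::thread_group threads;
for (std::size_t i = 0; i < _NThreads; ++i)
{
boost::thread* thread_ptr = threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
_IDMap[ thread_ptr->get_id() ] = i;
}

// Add tasks
for( std::size_t i = 0; i < NInputs; ++i)
io_service.post( boost::bind(&DistributedSimulation::dispatch, this, inputs[i]) );

// Destroy the work object so that run() returns once every queued task has
// finished. Calling io_service.stop() here instead could abandon tasks that
// have not started yet.
work.reset();
threads.join_all();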

// Gather results (contained in each instances of class Simulation)
...
}
};
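The thread-id lookup at the heart of this version can be isolated into a small helper. The sketch below uses only the standard library and differs from the code above in one labeled way: instead of registering ids when the threads are created, it assigns a slot the first time a thread asks for one. instance_selector and demo_instance_selector are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

// Maps each calling thread to a stable slot index (0, 1, 2, ...), which can
// then be used to pick a per-thread object such as a Simulation instance.
class instance_selector
{
public:
  std::size_t slot_for_current_thread()
  {
    std::lock_guard<std::mutex> lock(mutex_);
    const std::thread::id id = std::this_thread::get_id();
    std::map<std::thread::id, std::size_t>::iterator it = slots_.find(id);
    if (it == slots_.end())
      it = slots_.insert(std::make_pair(id, slots_.size())).first;  // first call
    return it->second;
  }

private:
  std::mutex mutex_;
  std::map<std::thread::id, std::size_t> slots_;
};

// Each thread asks twice; it must get the same slot both times, and no two
// threads may share a slot. Returns the number of distinct slots handed out.
std::size_t demo_instance_selector(std::size_t num_threads)
{
  instance_selector selector;
  std::vector<std::size_t> first(num_threads), second(num_threads);
  std::vector<std::thread> threads;
  for (std::size_t i = 0; i < num_threads; ++i)
  {
    threads.emplace_back([&selector, &first, &second, i] {
      first[i] = selector.slot_for_current_thread();
      second[i] = selector.slot_for_current_thread();
    });
  }
  for (std::thread& t : threads)
    t.join();

  for (std::size_t i = 0; i < num_threads; ++i)
    assert(first[i] == second[i]);
  return std::set<std::size_t>(first.begin(), first.end()).size();
}
```

The mutex guards only the map lookup, as in dispatch() above; the per-thread object itself can then be used without locking because no other thread maps to the same slot.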

Boost asio thread_pool join does not wait for tasks to be finished

I just ran into this advanced executor example which is hidden from the documentation:

I realized just now that Asio comes with a fork_executor example which does exactly this: you can "group" tasks and join the executor (which represents that group) instead of the pool. I've missed this for the longest time since none of the executor examples are listed in the HTML documentation – sehe

So without further ado, here's that sample applied to your question:

#define BOOST_BIND_NO_PLACEHOLDERS
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/ts/executor.hpp>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

// A fixed-size thread pool used to implement fork/join semantics. Functions
// are scheduled using a simple FIFO queue. Implementing work stealing, or
// using a queue based on atomic operations, are left as tasks for the reader.
class fork_join_pool : public boost::asio::execution_context {
public:
// The constructor starts a thread pool with the specified number of
// threads. Note that the thread_count is not a fixed limit on the pool's
// concurrency. Additional threads may temporarily be added to the pool if
// they join a fork_executor.
explicit fork_join_pool(std::size_t thread_count = std::thread::hardware_concurrency()*2)
: use_count_(1), threads_(thread_count)
{
try {
// Ask each thread in the pool to dequeue and execute functions
// until it is time to shut down, i.e. the use count is zero.
for (thread_count_ = 0; thread_count_ < thread_count; ++thread_count_) {
boost::asio::dispatch(threads_, [&] {
std::unique_lock<std::mutex> lock(mutex_);
while (use_count_ > 0)
if (!execute_next(lock))
condition_.wait(lock);
});
}
} catch (...) {
stop_threads();
threads_.join();
throw;
}
}

// The destructor waits for the pool to finish executing functions.
~fork_join_pool() {
stop_threads();
threads_.join();
}

private:
friend class fork_executor;

// The base for all functions that are queued in the pool.
struct function_base {
std::shared_ptr<std::size_t> work_count_;
void (*execute_)(std::shared_ptr<function_base>& p);
};

// Execute the next function from the queue, if any. Returns true if a
// function was executed, and false if the queue was empty.
bool execute_next(std::unique_lock<std::mutex>& lock) {
if (queue_.empty())
return false;
auto p(queue_.front());
queue_.pop();
lock.unlock();
execute(lock, p);
return true;
}

// Execute a function and decrement the outstanding work.
void execute(std::unique_lock<std::mutex>& lock,
std::shared_ptr<function_base>& p) {
std::shared_ptr<std::size_t> work_count(std::move(p->work_count_));
try {
p->execute_(p);
lock.lock();
do_work_finished(work_count);
} catch (...) {
lock.lock();
do_work_finished(work_count);
throw;
}
}

// Increment outstanding work.
void
do_work_started(const std::shared_ptr<std::size_t>& work_count) noexcept {
if (++(*work_count) == 1)
++use_count_;
}

// Decrement outstanding work. Notify waiting threads if we run out.
void
do_work_finished(const std::shared_ptr<std::size_t>& work_count) noexcept {
if (--(*work_count) == 0) {
--use_count_;
condition_.notify_all();
}
}

// Dispatch a function, executing it immediately if the queue is already
// loaded. Otherwise adds the function to the queue and wakes a thread.
void do_dispatch(std::shared_ptr<function_base> p,
const std::shared_ptr<std::size_t>& work_count) {
std::unique_lock<std::mutex> lock(mutex_);
if (queue_.size() > thread_count_ * 16) {
do_work_started(work_count);
lock.unlock();
execute(lock, p);
} else {
queue_.push(p);
do_work_started(work_count);
condition_.notify_one();
}
}

// Add a function to the queue and wake a thread.
void do_post(std::shared_ptr<function_base> p,
const std::shared_ptr<std::size_t>& work_count) {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(p);
do_work_started(work_count);
condition_.notify_one();
}

// Ask all threads to shut down.
void stop_threads() {
std::lock_guard<std::mutex> lock(mutex_);
--use_count_;
condition_.notify_all();
}

std::mutex mutex_;
std::condition_variable condition_;
std::queue<std::shared_ptr<function_base>> queue_;
std::size_t use_count_;
std::size_t thread_count_;
boost::asio::thread_pool threads_;
};

// A class that satisfies the Executor requirements. Every function or piece of
// work associated with a fork_executor is part of a single, joinable group.
class fork_executor {
public:
fork_executor(fork_join_pool& ctx)
: context_(ctx), work_count_(std::make_shared<std::size_t>(0)) {}

fork_join_pool& context() const noexcept { return context_; }

void on_work_started() const noexcept {
std::lock_guard<std::mutex> lock(context_.mutex_);
context_.do_work_started(work_count_);
}

void on_work_finished() const noexcept {
std::lock_guard<std::mutex> lock(context_.mutex_);
context_.do_work_finished(work_count_);
}

}
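The core of the fork/join idea, waiting for one group of tasks instead of for the whole pool, comes down to a counter of outstanding work guarded by a mutex and a condition variable, much like work_count_ above. The following is a much-reduced sketch of that idea using plain std::thread; it is not the Asio example, and wait_group and demo_fork_join are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Counts outstanding tasks of one group; wait() returns when the count is zero.
class wait_group
{
public:
  wait_group() : outstanding_(0) {}

  void add(std::size_t n)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    outstanding_ += n;
  }

  void done()
  {
    std::lock_guard<std::mutex> lock(mutex_);
    assert(outstanding_ > 0);
    if (--outstanding_ == 0)
      condition_.notify_all();
  }

  void wait()
  {
    std::unique_lock<std::mutex> lock(mutex_);
    condition_.wait(lock, [this] { return outstanding_ == 0; });
  }

private:
  std::mutex mutex_;
  std::condition_variable condition_;
  std::size_t outstanding_;
};

int demo_fork_join(int task_count)
{
  std::atomic<int> sum(0);
  wait_group group;
  group.add(static_cast<std::size_t>(task_count));  // fork: announce the work

  std::vector<std::thread> threads;
  for (int i = 1; i <= task_count; ++i)
  {
    threads.emplace_back([&sum, &group, i] {
      sum += i;      // the task itself
      group.done();  // report completion to the group
    });
  }

  group.wait();      // join the group, not the threads
  int result = sum;  // every task in the group has contributed by now

  for (std::thread& t : threads)
    t.join();
  return result;
}
```

With a real pool the threads would outlive the group, which is why the group, not the pool, is the thing being joined.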

