Creating a Thread Pool Using Boost

How to create a thread pool using boost in C++?

The process is pretty simple. First create an asio::io_service and a thread_group. Fill the thread_group with threads that run the io_service. Then assign tasks with io_service::post(), using boost::bind to package each function with its arguments.

To stop the threads (usually when you are exiting your program) just stop the io_service and join all threads.

You should only need these headers:

#include <boost/asio/io_service.hpp>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>

Here is an example:

/*
* Create an asio::io_service and a thread_group (the thread pool, in essence)
*/
boost::asio::io_service ioService;
boost::thread_group threadpool;

/*
* The work object keeps ioService.run() from returning while no tasks
* are queued. Tasks assigned with ioService.post() start executing
* once threads are calling run().
*/
boost::asio::io_service::work work(ioService);

/*
* This will add 2 threads to the thread pool. (You could just put it in a for loop)
*/
threadpool.create_thread(
    boost::bind(&boost::asio::io_service::run, &ioService)
);
threadpool.create_thread(
    boost::bind(&boost::asio::io_service::run, &ioService)
);

/*
* This will assign tasks to the thread pool.
* More about boost::bind: "http://www.boost.org/doc/libs/1_54_0/libs/bind/bind.html#with_functions"
*/
ioService.post(boost::bind(myTask, "Hello World!"));
ioService.post(boost::bind(clearCache, "./cache"));
ioService.post(boost::bind(getSocialUpdates, "twitter,gmail,facebook,tumblr,reddit"));

/*
* This will stop the ioService processing loop. Tasks that are still
* queued may be discarded, and any tasks you add after this point
* will not execute.
*/
ioService.stop();

/*
* Will wait till all the threads in the thread pool are finished with
* their assigned tasks and 'join' them. Just assume the threads inside
* the threadpool will be destroyed by this method.
*/
threadpool.join_all();

Source: Recipes < Asio

C++ thread pool using boost::asio::thread_pool, why can't I reuse my threads?

You join the pool after posting the first task. So, the pool stops before you even accept a second task. That explains why you're not seeing more.

This fixes that:

for (size_t i = 0; i != 50; ++i) {
    post(g_pool, boost::bind(f, 10 * i));
}
g_pool.join();

Addendum #1

In response to the comments. In case you want to wait for the outcome of a specific task, consider a future:

#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <future>

boost::asio::thread_pool g_pool(10);

int f(int i) {
    std::cout << '(' + std::to_string(i) + ')';
    return i * i;
}

int main() {
    std::cout << std::unitbuf;
    std::future<int> answer;

    for (size_t i = 0; i != 50; ++i) {
        auto task = boost::bind(f, 10 * i);
        if (i == 42) {
            answer = post(g_pool, std::packaged_task<int()>(task));
        } else {
            post(g_pool, task);
        }
    }

    answer.wait(); // optionally make sure it is ready before blocking get()
    std::cout << "\n[Answer to #42: " + std::to_string(answer.get()) + "]\n";

    // wait for remaining tasks
    g_pool.join();
}

With one possible output:

(0)(50)(30)(90)(110)(100)(120)(130)(140)(150)(160)(170)(180)(190)(40)(200)(210)(220)(240)(250)(70)(260)(20)(230)(10)(290)(80)(270)(300)(340)(350)(310)(360)(370)(380)(330)(400)(410)(430)(60)(420)(470)(440)(490)(480)(320)(460)(450)(390)
[Answer to #42: 176400]
(280)

Addendum #2: Serializing tasks

If you want to serialize specific tasks, you can use a strand. E.g. to serialize all the requests based on the remainder of the parameter modulo 3:

#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <boost/thread.hpp>
#include <array>
#include <iostream>
#include <future>

boost::asio::thread_pool g_pool(10);

int f(int i) {
    std::cout << '(' + std::to_string(i) + ')';
    return i * i;
}

int main() {
    std::cout << std::unitbuf;

    // One strand per remainder class (requires C++17 for std::array deduction).
    std::array strands{make_strand(g_pool.get_executor()),
                       make_strand(g_pool.get_executor()),
                       make_strand(g_pool.get_executor())};

    for (size_t i = 0; i != 50; ++i) {
        post(strands.at(i % 3), boost::bind(f, i));
    }

    g_pool.join();
}

With a possible output:

(0)(3)(6)(2)(9)(1)(5)(8)(11)(4)(7)(10)(13)(16)(19)(22)(25)(28)(31)(34)(37)(40)(43)(46)(49)(12)(15)(14)(18)(21)(24)(27)(30)(33)(36)(39)(42)(45)(48)(17)(20)(23)(26)(29)(32)(35)(38)(41)(44)(47)

Note that any task can run on any thread, but tasks on a strand happen in the order in which they were posted. So,

  • 0, 3, 6, 9, 12...
  • 1, 4, 7, 10, 13...
  • 2, 5, 8, 11, 14...

happen strictly serially, though

  • 4 and 7 don't need to happen on the same physical thread
  • 11 might happen before 4, because they're not on the same strand

Even More

In case you need more "barrier-like" synchronization, or what's known as fork-join semantics, see Boost asio thread_pool join does not wait for tasks to be finished (where I posted two answers, one after I discovered the fork-join executor example).

Using boost::asio thread pool for general purpose tasks

Boost.Asio is not solely for network programming, see the reference documentation. It has extensive support for things like

  • time based operations (deadline_timer)
  • signal handling
  • platform specific operations such as posix streams and Windows handles

I've used it for other purposes in several applications as well. One example being a thread pool to service potentially long running blocking database operations while providing an asynchronous interface for the application. Boost.Asio really is a very powerful library. Using it for a general purpose thread pool as you propose can work just fine.

Thread pool using boost asio

In short, you need to wrap the user's provided task with another function that will:

  • Invoke the user function or callable object.
  • Lock the mutex and update the counter (in the implementation below, increment the count of available threads).

I may not be understanding all the requirements for this thread pool. Thus, for clarity, here is an explicit list as to what I believe are the requirements:

  • The pool manages the lifetime of the threads. The user should not be able to delete threads that reside within the pool.
  • The user can assign a task to the pool in a non-intrusive way.
  • When a task is being assigned, if all threads in the pool are currently running other tasks, then the task is discarded.

Before I provide an implementation, there are a few key points I would like to stress:

  • Once a thread has been launched, it will run until completion, cancellation, or termination. The function the thread is executing cannot be reassigned. To allow for a single thread to execute multiple functions over the course of its life, the thread will want to launch with a function that will read from a queue, such as io_service::run(), and callable types are posted into the event queue, such as from io_service::post().
  • io_service::run() returns if there is no work pending in the io_service, the io_service is stopped, or an exception is thrown from a handler that the thread was running. To prevent io_service::run() from returning when there is no unfinished work, the io_service::work class can be used.
  • Defining the task's type requirements (i.e. the task's type must be callable by object() syntax) instead of requiring a type (i.e. task must inherit from process), provides more flexibility to the user. It allows the user to supply a task as a function pointer or a type providing a nullary operator().
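
To illustrate the last point, here is a minimal single-threaded sketch that uses only the standard library. The names task_queue, free_function, functor and calls are hypothetical. The queue requires only that a task be callable with no arguments, so a function pointer, a function object and a lambda are all accepted without a common base class.

```cpp
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>

// Hypothetical single-threaded queue: accepts any nullary callable.
class task_queue {
public:
    template <typename Task>
    void push(Task task) {
        tasks_.push(std::function<void()>(std::move(task)));
    }

    // Runs and removes every queued task; returns how many ran.
    std::size_t run_all() {
        std::size_t count = 0;
        while (!tasks_.empty()) {
            std::function<void()> task = std::move(tasks_.front());
            tasks_.pop();
            task();
            ++count;
        }
        return count;
    }

private:
    std::queue<std::function<void()>> tasks_;
};

int calls = 0;  // incremented by the sample tasks below

void free_function() { ++calls; }

struct functor {
    void operator()() const { ++calls; }
};
```

With this in place, push(free_function), push(functor()) and push of a lambda all compile and run through the same queue.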

Implementation using boost::asio:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>

class thread_pool
{
private:
  boost::asio::io_service io_service_;
  boost::asio::io_service::work work_;
  boost::thread_group threads_;
  std::size_t available_;
  boost::mutex mutex_;
public:

  /// @brief Constructor.
  thread_pool( std::size_t pool_size )
    : work_( io_service_ ),
      available_( pool_size )
  {
    for ( std::size_t i = 0; i < pool_size; ++i )
    {
      threads_.create_thread( boost::bind( &boost::asio::io_service::run,
                                           &io_service_ ) );
    }
  }

  /// @brief Destructor.
  ~thread_pool()
  {
    // Force all threads to return from io_service::run().
    io_service_.stop();

    // Suppress all exceptions.
    try
    {
      threads_.join_all();
    }
    catch ( const std::exception& ) {}
  }

  /// @brief Adds a task to the thread pool if a thread is currently available.
  template < typename Task >
  void run_task( Task task )
  {
    boost::unique_lock< boost::mutex > lock( mutex_ );

    // If no threads are available, then return.
    if ( 0 == available_ ) return;

    // Decrement count, indicating thread is no longer available.
    --available_;

    // Post a wrapped task into the queue.
    io_service_.post( boost::bind( &thread_pool::wrap_task, this,
                                   boost::function< void() >( task ) ) );
  }

private:
  /// @brief Wrap a task so that the available count can be increased once
  ///        the user provided task has completed.
  void wrap_task( boost::function< void() > task )
  {
    // Run the user supplied task.
    try
    {
      task();
    }
    // Suppress all exceptions.
    catch ( const std::exception& ) {}

    // Task has finished, so increment count of available threads.
    boost::unique_lock< boost::mutex > lock( mutex_ );
    ++available_;
  }
};

A few comments about the implementation:

  • Exception handling needs to occur around the user's task. If the user's function or callable object throws an exception that is not of type boost::thread_interrupted, then std::terminate() is called. This is the result of Boost.Thread's exceptions in thread functions behavior. It is also worth reading Boost.Asio's effect of exceptions thrown from handlers.
  • If the user provides the task via boost::bind, then the nested boost::bind will fail to compile. One of the following options is required:

    • Not support task created by boost::bind.
    • Meta-programming to perform compile-time branching based on whether or not the user's type is the result of boost::bind so that boost::protect could be used, as boost::protect only functions properly on certain function objects.
    • Use another type to pass the task object indirectly. I opted to use boost::function for readability at the cost of losing the exact type. boost::tuple, while slightly less readable, could also be used to preserve the exact type, as seen in the Boost.Asio's serialization example.
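
The type-erasure option can be sketched with the standard library, since std::bind treats nested bind expressions the same way boost::bind does. This is an analogy under that assumption, not the code from the implementation above: std::function and std::bind stand in for boost::function and boost::bind, and call_task, add_to and run_wrapped are hypothetical names.

```cpp
#include <functional>

// Stand-in for thread_pool::wrap_task: receives the user's task and runs it.
void call_task(const std::function<void()>& task) { task(); }

void add_to(int& total, int amount) { total += amount; }

// Hypothetical helper: wraps a bind-created task the way run_task() does.
int run_wrapped(int amount) {
    int total = 0;
    auto user_task = std::bind(add_to, std::ref(total), amount);

    // std::bind(call_task, user_task) would not pass user_task through;
    // it would evaluate it as a nested bind expression. Erasing the type
    // first makes the task an ordinary argument.
    auto wrapped = std::bind(call_task, std::function<void()>(user_task));
    wrapped();
    return total;
}
```

Here run_wrapped(5) returns 5, because the user's bound call runs exactly once inside call_task.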

Application code can now use the thread_pool type non-intrusively:

void work() {}

struct worker
{
  void operator()() {}
};

void more_work( int ) {}

int main()
{
  thread_pool pool( 2 );
  pool.run_task( work );                        // Function pointer.
  pool.run_task( worker() );                    // Callable object.
  pool.run_task( boost::bind( more_work, 5 ) ); // Callable object.
}

The thread_pool could also be created without Boost.Asio, which may be slightly easier for maintainers, as they no longer need to know about Boost.Asio behaviors, such as when io_service::run() returns and what the io_service::work object is for:

#include <queue>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>

class thread_pool
{
private:
  std::queue< boost::function< void() > > tasks_;
  boost::thread_group threads_;
  std::size_t available_;
  boost::mutex mutex_;
  boost::condition_variable condition_;
  bool running_;
public:

  /// @brief Constructor.
  thread_pool( std::size_t pool_size )
    : available_( pool_size ),
      running_( true )
  {
    for ( std::size_t i = 0; i < pool_size; ++i )
    {
      threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ;
    }
  }

  /// @brief Destructor.
  ~thread_pool()
  {
    // Set running flag to false then notify all threads.
    {
      boost::unique_lock< boost::mutex > lock( mutex_ );
      running_ = false;
      condition_.notify_all();
    }

    try
    {
      threads_.join_all();
    }
    // Suppress all exceptions.
    catch ( const std::exception& ) {}
  }

  /// @brief Add task to the thread pool if a thread is currently available.
  template < typename Task >
  void run_task( Task task )
  {
    boost::unique_lock< boost::mutex > lock( mutex_ );

    // If no threads are available, then return.
    if ( 0 == available_ ) return;

    // Decrement count, indicating thread is no longer available.
    --available_;

    // Set task and signal condition variable so that a worker thread will
    // wake up and use the task.
    tasks_.push( boost::function< void() >( task ) );
    condition_.notify_one();
  }

private:
  /// @brief Entry point for pool threads.
  void pool_main()
  {
    // running_ is only read while holding the mutex, so loop unconditionally
    // and break out below once the pool has been stopped.
    while( true )
    {
      // Wait on condition variable while the task is empty and the pool is
      // still running.
      boost::unique_lock< boost::mutex > lock( mutex_ );
      while ( tasks_.empty() && running_ )
      {
        condition_.wait( lock );
      }
      // If pool is no longer running, break out.
      if ( !running_ ) break;

      // Copy task locally and remove from the queue.  This is done within
      // its own scope so that the task object is destructed immediately
      // after running the task.  This is useful in the event that the
      // function contains shared_ptr arguments bound via bind.
      {
        boost::function< void() > task = tasks_.front();
        tasks_.pop();

        lock.unlock();

        // Run the task.
        try
        {
          task();
        }
        // Suppress all exceptions.
        catch ( const std::exception& ) {}
      }

      // Task has finished, so increment count of available threads.
      lock.lock();
      ++available_;
    } // while true
  }
};

Creating a thread pool using boost

There is a threadpool library for Boost, although it is not (yet) an official part of it.
But it's not a problem to implement one yourself, especially if great genericity is not a primary goal. The idea: parametrize your thread pool (TP) with a TaskType and the number of workers. The TP is given a handler function that takes a TaskType, and it holds a queue of added tasks. The actual thread function just takes a task from the queue and calls the handler with it.
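
A rough sketch of that idea, using only the standard library rather than Boost, might look like this. The class name typed_pool and its members are invented for illustration. The handler is assumed to be safe to call from several threads at once, and an exception escaping the handler would terminate the program.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical pool parametrized on TaskType; every worker passes queued
// tasks to the single handler supplied at construction.
template <typename TaskType>
class typed_pool {
public:
    typed_pool(std::size_t workers, std::function<void(TaskType&)> handler)
        : handler_(std::move(handler)) {
        for (std::size_t i = 0; i < workers; ++i) {
            threads_.emplace_back([this] { work(); });
        }
    }

    // Finishes the tasks already queued, then joins the workers.
    ~typed_pool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        ready_.notify_all();
        for (auto& thread : threads_) thread.join();
    }

    void add(TaskType task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        ready_.notify_one();
    }

private:
    void work() {
        for (;;) {
            std::unique_lock<std::mutex> lock(mutex_);
            ready_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
            if (tasks_.empty()) return;  // stopping and nothing left to do
            TaskType task = std::move(tasks_.front());
            tasks_.pop();
            lock.unlock();
            handler_(task);
        }
    }

    std::function<void(TaskType&)> handler_;
    std::queue<TaskType> tasks_;
    std::mutex mutex_;
    std::condition_variable ready_;
    bool stopping_ = false;
    std::vector<std::thread> threads_;
};
```

For example, a typed_pool<int> with three workers and a handler that adds each value to a mutex-protected sum will have processed every added value by the time its destructor returns.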

Use Futures with Boost Thread Pool

The operations posted to the pool end without the threads ending. That's the whole purpose of pooling the threads.

void send_file(std::string const& file_path){
    post(pool_, [this, &file_path] {
        handle_send_file(file_path);
    });
    // DO SOMETHING WHEN handle_send_file ENDS
}

This has several issues. The largest one is that you should not capture file_path by reference, as the argument is soon out of scope, and the handle_send_file call will run at an unspecified time in another thread. That's a race condition and dangling reference. Undefined Behaviour results.
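
The usual remedy is to capture the string by value, so the task owns its own copy. The sketch below shows this with the standard library only. Here std::async stands in for posting to the pool, and path_length_later is a hypothetical function, not part of the code under discussion.

```cpp
#include <cstddef>
#include <future>
#include <string>

// Hypothetical stand-in for send_file(): the lambda copies file_path, so it
// stays valid even if the caller's string is destroyed before the task runs.
std::future<std::size_t> path_length_later(std::string const& file_path) {
    return std::async(std::launch::async,
                      [file_path] { return file_path.size(); });
}
```

Had the lambda captured &file_path instead, reading it after the caller's string went out of scope would be the dangling reference described above.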

Then the

    // DO SOMETHING WHEN handle_send_file ENDS

is on a line which has no sequence relation with handle_send_file. In fact, it will probably run before that operation ever has a chance to start.

Simplifying

Here's a simplified version:

#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <fstream>
#include <future>
#include <iostream>
#include <memory>
#include <string>
namespace asio = boost::asio;
using asio::ip::tcp;

static asio::thread_pool pool_;

struct X {
    std::unique_ptr<tcp::socket> socket_;

    explicit X(unsigned short port) : socket_(new tcp::socket{ pool_ }) {
        socket_->connect({ {}, port });
    }

    void send_file(std::string file_path) {
        post(pool_, [=, this] {
            send_file_implementation(file_path);
            // DO SOMETHING WHEN send_file_implementation ENDS
        });
    }

    // throws system_error exception
    void send_file_implementation(std::string file_path) {
        std::ifstream source_file(file_path,
                                  std::ios_base::binary | std::ios_base::ate);
        size_t file_size = source_file.tellg();
        source_file.seekg(0);

        write(*socket_,
              asio::buffer(file_path + "\n" + std::to_string(file_size) + "\n\n"));

        boost::array<char, 1024> buf{};
        while (source_file.read(buf.c_array(), buf.size()) ||
               source_file.gcount() > 0)
        {
            int n = source_file.gcount();

            if (n <= 0) {
                using namespace boost::system;
                throw system_error(errc::io_error, system_category());
            }

            write(*socket_, asio::buffer(buf), asio::transfer_exactly(n));
        }
    }
};

Now, you can indeed run several of these operations in parallel (assuming several instances of X, so you have separate socket_ connections).

To do something at the end, just put code where I moved the comment:

// DO SOMETHING WHEN send_file_implementation ENDS

If you don't know what to do there and you wish to make a future ready at that point, you can:

std::future<void> send_file(std::string file_path) {
    std::packaged_task<void()> task([=, this] {
        send_file_implementation(file_path);
    });

    return post(pool_, std::move(task));
}

This overload of post magically¹ returns the future from the packaged task. That packaged task will set the internal promise with either the (void) return value or the exception thrown.
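
How a packaged task forwards either outcome through its future can be seen without Asio. The following is a standard-library sketch in which a plain std::thread stands in for the pool. The names outcome_of, failing_task and succeeding_task are made up for this example.

```cpp
#include <exception>
#include <functional>
#include <future>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>

// Hypothetical helper: runs work on another thread through a packaged_task
// and reports what the future delivered.
std::string outcome_of(std::function<void()> work) {
    std::packaged_task<void()> task(std::move(work));
    std::future<void> result = task.get_future();
    std::thread worker(std::move(task));

    std::string outcome = "ok";
    try {
        result.get();  // rethrows whatever the task threw
    } catch (std::exception const& e) {
        outcome = e.what();
    } catch (...) {
        outcome = "unknown error";
    }
    worker.join();
    return outcome;
}

void failing_task() { throw std::runtime_error("disk full"); }
void succeeding_task() {}
```

The exception thrown on the worker thread does not terminate the program. It is stored in the shared state and resurfaces in the thread that calls get().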

See it in action:

int main() {
    // send two files simultaneously to different connections
    X clientA(6868);
    X clientB(6969);

    std::future<void> futures[] = {
        clientA.send_file("main.cpp"),
        clientB.send_file("main.cpp"),
    };

    for (auto& fut : futures) try {
        fut.get();
        std::cout << "Everything completed without error\n";
    } catch(std::exception const& e) {
        std::cout << "Error occurred: " << e.what() << "\n";
    };

    pool_.join();
}

I tested this while running two netcats to listen on 6868/6969:

nc -l -p 6868 | head& nc -l -p 6969 | md5sum&
./a.out
wait

The server prints:

Everything completed without error
Everything completed without error

The netcats print their filtered output:

main.cpp
1907

#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <fstream>
#include <iostream>
#include <future>
namespace asio = boost::asio;
using asio::ip::tcp;
7ecb71992bcbc22bda44d78ad3e2a5ef -

¹ not magic: see https://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/async_result.html

boost::threadpool::pool vs. boost::thread_group

boost::thread_group is a convenience class for performing thread management operations on a collection of threads. For example, instead of having to iterate over std::vector<boost::thread>, invoking join() on each thread, the thread_group provides a convenient join_all() member function.
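
For comparison, this is roughly the bookkeeping that join_all() replaces, sketched with std::thread instead of boost::thread. The function sum_with_threads is a hypothetical example.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical example: one thread per value, joined by hand.
int sum_with_threads(const std::vector<int>& values) {
    std::atomic<int> sum{0};
    std::vector<std::thread> threads;
    for (int value : values) {
        threads.emplace_back([&sum, value] { sum += value; });
    }
    // This loop is the part that thread_group::join_all() does for you.
    for (auto& thread : threads) {
        thread.join();
    }
    return sum.load();
}
```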

With boost::thread, regardless of it being managed by boost::thread_group, the lifetime of the thread often depends on the work the thread is doing. For example, if a thread is created to perform a computationally expensive calculation, then the thread can exit once the result has been calculated. If the work is short-lived, then the overhead of creating and destroying threads can affect performance.

On the other hand, a threadpool is a pattern in which a number of threads service a number of tasks. The lifetime of a thread is not directly associated with the lifetime of a task. To continue with the previous example, the application would schedule the computationally expensive calculation to run within the thread pool. The work will be queued within the threadpool, and one of the threadpool's threads will be selected to perform the work. Once the calculation has completed, the thread goes back to waiting for more work to be scheduled with the threadpool.

As shown in this threadpool example, a threadpool can be implemented with boost::thread_group to manage lifetime of threads, and boost::asio::io_service for task/work dispatching.

how to make a threadpool with boost::thread

A thread pool is just a bunch of threads that are already running, all running the same function. This function basically just waits on a queue, and when there is a "function" in the queue it extracts and executes it.

Pseudo-code:

void thread_pool_function()
{
    while (true)
    {
        wait_for_signal_that_queue_is_not_empty();

        function_to_call = queue.remove_top();

        unlock_queue_semaphore();

        function_to_call();
    }
}

create_thread(thread_pool_function);
create_thread(thread_pool_function);
create_thread(thread_pool_function);
create_thread(thread_pool_function);

In the "code" above there are now four threads, all initially waiting for something to be put in a "queue". When there is something in the queue, it extracts it, and calls it as a function.

This is probably the simplest way to implement a thread pool.
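
One way the pseudo-code could be turned into compilable C++ is sketched below, using std::thread and a condition variable rather than boost::thread. The names work_queue, enqueue and run_on_four_threads are hypothetical. Using an empty function as the exit signal is an assumption added here so that the workers can be shut down; the pseudo-code itself loops forever.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical shared state for the workers.
struct work_queue {
    std::queue<std::function<void()>> functions;
    std::mutex mutex;
    std::condition_variable not_empty;
};

// Same loop as the pseudo-code; an empty function is the signal to exit.
void thread_pool_function(work_queue& queue) {
    while (true) {
        std::unique_lock<std::mutex> lock(queue.mutex);
        queue.not_empty.wait(lock, [&queue] { return !queue.functions.empty(); });

        std::function<void()> function_to_call = std::move(queue.functions.front());
        queue.functions.pop();
        lock.unlock();

        if (!function_to_call) return;
        function_to_call();
    }
}

void enqueue(work_queue& queue, std::function<void()> function) {
    {
        std::lock_guard<std::mutex> lock(queue.mutex);
        queue.functions.push(std::move(function));
    }
    queue.not_empty.notify_one();
}

// Starts four workers, runs the given functions, then shuts the workers down.
void run_on_four_threads(const std::vector<std::function<void()>>& functions) {
    work_queue queue;
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back(thread_pool_function, std::ref(queue));
    }
    for (const auto& function : functions) enqueue(queue, function);
    for (int i = 0; i < 4; ++i) enqueue(queue, nullptr);  // one exit signal each
    for (auto& thread : threads) thread.join();
}
```

Because the queue is first-in, first-out, the exit signals are only reached after every real function has been taken by some worker.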


