How to create a thread pool using boost in C++?
The process is pretty simple. First create an asio::io_service and a thread_group. Fill the thread_group with threads linked to the io_service. Assign tasks by posting them to the io_service; boost::bind is a convenient way to package a function together with its arguments.
To stop the threads (usually when you are exiting your program) just stop the io_service and join all threads.
You should only need these headers:
#include <boost/asio/io_service.hpp>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
Here is an example:
/*
* Create an asio::io_service and a thread_group (thread pool in essence)
*/
boost::asio::io_service ioService;
boost::thread_group threadpool;
/*
* This keeps ioService.run() from returning while no tasks are queued.
* Tasks assigned with ioService.post() start executing once threads
* call ioService.run().
*/
boost::asio::io_service::work work(ioService);
/*
* This will add 2 threads to the thread pool. (You could just put it in a for loop)
*/
threadpool.create_thread(
boost::bind(&boost::asio::io_service::run, &ioService)
);
threadpool.create_thread(
boost::bind(&boost::asio::io_service::run, &ioService)
);
/*
* This will assign tasks to the thread pool.
* More about boost::bind: "http://www.boost.org/doc/libs/1_54_0/libs/bind/bind.html#with_functions"
*/
ioService.post(boost::bind(myTask, "Hello World!"));
ioService.post(boost::bind(clearCache, "./cache"));
ioService.post(boost::bind(getSocialUpdates, "twitter,gmail,facebook,tumblr,reddit"));
/*
* This will stop the ioService processing loop. Any tasks you add
* after this point will not execute, and tasks still waiting in the
* queue may be abandoned as well.
*/
ioService.stop();
/*
* Will wait till all the threads in the thread pool are finished with
* their assigned tasks and 'join' them. Just assume the threads inside
* the threadpool will be destroyed by this method.
*/
threadpool.join_all();
Source: Recipes < Asio
C++ thread pool using boost::asio::thread_pool, why can't I reuse my threads?
You join the pool after posting the first task. So, the pool stops before you even accept a second task. That explains why you're not seeing more.
This fixes that:
for (size_t i = 0; i != 50; ++i) {
post(g_pool, boost::bind(f, 10 * i));
}
g_pool.join();
Addendum #1
In response to the comments. In case you want to wait for the outcome of a specific task, consider a future:
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <future>
boost::asio::thread_pool g_pool(10);
int f(int i) {
std::cout << '(' + std::to_string(i) + ')';
return i * i;
}
int main() {
std::cout << std::unitbuf;
std::future<int> answer;
for (size_t i = 0; i != 50; ++i) {
auto task = boost::bind(f, 10 * i);
if (i == 42) {
answer = post(g_pool, std::packaged_task<int()>(task));
} else
{
post(g_pool, task);
}
}
answer.wait(); // optionally make sure it is ready before blocking get()
std::cout << "\n[Answer to #42: " + std::to_string(answer.get()) + "]\n";
// wait for remaining tasks
g_pool.join();
}
With one possible output:
(0)(50)(30)(90)(110)(100)(120)(130)(140)(150)(160)(170)(180)(190)(40)(200)(210)(220)(240)(250)(70)(260)(20)(230)(10)(290)(80)(270)(300)(340)(350)(310)(360)(370)(380)(330)(400)(410)(430)(60)(420)(470)(440)(490)(480)(320)(460)(450)(390)
[Answer to #42: 176400]
(280)
Addendum #2: Serializing tasks
If you want to serialize specific tasks, you can use a strand. E.g. to serialize all the requests based on the remainder of the parameter modulo 3:
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <array>
#include <future>
boost::asio::thread_pool g_pool(10);
int f(int i) {
std::cout << '(' + std::to_string(i) + ')';
return i * i;
}
int main() {
std::cout << std::unitbuf;
std::array strands{make_strand(g_pool.get_executor()),
make_strand(g_pool.get_executor()),
make_strand(g_pool.get_executor())};
for (size_t i = 0; i != 50; ++i) {
post(strands.at(i % 3), boost::bind(f, i));
}
g_pool.join();
}
With a possible output:
(0)(3)(6)(2)(9)(1)(5)(8)(11)(4)(7)(10)(13)(16)(19)(22)(25)(28)(31)(34)(37)(40)(43)(46)(49)(12)(15)(14)(18)(21)(24)(27)(30)(33)(36)(39)(42)(45)(48)(17)(20)(23)(26)(29)(32)(35)(38)(41)(44)(47)
Note that work may run on any thread, but tasks on a strand happen in the order in which they were posted. So,
- 0, 3, 6, 9, 12...
- 1, 4, 7, 10, 13...
- 2, 5, 8, 11, 14...
happen strictly serially, though
- 4 and 7 don't need to happen on the same physical thread
- 11 might happen before 4, because they're not on the same strand
Even More
In case you need more "barrier-like" synchronization, or what's known as fork-join semantics, see Boost asio thread_pool join does not wait for tasks to be finished (where I posted two answers, one after I discovered the fork-join executor example).
Using boost::asio thread pool for general purpose tasks
Boost.Asio is not solely for network programming, see the reference documentation. It has extensive support for things like
- time based operations (deadline_timer)
- signal handling
- platform specific operations such as posix streams and Windows handles
I've used it for other purposes in several applications as well. One example being a thread pool to service potentially long running blocking database operations while providing an asynchronous interface for the application. Boost.Asio really is a very powerful library. Using it for a general purpose thread pool as you propose can work just fine.
Thread pool using boost asio
In short, you need to wrap the user's provided task with another function that will:
- Invoke the user function or callable object.
- Lock the mutex and decrement the counter.
I may not be understanding all the requirements for this thread pool. Thus, for clarity, here is an explicit list as to what I believe are the requirements:
- The pool manages the lifetime of the threads. The user should not be able to delete threads that reside within the pool.
- The user can assign a task to the pool in a non-intrusive way.
- When a task is being assigned, if all threads in the pool are currently running other tasks, then the task is discarded.
Before I provide an implementation, there are a few key points I would like to stress:
- Once a thread has been launched, it will run until completion, cancellation, or termination. The function the thread is executing cannot be reassigned. To allow for a single thread to execute multiple functions over the course of its life, the thread will want to launch with a function that will read from a queue, such as io_service::run(), and callable types are posted into the event queue, such as from io_service::post().
- io_service::run() returns if there is no work pending in the io_service, the io_service is stopped, or an exception is thrown from a handler that the thread was running. To prevent io_service::run() from returning when there is no unfinished work, the io_service::work class can be used.
- Defining the task's type requirements (i.e. the task's type must be callable by object() syntax) instead of requiring a type (i.e. task must inherit from process), provides more flexibility to the user. It allows the user to supply a task as a function pointer or a type providing a nullary operator().
Implementation using boost::asio:
#include <boost/asio.hpp>
#include <boost/thread.hpp>
class thread_pool
{
private:
boost::asio::io_service io_service_;
boost::asio::io_service::work work_;
boost::thread_group threads_;
std::size_t available_;
boost::mutex mutex_;
public:
/// @brief Constructor.
thread_pool( std::size_t pool_size )
: work_( io_service_ ),
available_( pool_size )
{
for ( std::size_t i = 0; i < pool_size; ++i )
{
threads_.create_thread( boost::bind( &boost::asio::io_service::run,
&io_service_ ) );
}
}
/// @brief Destructor.
~thread_pool()
{
// Force all threads to return from io_service::run().
io_service_.stop();
// Suppress all exceptions.
try
{
threads_.join_all();
}
catch ( const std::exception& ) {}
}
/// @brief Adds a task to the thread pool if a thread is currently available.
template < typename Task >
void run_task( Task task )
{
boost::unique_lock< boost::mutex > lock( mutex_ );
// If no threads are available, then return.
if ( 0 == available_ ) return;
// Decrement count, indicating thread is no longer available.
--available_;
// Post a wrapped task into the queue.
io_service_.post( boost::bind( &thread_pool::wrap_task, this,
boost::function< void() >( task ) ) );
}
private:
/// @brief Wrap a task so that the available count can be increased once
/// the user provided task has completed.
void wrap_task( boost::function< void() > task )
{
// Run the user supplied task.
try
{
task();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
// Task has finished, so increment count of available threads.
boost::unique_lock< boost::mutex > lock( mutex_ );
++available_;
}
};
A few comments about the implementation:
- Exception handling needs to occur around the user's task. If the user's function or callable object throws an exception that is not of type boost::thread_interrupted, then std::terminate() is called. This is the result of Boost.Thread's exceptions in thread functions behavior. It is also worth reading Boost.Asio's effect of exceptions thrown from handlers.
- If the user provides the task via boost::bind, then the nested boost::bind will fail to compile. One of the following options is required:
  - Not support task created by boost::bind.
  - Meta-programming to perform compile-time branching based on whether or not the user's type is the result of boost::bind so that boost::protect could be used, as boost::protect only functions properly on certain function objects.
  - Use another type to pass the task object indirectly. I opted to use boost::function for readability at the cost of losing the exact type. boost::tuple, while slightly less readable, could also be used to preserve the exact type, as seen in the Boost.Asio's serialization example.
Application code can now use the thread_pool type non-intrusively:
void work() {};
struct worker
{
void operator()() {};
};
void more_work( int ) {};
int main()
{
thread_pool pool( 2 );
pool.run_task( work ); // Function pointer.
pool.run_task( worker() ); // Callable object.
pool.run_task( boost::bind( more_work, 5 ) ); // Callable object.
}
The thread_pool could be created without Boost.Asio, and may be slightly easier for maintainers, as they no longer need to know about Boost.Asio behaviors, such as when io_service::run() returns and what the io_service::work object is:
#include <queue>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
class thread_pool
{
private:
std::queue< boost::function< void() > > tasks_;
boost::thread_group threads_;
std::size_t available_;
boost::mutex mutex_;
boost::condition_variable condition_;
bool running_;
public:
/// @brief Constructor.
thread_pool( std::size_t pool_size )
: available_( pool_size ),
running_( true )
{
for ( std::size_t i = 0; i < pool_size; ++i )
{
threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ;
}
}
/// @brief Destructor.
~thread_pool()
{
// Set running flag to false then notify all threads.
{
boost::unique_lock< boost::mutex > lock( mutex_ );
running_ = false;
condition_.notify_all();
}
try
{
threads_.join_all();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
}
/// @brief Add task to the thread pool if a thread is currently available.
template < typename Task >
void run_task( Task task )
{
boost::unique_lock< boost::mutex > lock( mutex_ );
// If no threads are available, then return.
if ( 0 == available_ ) return;
// Decrement count, indicating thread is no longer available.
--available_;
// Set task and signal condition variable so that a worker thread will
// wake up and use the task.
tasks_.push( boost::function< void() >( task ) );
condition_.notify_one();
}
private:
/// @brief Entry point for pool threads.
void pool_main()
{
while( running_ )
{
// Wait on condition variable while the task is empty and the pool is
// still running.
boost::unique_lock< boost::mutex > lock( mutex_ );
while ( tasks_.empty() && running_ )
{
condition_.wait( lock );
}
// If pool is no longer running, break out.
if ( !running_ ) break;
// Copy task locally and remove from the queue. This is done within
// its own scope so that the task object is destructed immediately
// after running the task. This is useful in the event that the
// function contains shared_ptr arguments bound via bind.
{
boost::function< void() > task = tasks_.front();
tasks_.pop();
lock.unlock();
// Run the task.
try
{
task();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
}
// Task has finished, so increment count of available threads.
lock.lock();
++available_;
} // while running_
}
};
Creating a thread pool using boost
There is a threadpool library for Boost, although it is not (yet) an official part of Boost.
But it's not a problem to implement one yourself, especially if great genericity is not a primary goal. Idea: parametrize your thread pool with a TaskType and the number of workers. The pool must be given a handler function that takes a TaskType, and it contains a queue of added tasks. The actual thread function just takes a task from the queue and calls the handler with it.
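That idea could be sketched roughly as follows. This is a minimal illustration that uses only the C++ standard library rather than Boost, and every name in it (typed_pool, add, worker_loop, sum_with_pool) is hypothetical. It assumes the handler does not throw: an exception escaping a worker thread would call std::terminate().

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical pool parametrized on the task type. Every worker pops a
// TaskType from the queue and passes it to the handler given at construction.
template <typename TaskType>
class typed_pool {
public:
    typed_pool(std::size_t workers, std::function<void(TaskType&)> handler)
        : handler_(std::move(handler)) {
        assert(workers > 0);
        for (std::size_t i = 0; i < workers; ++i) {
            threads_.emplace_back([this] { worker_loop(); });
        }
    }

    // Lets the workers drain the queue, then joins them.
    ~typed_pool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            done_ = true;
        }
        ready_.notify_all();
        for (auto& t : threads_) t.join();
    }

    void add(TaskType task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        ready_.notify_one();
    }

private:
    void worker_loop() {
        for (;;) {
            std::unique_lock<std::mutex> lock(mutex_);
            ready_.wait(lock, [this] { return done_ || !tasks_.empty(); });
            if (tasks_.empty()) return;  // done_ is set and nothing is left
            TaskType task = std::move(tasks_.front());
            tasks_.pop();
            lock.unlock();
            handler_(task);  // run the handler outside the lock
        }
    }

    std::function<void(TaskType&)> handler_;
    std::queue<TaskType> tasks_;
    std::mutex mutex_;
    std::condition_variable ready_;
    bool done_ = false;
    std::vector<std::thread> threads_;
};

// Hypothetical usage: sum 1..n by letting the handler add each task value.
int sum_with_pool(int n) {
    std::atomic<int> sum{0};
    {
        typed_pool<int> pool(3, [&sum](int& value) { sum += value; });
        for (int i = 1; i <= n; ++i) pool.add(i);
    }  // the destructor drains the queue and joins the workers
    return sum.load();
}
```

Fixing the task type up front keeps the pool simple. The cost is that a pool instance can only process one kind of work.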
Use Futures with Boost Thread Pool
The operations posted to the pool end without the threads ending. That's the whole purpose of pooling the threads.
void send_file(std::string const& file_path){
post(pool_, [this, &file_path] {
handle_send_file(file_path);
});
// DO SOMETHING WHEN handle_send_file ENDS
}
This has several issues. The largest one is that you should not capture file_path by reference, as the argument is soon out of scope, and the handle_send_file call will run at an unspecified time in another thread. That's a race condition and dangling reference. Undefined Behaviour results.
Then the // DO SOMETHING WHEN handle_send_file ENDS comment is on a line which has no sequence relation with handle_send_file. In fact, it will probably run before that operation ever has a chance to start.
Simplifying
Here's a simplified version:
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <fstream>
#include <iostream>
#include <future>
namespace asio = boost::asio;
using asio::ip::tcp;
static asio::thread_pool pool_;
struct X {
std::unique_ptr<tcp::socket> socket_;
explicit X(unsigned short port) : socket_(new tcp::socket{ pool_ }) {
socket_->connect({ {}, port });
}
void send_file(std::string file_path) {
post(pool_, [=, this] {
send_file_implementation(file_path);
// DO SOMETHING WHEN send_file_implementation ENDS
});
}
// throws system_error exception
void send_file_implementation(std::string file_path) {
std::ifstream source_file(file_path,
std::ios_base::binary | std::ios_base::ate);
size_t file_size = source_file.tellg();
source_file.seekg(0);
write(*socket_,
asio::buffer(file_path + "\n" + std::to_string(file_size) + "\n\n"));
boost::array<char, 1024> buf{};
while (source_file.read(buf.c_array(), buf.size()) ||
source_file.gcount() > 0)
{
int n = source_file.gcount();
if (n <= 0) {
using namespace boost::system;
throw system_error(errc::io_error, system_category());
}
write(*socket_, asio::buffer(buf), asio::transfer_exactly(n));
}
}
};
Now, you can indeed run several of these operations in parallel (assuming several instances of X, so you have separate socket_ connections).
To do something at the end, just put code where I moved the comment:
// DO SOMETHING WHEN send_file_implementation ENDS
If you don't know what to do there and you wish to make a future ready at that point, you can:
std::future<void> send_file(std::string file_path) {
std::packaged_task<void()> task([=, this] {
send_file_implementation(file_path);
});
return post(pool_, std::move(task));
}
This overload of post magically¹ returns the future from the packaged task. That packaged task will set the internal promise with either the (void) return value or the exception thrown.
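How a packaged task carries either a result or an exception into its future can be seen without Asio at all. The following is a minimal standard-library sketch. It runs the task on a plain std::thread as a stand-in for posting it to the pool, and run_and_report is a hypothetical name:

```cpp
#include <cassert>
#include <exception>
#include <future>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>

// Hypothetical helper: runs a task on another thread and reports how its
// future completed.
std::string run_and_report(bool should_throw) {
    std::packaged_task<void()> task([should_throw] {
        if (should_throw) throw std::runtime_error("send failed");
    });
    std::future<void> done = task.get_future();
    assert(done.valid());

    std::thread worker(std::move(task));  // stand-in for post(pool_, ...)
    worker.join();

    try {
        done.get();  // rethrows whatever the task threw
        return "ok";
    } catch (std::exception const& e) {
        return e.what();
    }
}
```

The exception is thrown on the worker thread but only surfaces where get() is called, which is what lets the caller handle errors from pooled work in one place.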
See it in action:
int main() {
// send two files simultaneously to different connections
X clientA(6868);
X clientB(6969);
std::future<void> futures[] = {
clientA.send_file("main.cpp"),
clientB.send_file("main.cpp"),
};
for (auto& fut : futures) try {
fut.get();
std::cout << "Everything completed without error\n";
} catch(std::exception const& e) {
std::cout << "Error occurred: " << e.what() << "\n";
};
pool_.join();
}
I tested this while running two netcats to listen on 6868/6969:
nc -l -p 6868 | head& nc -l -p 6969 | md5sum&
./a.out
wait
The program prints:
Everything completed without error
Everything completed without error
The netcats print their filtered output:
main.cpp
1907
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <fstream>
#include <iostream>
#include <future>
namespace asio = boost::asio;
using asio::ip::tcp;
7ecb71992bcbc22bda44d78ad3e2a5ef -
¹ not magic: see https://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/async_result.html
boost::threadpool::pool vs. boost::thread_group
boost::thread_group is a convenience class for performing thread management operations on a collection of threads. For example, instead of having to iterate over std::vector<boost::thread>, invoking join() on each thread, the thread_group provides a convenient join_all() member function.
With boost::thread, regardless of it being managed by boost::thread_group, the lifetime of the thread is often dependent on the work the thread is doing. For example, if a thread is created to perform a computationally expensive calculation, then the thread can exit once the result has been calculated. If the work is short-lived, then the overhead of creating and destroying threads can affect performance.
On the other hand, a threadpool is a pattern where a number of threads service a number of tasks. The lifetime of a thread is not directly associated with the lifetime of a task. To continue with the previous example, the application would schedule the computationally expensive calculation to run within the thread pool. The work will be queued within the threadpool, and one of the threadpool's threads will be selected to perform the work. Once the calculation has completed, the thread goes back to waiting for more work to be scheduled with the threadpool.
As shown in this threadpool example, a threadpool can be implemented with boost::thread_group to manage the lifetime of threads, and boost::asio::io_service for task/work dispatching.
How to make a threadpool with boost::thread
A thread pool is just a bunch of threads that are already running, all executing the same function. This function basically just waits on a queue, and when there is a "function" in the queue, it extracts and executes it.
Pseudo-code:
void thread_pool_function()
{
while (true)
{
wait_for_signal_that_queue_is_not_empty();
function_to_call = queue.remove_top();
unlock_queue_semaphore();
function_to_call();
}
}
create_thread(thread_pool_function);
create_thread(thread_pool_function);
create_thread(thread_pool_function);
create_thread(thread_pool_function);
In the "code" above there are now four threads, all initially waiting for something to be put in a "queue". When there is something in the queue, one of the threads extracts it and calls it as a function.
This is probably the simplest way to implement a thread pool.