boost::asio with boost::unique_future
Boost.Asio only provides first-class support for asynchronous operations to return a C++11 std::future
or an actual value in stackful coroutines. Nevertheless, the requirements on asynchronous operations documents how to customize the return type for other types, such as Boost.Thread's boost::unique_future
. It requires:
- A specialization of the
handler_type
template. This template is used to determine the actual handler to use based on the asynchronous operation's signature. - A specialization of the
async_result
template. This template is used both to determine the return type and to extract the return value from the handler.
Below is a minimal complete example demonstrating deadline_timer::async_wait()
returning boost:unique_future
with a basic calculation being performed over a series of continuations composed with .then()
. To keep the example simple, I have opted to only specialize handler_type
for the asynchronous operation signatures used in the example. For a complete reference, I highly suggest reviewing use_future.hpp
and impl/use_future.hpp
.
#include <exception> // current_exception, make_exception_ptr
#include <memory> // make_shared, shared_ptr
#include <thread> // thread
#include <utility> // move
#define BOOST_RESULT_OF_USE_DECLTYPE
#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread/future.hpp>
/// @brief Class used to indicate an asynchronous operation should return
/// a boost::unique_future.
class use_unique_future_t {};
/// @brief A special value, similiar to std::nothrow.
constexpr use_unique_future_t use_unique_future;
namespace detail {
/// @brief Completion handler to adapt a boost::promise as a completion
/// handler.
template <typename T>
class unique_promise_handler;
/// @brief Completion handler to adapt a void boost::promise as a completion
/// handler.
template <>
class unique_promise_handler<void>
{
public:
/// @brief Construct from use_unique_future special value.
explicit unique_promise_handler(use_unique_future_t)
: promise_(std::make_shared<boost::promise<void> >())
{}
void operator()(const boost::system::error_code& error)
{
// On error, convert the error code into an exception and set it on
// the promise.
if (error)
promise_->set_exception(
std::make_exception_ptr(boost::system::system_error(error)));
// Otherwise, set the value.
else
promise_->set_value();
}
//private:
std::shared_ptr<boost::promise<void> > promise_;
};
// Ensure any exceptions thrown from the handler are propagated back to the
// caller via the future.
template <typename Function, typename T>
void asio_handler_invoke(
Function function,
unique_promise_handler<T>* handler)
{
// Guarantee the promise lives for the duration of the function call.
std::shared_ptr<boost::promise<T> > promise(handler->promise_);
try
{
function();
}
catch (...)
{
promise->set_exception(std::current_exception());
}
}
} // namespace detail
namespace boost {
namespace asio {
/// @brief Handler type specialization for use_unique_future.
template <typename ReturnType>
struct handler_type<
use_unique_future_t,
ReturnType(boost::system::error_code)>
{
typedef ::detail::unique_promise_handler<void> type;
};
/// @brief Handler traits specialization for unique_promise_handler.
template <typename T>
class async_result< ::detail::unique_promise_handler<T> >
{
public:
// The initiating function will return a boost::unique_future.
typedef boost::unique_future<T> type;
// Constructor creates a new promise for the async operation, and obtains the
// corresponding future.
explicit async_result(::detail::unique_promise_handler<T>& handler)
{
value_ = handler.promise_->get_future();
}
// Obtain the future to be returned from the initiating function.
type get() { return std::move(value_); }
private:
type value_;
};
} // namespace asio
} // namespace boost
int main()
{
boost::asio::io_service io_service;
boost::asio::io_service::work work(io_service);
// Run io_service in its own thread to demonstrate future usage.
std::thread thread([&io_service](){ io_service.run(); });
// Arm 3 second timer.
boost::asio::deadline_timer timer(
io_service, boost::posix_time::seconds(3));
// Asynchronously wait on the timer, then perform basic calculations
// within the future's continuations.
boost::unique_future<int> result =
timer.async_wait(use_unique_future)
.then([](boost::unique_future<void> future){
std::cout << "calculation 1" << std::endl;
return 21;
})
.then([](boost::unique_future<int> future){
std::cout << "calculation 2" << std::endl;
return 2 * future.get();
})
;
std::cout << "Waiting for result" << std::endl;
// Wait for the timer to trigger and for its continuations to calculate
// the result.
std::cout << result.get() << std::endl;
// Cleanup.
io_service.stop();
thread.join();
}
Output:
Waiting for result
calculation 1
calculation 2
42
boost::asio and Active Object
Boost.Asio can be used to encompass the intention of Active Object: decouple method execution from method invocation. Additional requirements will need to be handled at a higher-level, but it is not overly complex when using Boost.Asio in conjunction with other Boost libraries.
Scheduler
could use:
boost::thread
for thread abstraction.boost::thread_group
to manage lifetime of threads.boost::asio::io_service
to provide a threadpool. Will likely want to useboost::asio::io_service::work
to keep threads alive when no work is pending.
ActivationList
could be implemented as:
- A Boost.MultiIndex for obtaining highest priority method request. With a hinted-position
insert()
, the insertion order is preserved for request with the same priority. std::multiset
orstd::multimap
can be used. However, it is unspecified in C++03 as to the order of request with the same key (priority).- If
Request
do not need an guard method, thenstd::priority_queue
could be used.
Request
could be an unspecified type:
boost::function
andboost::bind
could be used to provide a type-erasure, while binding to callable types without introducing aRequest
hierarchy.
Futures
could use Boost.Thread's Futures support.
future.valid()
will return true ifRequest
has been added toActivationList
.future.wait()
will block waiting for a result to become available.future.get()
will block waiting for the result.- If caller does nothing with the
future
, then caller will not be blocked. - Another benefit to using Boost.Thread's Futures is that exceptions originating from within a
Request
will be passed to theFuture
.
Here is a complete example leveraging various Boost libraries and should meet the requirements:
// Standard includes
#include <algorithm> // std::find_if
#include <iostream>
#include <string>
// 3rd party includes
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/make_shared.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/utility/result_of.hpp>
/// @brief scheduler that provides limits with prioritized jobs.
template <typename Priority,
typename Compare = std::less<Priority> >
class scheduler
{
public:
typedef Priority priority_type;
private:
/// @brief method_request is used to couple the guard and call
/// functions for a given method.
struct method_request
{
typedef boost::function<bool()> ready_func_type;
typedef boost::function<void()> run_func_type;
template <typename ReadyFunctor,
typename RunFunctor>
method_request(ReadyFunctor ready,
RunFunctor run)
: ready(ready),
run(run)
{}
ready_func_type ready;
run_func_type run;
};
/// @brief Pair type used to associate a request with its priority.
typedef std::pair<priority_type,
boost::shared_ptr<method_request> > pair_type;
static bool is_method_ready(const pair_type& pair)
{
return pair.second->ready();
}
public:
/// @brief Construct scheduler.
///
/// @param max_threads Maximum amount of concurrent task.
/// @param max_request Maximum amount of request.
scheduler(std::size_t max_threads,
std::size_t max_request)
: work_(io_service_),
max_request_(max_request),
request_count_(0)
{
// Spawn threads, dedicating them to the io_service.
for (std::size_t i = 0; i < max_threads; ++i)
threads_.create_thread(
boost::bind(&boost::asio::io_service::run, &io_service_));
}
/// @brief Destructor.
~scheduler()
{
// Release threads from the io_service.
io_service_.stop();
// Cleanup.
threads_.join_all();
}
/// @brief Insert a method request into the scheduler.
///
/// @param priority Priority of job.
/// @param ready_func Invoked to check if method is ready to run.
/// @param run_func Invoked when ready to run.
///
/// @return future associated with the method.
template <typename ReadyFunctor,
typename RunFunctor>
boost::unique_future<typename boost::result_of<RunFunctor()>::type>
insert(priority_type priority,
const ReadyFunctor& ready_func,
const RunFunctor& run_func)
{
typedef typename boost::result_of<RunFunctor()>::type result_type;
typedef boost::unique_future<result_type> future_type;
boost::unique_lock<mutex_type> lock(mutex_);
// If max request has been reached, then return an invalid future.
if (max_request_ &&
(request_count_ == max_request_))
return future_type();
++request_count_;
// Use a packaged task to handle populating promise and future.
typedef boost::packaged_task<result_type> task_type;
// Bind does not work with rvalue, and packaged_task is only moveable,
// so allocate a shared pointer.
boost::shared_ptr<task_type> task =
boost::make_shared<task_type>(run_func);
// Create method request.
boost::shared_ptr<method_request> request =
boost::make_shared<method_request>(
ready_func,
boost::bind(&task_type::operator(), task));
// Insert into priority. Hint to inserting as close to the end as
// possible to preserve insertion order for request with same priority.
activation_list_.insert(activation_list_.end(),
pair_type(priority, request));
// There is now an outstanding request, so post to dispatch.
io_service_.post(boost::bind(&scheduler::dispatch, this));
return task->get_future();
}
/// @brief Insert a method request into the scheduler.
///
/// @param ready_func Invoked to check if method is ready to run.
/// @param run_func Invoked when ready to run.
///
/// @return future associated with the method.
template <typename ReadyFunctor,
typename RunFunctor>
boost::unique_future<typename boost::result_of<RunFunctor()>::type>
insert(const ReadyFunctor& ready_func,
const RunFunctor& run_func)
{
return insert(priority_type(), ready_func, run_func);
}
/// @brief Insert a method request into the scheduler.
///
/// @param priority Priority of job.
/// @param run_func Invoked when ready to run.
///
/// @return future associated with the method.
template <typename RunFunctor>
boost::unique_future<typename boost::result_of<RunFunctor()>::type>
insert(priority_type priority,
const RunFunctor& run_func)
{
return insert(priority, &always_ready, run_func);
}
/// @brief Insert a method request with default priority into the
/// scheduler.
///
/// @param run_func Invoked when ready to run.
///
/// @param functor Job to run.
///
/// @return future associated with the job.
template <typename RunFunc>
boost::unique_future<typename boost::result_of<RunFunc()>::type>
insert(const RunFunc& run_func)
{
return insert(&always_ready, run_func);
}
/// @brief Cancel all outstanding request.
void cancel()
{
boost::unique_lock<mutex_type> lock(mutex_);
activation_list_.clear();
request_count_ = 0;
}
private:
/// @brief Dispatch a request.
void dispatch()
{
// Get the current highest priority request ready to run from the queue.
boost::unique_lock<mutex_type> lock(mutex_);
if (activation_list_.empty()) return;
// Find the highest priority method ready to run.
typedef typename activation_list_type::iterator iterator;
iterator end = activation_list_.end();
iterator result = std::find_if(
activation_list_.begin(), end, &is_method_ready);
// If no methods are ready, then post into dispatch, as the
// method may have become ready.
if (end == result)
{
io_service_.post(boost::bind(&scheduler::dispatch, this));
return;
}
// Take ownership of request.
boost::shared_ptr<method_request> method = result->second;
activation_list_.erase(result);
// Run method without mutex.
lock.unlock();
method->run();
lock.lock();
// Perform bookkeeping.
--request_count_;
}
static bool always_ready() { return true; }
private:
/// @brief List of outstanding request.
typedef boost::multi_index_container<
pair_type,
boost::multi_index::indexed_by<
boost::multi_index::ordered_non_unique<
boost::multi_index::member<pair_type,
typename pair_type::first_type,
&pair_type::first>,
Compare
>
>
> activation_list_type;
activation_list_type activation_list_;
/// @brief Thread group managing threads servicing pool.
boost::thread_group threads_;
/// @brief io_service used to function as a thread pool.
boost::asio::io_service io_service_;
/// @brief Work is used to keep threads servicing io_service.
boost::asio::io_service::work work_;
/// @brief Maximum amount of request.
const std::size_t max_request_;
/// @brief Count of outstanding request.
std::size_t request_count_;
/// @brief Synchronize access to the activation list.
typedef boost::mutex mutex_type;
mutex_type mutex_;
};
typedef scheduler<unsigned int,
std::greater<unsigned int> > high_priority_scheduler;
/// @brief adder is a simple proxy that will delegate work to
/// the scheduler.
class adder
{
public:
adder(high_priority_scheduler& scheduler)
: scheduler_(scheduler)
{}
/// @brief Add a and b with a priority.
///
/// @return Return future result.
template <typename T>
boost::unique_future<T> add(
high_priority_scheduler::priority_type priority,
const T& a, const T& b)
{
// Insert method request
return scheduler_.insert(
priority,
boost::bind(&adder::do_add<T>, a, b));
}
/// @brief Add a and b.
///
/// @return Return future result.
template <typename T>
boost::unique_future<T> add(const T& a, const T& b)
{
return add(high_priority_scheduler::priority_type(), a, b);
}
private:
/// @brief Actual add a and b.
template <typename T>
static T do_add(const T& a, const T& b)
{
std::cout << "Starting addition of '" << a
<< "' and '" << b << "'" << std::endl;
// Mimic busy work.
boost::this_thread::sleep_for(boost::chrono::seconds(2));
std::cout << "Finished addition" << std::endl;
return a + b;
}
private:
high_priority_scheduler& scheduler_;
};
bool get(bool& value) { return value; }
void guarded_call()
{
std::cout << "guarded_call" << std::endl;
}
int main()
{
const unsigned int max_threads = 1;
const unsigned int max_request = 4;
// Sscheduler
high_priority_scheduler scheduler(max_threads, max_request);
// Proxy
adder adder(scheduler);
// Client
// Add guarded method to scheduler.
bool ready = false;
std::cout << "Add guarded method." << std::endl;
boost::unique_future<void> future1 = scheduler.insert(
boost::bind(&get, boost::ref(ready)),
&guarded_call);
// Add 1 + 100 with default priority.
boost::unique_future<int> future2 = adder.add(1, 100);
// Force sleep to try to get scheduler to run request 2 first.
boost::this_thread::sleep_for(boost::chrono::seconds(1));
// Add:
// 2 + 200 with low priority (5)
// "test" + "this" with high priority (99)
boost::unique_future<int> future3 = adder.add(5, 2, 200);
boost::unique_future<std::string> future4 = adder.add(99,
std::string("test"), std::string("this"));
// Max request should have been reached, so add another.
boost::unique_future<int> future5 = adder.add(3, 300);
// Check if request was added.
std::cout << "future1 is valid: " << future1.valid()
<< "\nfuture2 is valid: " << future2.valid()
<< "\nfuture3 is valid: " << future3.valid()
<< "\nfuture4 is valid: " << future4.valid()
<< "\nfuture5 is valid: " << future5.valid()
<< std::endl;
// Get results for future2 and future3. Do nothing with future4's results.
std::cout << "future2 result: " << future2.get()
<< "\nfuture3 result: " << future3.get()
<< std::endl;
std::cout << "Unguarding method." << std::endl;
ready = true;
future1.wait();
}
The execution uses thread pool of 1 with a max of 4 request.
- request1 is guarded until the end of program, and should be last to run.
- request2 (1 + 100) is inserted with default priority, and should be first to run.
- request3 (2 + 200) is inserted low priority, and should run after request4.
- request4 ('test' + 'this') is inserted with high priority, and should run before request3.
- request5 should fail to insert due to max request, and should not be valid.
The output is as follows:
Add guarded method.
Starting addition of '1' and '100'
future1 is valid: 1
future2 is valid: 1
future3 is valid: 1
future4 is valid: 1
future5 is valid: 0
Finished addition
Starting addition of 'test' and 'this'
Finished addition
Starting addition of '2' and '200'
Finished addition
future2 result: 101
future3 result: 202
Unguarding method.
guarded_call
using boost async API's with multiple threads
In the official chat example, chat_client::write()
defers work to the io_service
via io_service::post()
, which will:
- request that the
io_service
execute the given handler via a thread that is currently invoking thepoll()
,poll_one()
,run()
, orrun_one()
function on theio_service
- not allow the given handler to be invoked within the calling function (e.g.
chat_client::write()
)
As only one thread is running the io_service
, and all socket read, write, and close operations are only initiated from handlers that have been posted to the io_service
, the program satisfies the thread-safety requirement for socket
.
class chat_client
{
void write(const chat_message& msg)
{
// The nullary function `handler` is created, but not invoked within
// the calling function. `msg` is captured by value, allowing `handler`
// to append a valid `msg` object to `write_msgs_`.
auto handler = [this, msg]()
{
bool write_in_progress = !write_msgs_.empty();
write_msgs_.push_back(msg);
if (!write_in_progress)
{
do_write();
}
};
// Request that `handler` be invoked within the `io_service`.
io_service_.post(handler);
}
};
does boost::asio co_spawn create an actual thread?
It does not. See The Proactor Design Pattern: Concurrency Without Threads and https://www.boost.org/doc/libs/1_78_0/doc/html/boost_asio/overview/core/threads.html
What does detached
mean/do? The documentation says:
The detached_t class is used to indicate that an asynchronous operation is detached. That is, there is no completion handler waiting for the operation's result.
It comes down to writing a no-op handler but (a) less work (b) more room for the library to optimize.
Another angle to look at this from is this: if the execution context for the executor (io_ctx
) is never run/polled, nothing will ever happen. As always in boost, you decide where you run the service (whether you use threads e.g.)
boost:asio thread pool implementation for occasionally synchronized tasks
You may use futures for data processing and synchronize with them using boost::wait_for_all()
. This will allow you to operate in terms of parts of work done, not threads.
int process_data() {...}
// Pending futures
std::vector<boost::unique_future<int>> pending_data;
for(int i = 0; i < numSmallTasks; ++i)
{
// Create task and corresponding future
// Using shared ptr and binding operator() trick because
// packaged_task is non-copyable, but asio::io_service::post requires argument to be copyable
// Boost 1.51 syntax
// For Boost 1.53+ or C++11 std::packaged_task shall be boost::packaged_task<int()>
typedef boost::packaged_task<int> task_t;
boost::shared_ptr<task_t> task = boost::make_shared<task_t>(
boost::bind(&process_data, i, theTime));
boost::unique_future<int> fut = task->get_future();
pending_data.push_back(std::move(fut));
io_service.post(boost::bind(&task_t::operator(), task));
}
// After loop - wait until all futures are evaluated
boost::wait_for_all(pending_data.begin(), pending_data.end());
program crash using boost::asio
what should i do to prevent termination of thread??
Handle the exception (try/catch).
UPDATE:take care that io_service is ran from different thread NOTE:in this question Boost.Asio read_some: End of file error the answer suggests putting the read_some function in try catch block. he without explicitly saying changes read_some whick take 2nd parameter for error to the version without error.
if i need error parameter to continue my procedure ,what can i do?
You don't need the "error paramater" to continue the procedure - just handle the exception again.
In principle there is no difference between the versions that throw and the versions that set an error_code
:
errpr_code ec;
try {
foo();
} catch(boost::system::system_error& se) {
ec = se.code();
}
Is functionally equivalent to
errpr_code ec;
foo(ec);
Bonus
Note also you can just write if (ec)
(or even if (!ec.failed9))
) instead of if (ec.value() != boost::system::errc::errc_t::success)
. And you should.
Related Topics
Why Class Size Depend Only on Data Members and Not on Member Functions
Using Custom Camera in Opencv (Via Gstreamer)
Why Is the New Operator Allowed to Return *Void to Every Pointer-Type
Find Available Network Interfaces in C/C++
Specialization of Member Function Template After Instantiation Error, and Order of Member Functions
How to Store Variant Data in C++
Partial Specialization of Function Templates
Boost.Python: Wrap Functions to Release the Gil
What Is the Array Form of 'Delete'
What Happens If Main() Does Not Return an Int Value
Why Does Destructor Disable Generation of Implicit Move Methods
How to Read from Memory Just Like from a File Using iOStream
What Happens to the Memory Allocated by 'New' If the Constructor Throws
What's the Semantically Accurate Position for the Ampersand in C++ References