Boost::Asio with Boost::Unique_Future

boost::asio with boost::unique_future

Boost.Asio only provides first-class support for asynchronous operations to return a C++11 std::future or an actual value in stackful coroutines. Nevertheless, the requirements on asynchronous operations documents how to customize the return type for other types, such as Boost.Thread's boost::unique_future. It requires:

  • A specialization of the handler_type template. This template is used to determine the actual handler to use based on the asynchronous operation's signature.
  • A specialization of the async_result template. This template is used both to determine the return type and to extract the return value from the handler.

Below is a minimal complete example demonstrating deadline_timer::async_wait() returning boost:unique_future with a basic calculation being performed over a series of continuations composed with .then(). To keep the example simple, I have opted to only specialize handler_type for the asynchronous operation signatures used in the example. For a complete reference, I highly suggest reviewing use_future.hpp and impl/use_future.hpp.

#include <exception> // current_exception, make_exception_ptr
#include <memory> // make_shared, shared_ptr
#include <thread> // thread
#include <utility> // move

#define BOOST_RESULT_OF_USE_DECLTYPE
#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION

#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread/future.hpp>

/// @brief Class used to indicate an asynchronous operation should return
/// a boost::unique_future.
class use_unique_future_t {};

/// @brief A special value, similiar to std::nothrow.
constexpr use_unique_future_t use_unique_future;

namespace detail {

/// @brief Completion handler to adapt a boost::promise as a completion
/// handler.
template <typename T>
class unique_promise_handler;

/// @brief Completion handler to adapt a void boost::promise as a completion
/// handler.
template <>
class unique_promise_handler<void>
{
public:
/// @brief Construct from use_unique_future special value.
explicit unique_promise_handler(use_unique_future_t)
: promise_(std::make_shared<boost::promise<void> >())
{}

void operator()(const boost::system::error_code& error)
{
// On error, convert the error code into an exception and set it on
// the promise.
if (error)
promise_->set_exception(
std::make_exception_ptr(boost::system::system_error(error)));
// Otherwise, set the value.
else
promise_->set_value();
}

//private:
std::shared_ptr<boost::promise<void> > promise_;
};

// Ensure any exceptions thrown from the handler are propagated back to the
// caller via the future.
template <typename Function, typename T>
void asio_handler_invoke(
Function function,
unique_promise_handler<T>* handler)
{
// Guarantee the promise lives for the duration of the function call.
std::shared_ptr<boost::promise<T> > promise(handler->promise_);
try
{
function();
}
catch (...)
{
promise->set_exception(std::current_exception());
}
}

} // namespace detail

namespace boost {
namespace asio {

/// @brief Handler type specialization for use_unique_future.
template <typename ReturnType>
struct handler_type<
use_unique_future_t,
ReturnType(boost::system::error_code)>
{
typedef ::detail::unique_promise_handler<void> type;
};

/// @brief Handler traits specialization for unique_promise_handler.
template <typename T>
class async_result< ::detail::unique_promise_handler<T> >
{
public:
// The initiating function will return a boost::unique_future.
typedef boost::unique_future<T> type;

// Constructor creates a new promise for the async operation, and obtains the
// corresponding future.
explicit async_result(::detail::unique_promise_handler<T>& handler)
{
value_ = handler.promise_->get_future();
}

// Obtain the future to be returned from the initiating function.
type get() { return std::move(value_); }

private:
type value_;
};

} // namespace asio
} // namespace boost

int main()
{
boost::asio::io_service io_service;
boost::asio::io_service::work work(io_service);

// Run io_service in its own thread to demonstrate future usage.
std::thread thread([&io_service](){ io_service.run(); });

// Arm 3 second timer.
boost::asio::deadline_timer timer(
io_service, boost::posix_time::seconds(3));

// Asynchronously wait on the timer, then perform basic calculations
// within the future's continuations.
boost::unique_future<int> result =
timer.async_wait(use_unique_future)
.then([](boost::unique_future<void> future){
std::cout << "calculation 1" << std::endl;
return 21;
})
.then([](boost::unique_future<int> future){
std::cout << "calculation 2" << std::endl;
return 2 * future.get();
})
;

std::cout << "Waiting for result" << std::endl;
// Wait for the timer to trigger and for its continuations to calculate
// the result.
std::cout << result.get() << std::endl;

// Cleanup.
io_service.stop();
thread.join();
}

Output:

Waiting for result
calculation 1
calculation 2
42

boost::asio and Active Object

Boost.Asio can be used to encompass the intention of Active Object: decouple method execution from method invocation. Additional requirements will need to be handled at a higher-level, but it is not overly complex when using Boost.Asio in conjunction with other Boost libraries.

Scheduler could use:

  • boost::thread for thread abstraction.
  • boost::thread_group to manage lifetime of threads.
  • boost::asio::io_service to provide a threadpool. Will likely want to use boost::asio::io_service::work to keep threads alive when no work is pending.

ActivationList could be implemented as:

  • A Boost.MultiIndex for obtaining highest priority method request. With a hinted-position insert(), the insertion order is preserved for request with the same priority.
  • std::multiset or std::multimap can be used. However, it is unspecified in C++03 as to the order of request with the same key (priority).
  • If Request do not need an guard method, then std::priority_queue could be used.

Request could be an unspecified type:

  • boost::function and boost::bind could be used to provide a type-erasure, while binding to callable types without introducing a Request hierarchy.

Futures could use Boost.Thread's Futures support.

  • future.valid() will return true if Request has been added to ActivationList.
  • future.wait() will block waiting for a result to become available.
  • future.get() will block waiting for the result.
  • If caller does nothing with the future, then caller will not be blocked.
  • Another benefit to using Boost.Thread's Futures is that exceptions originating from within a Request will be passed to the Future.

Here is a complete example leveraging various Boost libraries and should meet the requirements:

// Standard includes
#include <algorithm> // std::find_if
#include <iostream>
#include <string>

// 3rd party includes
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/make_shared.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/utility/result_of.hpp>

/// @brief scheduler that provides limits with prioritized jobs.
template <typename Priority,
typename Compare = std::less<Priority> >
class scheduler
{
public:
typedef Priority priority_type;
private:

/// @brief method_request is used to couple the guard and call
/// functions for a given method.
struct method_request
{
typedef boost::function<bool()> ready_func_type;
typedef boost::function<void()> run_func_type;

template <typename ReadyFunctor,
typename RunFunctor>
method_request(ReadyFunctor ready,
RunFunctor run)
: ready(ready),
run(run)
{}

ready_func_type ready;
run_func_type run;
};

/// @brief Pair type used to associate a request with its priority.
typedef std::pair<priority_type,
boost::shared_ptr<method_request> > pair_type;

static bool is_method_ready(const pair_type& pair)
{
return pair.second->ready();
}

public:

/// @brief Construct scheduler.
///
/// @param max_threads Maximum amount of concurrent task.
/// @param max_request Maximum amount of request.
scheduler(std::size_t max_threads,
std::size_t max_request)
: work_(io_service_),
max_request_(max_request),
request_count_(0)
{
// Spawn threads, dedicating them to the io_service.
for (std::size_t i = 0; i < max_threads; ++i)
threads_.create_thread(
boost::bind(&boost::asio::io_service::run, &io_service_));
}

/// @brief Destructor.
~scheduler()
{
// Release threads from the io_service.
io_service_.stop();
// Cleanup.
threads_.join_all();
}

/// @brief Insert a method request into the scheduler.
///
/// @param priority Priority of job.
/// @param ready_func Invoked to check if method is ready to run.
/// @param run_func Invoked when ready to run.
///
/// @return future associated with the method.
template <typename ReadyFunctor,
typename RunFunctor>
boost::unique_future<typename boost::result_of<RunFunctor()>::type>
insert(priority_type priority,
const ReadyFunctor& ready_func,
const RunFunctor& run_func)
{
typedef typename boost::result_of<RunFunctor()>::type result_type;
typedef boost::unique_future<result_type> future_type;

boost::unique_lock<mutex_type> lock(mutex_);

// If max request has been reached, then return an invalid future.
if (max_request_ &&
(request_count_ == max_request_))
return future_type();

++request_count_;

// Use a packaged task to handle populating promise and future.
typedef boost::packaged_task<result_type> task_type;

// Bind does not work with rvalue, and packaged_task is only moveable,
// so allocate a shared pointer.
boost::shared_ptr<task_type> task =
boost::make_shared<task_type>(run_func);

// Create method request.
boost::shared_ptr<method_request> request =
boost::make_shared<method_request>(
ready_func,
boost::bind(&task_type::operator(), task));

// Insert into priority. Hint to inserting as close to the end as
// possible to preserve insertion order for request with same priority.
activation_list_.insert(activation_list_.end(),
pair_type(priority, request));

// There is now an outstanding request, so post to dispatch.
io_service_.post(boost::bind(&scheduler::dispatch, this));

return task->get_future();
}

/// @brief Insert a method request into the scheduler.
///
/// @param ready_func Invoked to check if method is ready to run.
/// @param run_func Invoked when ready to run.
///
/// @return future associated with the method.
template <typename ReadyFunctor,
typename RunFunctor>
boost::unique_future<typename boost::result_of<RunFunctor()>::type>
insert(const ReadyFunctor& ready_func,
const RunFunctor& run_func)
{
return insert(priority_type(), ready_func, run_func);
}

/// @brief Insert a method request into the scheduler.
///
/// @param priority Priority of job.
/// @param run_func Invoked when ready to run.
///
/// @return future associated with the method.
template <typename RunFunctor>
boost::unique_future<typename boost::result_of<RunFunctor()>::type>
insert(priority_type priority,
const RunFunctor& run_func)
{
return insert(priority, &always_ready, run_func);
}

/// @brief Insert a method request with default priority into the
/// scheduler.
///
/// @param run_func Invoked when ready to run.
///
/// @param functor Job to run.
///
/// @return future associated with the job.
template <typename RunFunc>
boost::unique_future<typename boost::result_of<RunFunc()>::type>
insert(const RunFunc& run_func)
{
return insert(&always_ready, run_func);
}

/// @brief Cancel all outstanding request.
void cancel()
{
boost::unique_lock<mutex_type> lock(mutex_);
activation_list_.clear();
request_count_ = 0;
}

private:

/// @brief Dispatch a request.
void dispatch()
{
// Get the current highest priority request ready to run from the queue.
boost::unique_lock<mutex_type> lock(mutex_);
if (activation_list_.empty()) return;

// Find the highest priority method ready to run.
typedef typename activation_list_type::iterator iterator;
iterator end = activation_list_.end();
iterator result = std::find_if(
activation_list_.begin(), end, &is_method_ready);

// If no methods are ready, then post into dispatch, as the
// method may have become ready.
if (end == result)
{
io_service_.post(boost::bind(&scheduler::dispatch, this));
return;
}

// Take ownership of request.
boost::shared_ptr<method_request> method = result->second;
activation_list_.erase(result);

// Run method without mutex.
lock.unlock();
method->run();
lock.lock();

// Perform bookkeeping.
--request_count_;
}

static bool always_ready() { return true; }

private:

/// @brief List of outstanding request.
typedef boost::multi_index_container<
pair_type,
boost::multi_index::indexed_by<
boost::multi_index::ordered_non_unique<
boost::multi_index::member<pair_type,
typename pair_type::first_type,
&pair_type::first>,
Compare
>
>
> activation_list_type;
activation_list_type activation_list_;

/// @brief Thread group managing threads servicing pool.
boost::thread_group threads_;

/// @brief io_service used to function as a thread pool.
boost::asio::io_service io_service_;

/// @brief Work is used to keep threads servicing io_service.
boost::asio::io_service::work work_;

/// @brief Maximum amount of request.
const std::size_t max_request_;

/// @brief Count of outstanding request.
std::size_t request_count_;

/// @brief Synchronize access to the activation list.
typedef boost::mutex mutex_type;
mutex_type mutex_;
};

typedef scheduler<unsigned int,
std::greater<unsigned int> > high_priority_scheduler;

/// @brief adder is a simple proxy that will delegate work to
/// the scheduler.
class adder
{
public:
adder(high_priority_scheduler& scheduler)
: scheduler_(scheduler)
{}

/// @brief Add a and b with a priority.
///
/// @return Return future result.
template <typename T>
boost::unique_future<T> add(
high_priority_scheduler::priority_type priority,
const T& a, const T& b)
{
// Insert method request
return scheduler_.insert(
priority,
boost::bind(&adder::do_add<T>, a, b));
}

/// @brief Add a and b.
///
/// @return Return future result.
template <typename T>
boost::unique_future<T> add(const T& a, const T& b)
{
return add(high_priority_scheduler::priority_type(), a, b);
}

private:

/// @brief Actual add a and b.
template <typename T>
static T do_add(const T& a, const T& b)
{
std::cout << "Starting addition of '" << a
<< "' and '" << b << "'" << std::endl;
// Mimic busy work.
boost::this_thread::sleep_for(boost::chrono::seconds(2));
std::cout << "Finished addition" << std::endl;
return a + b;
}

private:
high_priority_scheduler& scheduler_;
};

bool get(bool& value) { return value; }
void guarded_call()
{
std::cout << "guarded_call" << std::endl;
}

int main()
{
const unsigned int max_threads = 1;
const unsigned int max_request = 4;

// Sscheduler
high_priority_scheduler scheduler(max_threads, max_request);

// Proxy
adder adder(scheduler);

// Client

// Add guarded method to scheduler.
bool ready = false;
std::cout << "Add guarded method." << std::endl;
boost::unique_future<void> future1 = scheduler.insert(
boost::bind(&get, boost::ref(ready)),
&guarded_call);

// Add 1 + 100 with default priority.
boost::unique_future<int> future2 = adder.add(1, 100);

// Force sleep to try to get scheduler to run request 2 first.
boost::this_thread::sleep_for(boost::chrono::seconds(1));

// Add:
// 2 + 200 with low priority (5)
// "test" + "this" with high priority (99)
boost::unique_future<int> future3 = adder.add(5, 2, 200);
boost::unique_future<std::string> future4 = adder.add(99,
std::string("test"), std::string("this"));

// Max request should have been reached, so add another.
boost::unique_future<int> future5 = adder.add(3, 300);

// Check if request was added.
std::cout << "future1 is valid: " << future1.valid()
<< "\nfuture2 is valid: " << future2.valid()
<< "\nfuture3 is valid: " << future3.valid()
<< "\nfuture4 is valid: " << future4.valid()
<< "\nfuture5 is valid: " << future5.valid()
<< std::endl;

// Get results for future2 and future3. Do nothing with future4's results.
std::cout << "future2 result: " << future2.get()
<< "\nfuture3 result: " << future3.get()
<< std::endl;

std::cout << "Unguarding method." << std::endl;
ready = true;
future1.wait();
}

The execution uses thread pool of 1 with a max of 4 request.

  • request1 is guarded until the end of program, and should be last to run.
  • request2 (1 + 100) is inserted with default priority, and should be first to run.
  • request3 (2 + 200) is inserted low priority, and should run after request4.
  • request4 ('test' + 'this') is inserted with high priority, and should run before request3.
  • request5 should fail to insert due to max request, and should not be valid.

The output is as follows:

Add guarded method.
Starting addition of '1' and '100'
future1 is valid: 1
future2 is valid: 1
future3 is valid: 1
future4 is valid: 1
future5 is valid: 0
Finished addition
Starting addition of 'test' and 'this'
Finished addition
Starting addition of '2' and '200'
Finished addition
future2 result: 101
future3 result: 202
Unguarding method.
guarded_call

using boost async API's with multiple threads

In the official chat example, chat_client::write() defers work to the io_service via io_service::post(), which will:

  • request that the io_service execute the given handler via a thread that is currently invoking the poll(), poll_one(), run(), or run_one() function on the io_service
  • not allow the given handler to be invoked within the calling function (e.g. chat_client::write())

As only one thread is running the io_service, and all socket read, write, and close operations are only initiated from handlers that have been posted to the io_service, the program satisfies the thread-safety requirement for socket.

class chat_client
{
void write(const chat_message& msg)
{
// The nullary function `handler` is created, but not invoked within
// the calling function. `msg` is captured by value, allowing `handler`
// to append a valid `msg` object to `write_msgs_`.
auto handler = [this, msg]()
{
bool write_in_progress = !write_msgs_.empty();
write_msgs_.push_back(msg);
if (!write_in_progress)
{
do_write();
}
};

// Request that `handler` be invoked within the `io_service`.
io_service_.post(handler);
}
};

does boost::asio co_spawn create an actual thread?

It does not. See The Proactor Design Pattern: Concurrency Without Threads and https://www.boost.org/doc/libs/1_78_0/doc/html/boost_asio/overview/core/threads.html

What does detached mean/do? The documentation says:

The detached_t class is used to indicate that an asynchronous operation is detached. That is, there is no completion handler waiting for the operation's result.

It comes down to writing a no-op handler but (a) less work (b) more room for the library to optimize.


Another angle to look at this from is this: if the execution context for the executor (io_ctx) is never run/polled, nothing will ever happen. As always in boost, you decide where you run the service (whether you use threads e.g.)

boost:asio thread pool implementation for occasionally synchronized tasks

You may use futures for data processing and synchronize with them using boost::wait_for_all(). This will allow you to operate in terms of parts of work done, not threads.

int process_data() {...}

// Pending futures
std::vector<boost::unique_future<int>> pending_data;

for(int i = 0; i < numSmallTasks; ++i)
{
// Create task and corresponding future
// Using shared ptr and binding operator() trick because
// packaged_task is non-copyable, but asio::io_service::post requires argument to be copyable

// Boost 1.51 syntax
// For Boost 1.53+ or C++11 std::packaged_task shall be boost::packaged_task<int()>
typedef boost::packaged_task<int> task_t;

boost::shared_ptr<task_t> task = boost::make_shared<task_t>(
boost::bind(&process_data, i, theTime));

boost::unique_future<int> fut = task->get_future();

pending_data.push_back(std::move(fut));
io_service.post(boost::bind(&task_t::operator(), task));
}

// After loop - wait until all futures are evaluated
boost::wait_for_all(pending_data.begin(), pending_data.end());

program crash using boost::asio

what should i do to prevent termination of thread??

Handle the exception (try/catch).

UPDATE:take care that io_service is ran from different thread NOTE:in this question Boost.Asio read_some: End of file error the answer suggests putting the read_some function in try catch block. he without explicitly saying changes read_some whick take 2nd parameter for error to the version without error.
if i need error parameter to continue my procedure ,what can i do?

You don't need the "error paramater" to continue the procedure - just handle the exception again.

In principle there is no difference between the versions that throw and the versions that set an error_code:

errpr_code ec;
try {
foo();
} catch(boost::system::system_error& se) {
ec = se.code();
}

Is functionally equivalent to

errpr_code ec;
foo(ec);

Bonus

Note also you can just write if (ec) (or even if (!ec.failed9))) instead of if (ec.value() != boost::system::errc::errc_t::success). And you should.



Related Topics



Leave a reply



Submit