Boost::Asio and Active Object

boost::asio and Active Object

Boost.Asio can be used to capture the intent of Active Object: decoupling method execution from method invocation. Additional requirements need to be handled at a higher level, but doing so is not overly complex when Boost.Asio is used in conjunction with other Boost libraries.

Scheduler could use:

  • boost::thread for thread abstraction.
  • boost::thread_group to manage lifetime of threads.
  • boost::asio::io_service to provide a thread pool. You will likely want to use boost::asio::io_service::work to keep threads alive when no work is pending.

ActivationList could be implemented as:

  • A Boost.MultiIndex container for obtaining the highest priority method request. With a hinted-position insert(), the insertion order is preserved for requests with the same priority.
  • std::multiset or std::multimap can be used. However, C++03 leaves the order of requests with the same key (priority) unspecified.
  • If a Request does not need a guard method, then std::priority_queue could be used.
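As a hedged illustration of the std::multimap option: since C++11, insert() places a new element at the upper bound of its equal-key range, so requests with the same priority come out in insertion order. The sketch below assumes a C++11 compiler; `activation_list`, `drain` and `example_order` are hypothetical names and are not part of the complete example further down.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical activation list: highest priority first (std::greater).
typedef std::multimap<unsigned int, std::string, std::greater<unsigned int> >
    activation_list;

// Remove every request in dispatch order and return the request names.
std::vector<std::string> drain(activation_list& list)
{
  std::vector<std::string> order;
  while (!list.empty())
  {
    order.push_back(list.begin()->second);
    list.erase(list.begin());
  }
  return order;
}

// Build a small list and return the order in which it would be dispatched.
std::vector<std::string> example_order()
{
  activation_list list;
  list.insert(std::make_pair(5u, std::string("low-a")));
  list.insert(std::make_pair(99u, std::string("high")));
  list.insert(std::make_pair(5u, std::string("low-b")));
  // Under C++11 rules: high, low-a, low-b.
  return drain(list);
}
```

This only covers ordering. A guard check would still need a linear scan such as std::find_if over the container.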

Request could be an unspecified type:

  • boost::function and boost::bind could be used to provide type erasure and to bind to callable types without introducing a Request hierarchy.

Futures could use Boost.Thread's Futures support.

  • future.valid() will return true if Request has been added to ActivationList.
  • future.wait() will block waiting for a result to become available.
  • future.get() will block waiting for the result.
  • If caller does nothing with the future, then caller will not be blocked.
  • Another benefit to using Boost.Thread's Futures is that exceptions originating from within a Request will be passed to the Future.
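The exception behaviour can be sketched with the standard library equivalents (std::packaged_task and std::future, swapped in here for Boost.Thread's futures so the snippet has no Boost dependency). The functions `failing_request`, `answer` and `observe` are hypothetical and exist only for this illustration.

```cpp
#include <cassert>
#include <future>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>

// Hypothetical request that fails while executing.
int failing_request()
{
  throw std::runtime_error("request failed");
}

// Hypothetical request that succeeds.
int answer() { return 42; }

// Run a request on another thread and report what the caller observes
// when it finally asks the future for the result.
std::string observe(int (*request)())
{
  std::packaged_task<int()> task(request);
  std::future<int> future = task.get_future();

  // packaged_task is move-only, so move it into the executing thread.
  std::thread worker(std::move(task));
  worker.join();

  try
  {
    return "value: " + std::to_string(future.get());
  }
  catch (const std::runtime_error& e)
  {
    // The exception thrown inside the request resurfaces in the caller.
    return std::string("error: ") + e.what();
  }
}
```

Calling observe(&failing_request) returns the error text instead of crashing the worker thread, which is the behaviour the last bullet describes.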

Here is a complete example that leverages various Boost libraries and should meet the requirements:

// Standard includes
#include <algorithm> // std::find_if
#include <iostream>
#include <string>

// 3rd party includes
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/make_shared.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/utility/result_of.hpp>

/// @brief scheduler that provides limits with prioritized jobs.
template <typename Priority,
typename Compare = std::less<Priority> >
class scheduler
{
public:
typedef Priority priority_type;
private:

/// @brief method_request is used to couple the guard and call
/// functions for a given method.
struct method_request
{
typedef boost::function<bool()> ready_func_type;
typedef boost::function<void()> run_func_type;

template <typename ReadyFunctor,
typename RunFunctor>
method_request(ReadyFunctor ready,
RunFunctor run)
: ready(ready),
run(run)
{}

ready_func_type ready;
run_func_type run;
};

/// @brief Pair type used to associate a request with its priority.
typedef std::pair<priority_type,
boost::shared_ptr<method_request> > pair_type;

static bool is_method_ready(const pair_type& pair)
{
return pair.second->ready();
}

public:

/// @brief Construct scheduler.
///
/// @param max_threads Maximum amount of concurrent task.
/// @param max_request Maximum amount of request.
scheduler(std::size_t max_threads,
std::size_t max_request)
: work_(io_service_),
max_request_(max_request),
request_count_(0)
{
// Spawn threads, dedicating them to the io_service.
for (std::size_t i = 0; i < max_threads; ++i)
threads_.create_thread(
boost::bind(&boost::asio::io_service::run, &io_service_));
}

/// @brief Destructor.
~scheduler()
{
// Release threads from the io_service.
io_service_.stop();
// Cleanup.
threads_.join_all();
}

/// @brief Insert a method request into the scheduler.
///
/// @param priority Priority of job.
/// @param ready_func Invoked to check if method is ready to run.
/// @param run_func Invoked when ready to run.
///
/// @return future associated with the method.
template <typename ReadyFunctor,
typename RunFunctor>
boost::unique_future<typename boost::result_of<RunFunctor()>::type>
insert(priority_type priority,
const ReadyFunctor& ready_func,
const RunFunctor& run_func)
{
typedef typename boost::result_of<RunFunctor()>::type result_type;
typedef boost::unique_future<result_type> future_type;

boost::unique_lock<mutex_type> lock(mutex_);

// If max request has been reached, then return an invalid future.
if (max_request_ &&
(request_count_ == max_request_))
return future_type();

++request_count_;

// Use a packaged task to handle populating promise and future.
typedef boost::packaged_task<result_type> task_type;

// Bind does not work with rvalue, and packaged_task is only moveable,
// so allocate a shared pointer.
boost::shared_ptr<task_type> task =
boost::make_shared<task_type>(run_func);

// Create method request.
boost::shared_ptr<method_request> request =
boost::make_shared<method_request>(
ready_func,
boost::bind(&task_type::operator(), task));

// Insert into priority. Hint to inserting as close to the end as
// possible to preserve insertion order for request with same priority.
activation_list_.insert(activation_list_.end(),
pair_type(priority, request));

// There is now an outstanding request, so post to dispatch.
io_service_.post(boost::bind(&scheduler::dispatch, this));

return task->get_future();
}

/// @brief Insert a method request into the scheduler.
///
/// @param ready_func Invoked to check if method is ready to run.
/// @param run_func Invoked when ready to run.
///
/// @return future associated with the method.
template <typename ReadyFunctor,
typename RunFunctor>
boost::unique_future<typename boost::result_of<RunFunctor()>::type>
insert(const ReadyFunctor& ready_func,
const RunFunctor& run_func)
{
return insert(priority_type(), ready_func, run_func);
}

/// @brief Insert a method request into the scheduler.
///
/// @param priority Priority of job.
/// @param run_func Invoked when ready to run.
///
/// @return future associated with the method.
template <typename RunFunctor>
boost::unique_future<typename boost::result_of<RunFunctor()>::type>
insert(priority_type priority,
const RunFunctor& run_func)
{
return insert(priority, &always_ready, run_func);
}

/// @brief Insert a method request with default priority into the
/// scheduler.
///
/// @param run_func Invoked when ready to run.
///
/// @return future associated with the job.
template <typename RunFunc>
boost::unique_future<typename boost::result_of<RunFunc()>::type>
insert(const RunFunc& run_func)
{
return insert(&always_ready, run_func);
}

/// @brief Cancel all outstanding request.
void cancel()
{
boost::unique_lock<mutex_type> lock(mutex_);
activation_list_.clear();
request_count_ = 0;
}

private:

/// @brief Dispatch a request.
void dispatch()
{
// Get the current highest priority request ready to run from the queue.
boost::unique_lock<mutex_type> lock(mutex_);
if (activation_list_.empty()) return;

// Find the highest priority method ready to run.
typedef typename activation_list_type::iterator iterator;
iterator end = activation_list_.end();
iterator result = std::find_if(
activation_list_.begin(), end, &is_method_ready);

// If no methods are ready, then post into dispatch, as the
// method may have become ready.
if (end == result)
{
io_service_.post(boost::bind(&scheduler::dispatch, this));
return;
}

// Take ownership of request.
boost::shared_ptr<method_request> method = result->second;
activation_list_.erase(result);

// Run method without mutex.
lock.unlock();
method->run();
lock.lock();

// Perform bookkeeping.
--request_count_;
}

static bool always_ready() { return true; }

private:

/// @brief List of outstanding request.
typedef boost::multi_index_container<
pair_type,
boost::multi_index::indexed_by<
boost::multi_index::ordered_non_unique<
boost::multi_index::member<pair_type,
typename pair_type::first_type,
&pair_type::first>,
Compare
>
>
> activation_list_type;
activation_list_type activation_list_;

/// @brief Thread group managing threads servicing pool.
boost::thread_group threads_;

/// @brief io_service used to function as a thread pool.
boost::asio::io_service io_service_;

/// @brief Work is used to keep threads servicing io_service.
boost::asio::io_service::work work_;

/// @brief Maximum amount of request.
const std::size_t max_request_;

/// @brief Count of outstanding request.
std::size_t request_count_;

/// @brief Synchronize access to the activation list.
typedef boost::mutex mutex_type;
mutex_type mutex_;
};

typedef scheduler<unsigned int,
std::greater<unsigned int> > high_priority_scheduler;

/// @brief adder is a simple proxy that will delegate work to
/// the scheduler.
class adder
{
public:
adder(high_priority_scheduler& scheduler)
: scheduler_(scheduler)
{}

/// @brief Add a and b with a priority.
///
/// @return Return future result.
template <typename T>
boost::unique_future<T> add(
high_priority_scheduler::priority_type priority,
const T& a, const T& b)
{
// Insert method request
return scheduler_.insert(
priority,
boost::bind(&adder::do_add<T>, a, b));
}

/// @brief Add a and b.
///
/// @return Return future result.
template <typename T>
boost::unique_future<T> add(const T& a, const T& b)
{
return add(high_priority_scheduler::priority_type(), a, b);
}

private:

/// @brief Actual add a and b.
template <typename T>
static T do_add(const T& a, const T& b)
{
std::cout << "Starting addition of '" << a
<< "' and '" << b << "'" << std::endl;
// Mimic busy work.
boost::this_thread::sleep_for(boost::chrono::seconds(2));
std::cout << "Finished addition" << std::endl;
return a + b;
}

private:
high_priority_scheduler& scheduler_;
};

bool get(bool& value) { return value; }
void guarded_call()
{
std::cout << "guarded_call" << std::endl;
}

int main()
{
const unsigned int max_threads = 1;
const unsigned int max_request = 4;

// Scheduler
high_priority_scheduler scheduler(max_threads, max_request);

// Proxy
adder adder(scheduler);

// Client

// Add guarded method to scheduler.
bool ready = false;
std::cout << "Add guarded method." << std::endl;
boost::unique_future<void> future1 = scheduler.insert(
boost::bind(&get, boost::ref(ready)),
&guarded_call);

// Add 1 + 100 with default priority.
boost::unique_future<int> future2 = adder.add(1, 100);

// Force sleep to try to get scheduler to run request 2 first.
boost::this_thread::sleep_for(boost::chrono::seconds(1));

// Add:
// 2 + 200 with low priority (5)
// "test" + "this" with high priority (99)
boost::unique_future<int> future3 = adder.add(5, 2, 200);
boost::unique_future<std::string> future4 = adder.add(99,
std::string("test"), std::string("this"));

// Max request should have been reached, so add another.
boost::unique_future<int> future5 = adder.add(3, 300);

// Check if request was added.
std::cout << "future1 is valid: " << future1.valid()
<< "\nfuture2 is valid: " << future2.valid()
<< "\nfuture3 is valid: " << future3.valid()
<< "\nfuture4 is valid: " << future4.valid()
<< "\nfuture5 is valid: " << future5.valid()
<< std::endl;

// Get results for future2 and future3. Do nothing with future4's results.
std::cout << "future2 result: " << future2.get()
<< "\nfuture3 result: " << future3.get()
<< std::endl;

std::cout << "Unguarding method." << std::endl;
ready = true;
future1.wait();
}

The example runs with a thread pool of one thread and a maximum of four requests.

  • request1 is guarded until the end of the program, and should be the last to run.
  • request2 (1 + 100) is inserted with default priority, and should be the first to run.
  • request3 (2 + 200) is inserted with low priority, and should run after request4.
  • request4 ('test' + 'this') is inserted with high priority, and should run before request3.
  • request5 should fail to insert because the maximum number of requests has been reached, and its future should not be valid.

The output is as follows:

Add guarded method.
Starting addition of '1' and '100'
future1 is valid: 1
future2 is valid: 1
future3 is valid: 1
future4 is valid: 1
future5 is valid: 0
Finished addition
Starting addition of 'test' and 'this'
Finished addition
Starting addition of '2' and '200'
Finished addition
future2 result: 101
future3 result: 202
Unguarding method.
guarded_call

Boost Asio share same io_service along disposable objects

I'm developing a program that consists of a bunch of Active Objects that send messages to each other. I'm using the same io_service to initialize all of these objects, so they keep working until the end of the program's life.

Sounds like a good fit. I would recommend using Chris Kohlhoff's recipe if you need operations to be more efficient on machines with multiple processors.

However, I'm not sure about the objects with short lives. I'm using short-lived objects to open a TCP socket, send a quick message to a remote endpoint, and then dispose of the socket immediately. I'm thinking of making these asynchronous as well.

There's nothing wrong with having a few long-lived io_service objects (e.g. you could create as many io_services as there are processors on the machine) and short-lived objects that use them. I would say this is more efficient as well, since you don't have to fire up a thread to call io_service::run on each (short-lived?) io_service, and you avoid unnecessary context switching.

Making the sockets asynchronous is also needed if you want/need to avoid blocking in your thread(s), especially if there are network issues, etc.

Boost asio priority and strand

The latter has nothing to do with priority queuing and everything with operation serialization.

You can achieve this by using the Active Object pattern. An example of Active Object using Asio is in this post: boost::asio and Active Object

You can "simply" combine this pattern with the pattern for priority queuing that you saw in the example.

threading-related active object design questions (c++ boost)

Finally settled on the following:

1) After much testing use of condition variable seems fine

2) This issue hasn't cropped up (yet)

3) The templated class implementation must meet the requirements; unit tests are used to test for correctness

4) Improvements

  • Added join with lock
  • Catching exceptions in the spawned thread and rethrowing them in the main thread to avoid crashes and to not lose exception info
  • Using boost::system::error_code to communicate error codes back to caller
  • implementation object is set-able

Code:

template <typename T>
class IService : private boost::noncopyable
{
typedef boost::shared_ptr<boost::thread> thread_ptr;
typedef T ServiceImpl;
public:
typedef boost::shared_ptr<IService<T> > ptr;

IService()
:m_pServiceObject(&m_serviceObject)
{
}

~IService()
{
/// try stop the service in case it's running
if (m_pServiceThread && m_pServiceThread->joinable())
{
stop();
}
}

static ptr create()
{
return boost::make_shared<IService<T> >();
}

/// Accessor to service implementation. The handle can be used to configure the implementation object
ServiceImpl& get() { return m_serviceObject; }
/// Mutator to service implementation. The handle can be used to configure the implementation object
void set(ServiceImpl rServiceImpl)
{
// the implementation object cannot be modified once the thread has been created
assert(m_pServiceThread == 0);
m_serviceObject = rServiceImpl;
m_pServiceObject = &m_serviceObject;
}

void set(ServiceImpl* pServiceImpl)
{
// the implementation object cannot be modified once the thread has been created
assert(m_pServiceThread == 0);

// make sure service object is valid
if (pServiceImpl)
m_pServiceObject = pServiceImpl;
}

/// if the service implementation reports an error from the start or stop method call, it can be accessed via this method
/// NB: only the last error can be accessed
boost::system::error_code getServiceErrorCode() const { return m_ecService; }

/// The join method allows the caller to block until thread completion
void join()
{
// protect this method from being called twice (e.g. by user and by stop)
boost::mutex::scoped_lock lock(m_joinMutex);
if (m_pServiceThread && m_pServiceThread->joinable())
{
m_pServiceThread->join();
m_pServiceThread.reset();
}
}

/// This method launches the non-blocking service
boost::system::error_code start()
{
boost::mutex::scoped_lock lock(m_threadMutex);

if (m_pServiceThread && m_pServiceThread->joinable())
{
// already running
return boost::system::error_code(SHARED_INVALID_STATE, shared_category);
}

m_pServiceThread = thread_ptr(new boost::thread(boost::bind(&IService::main, this)));
// Wait for condition to be signaled
m_startCondition.wait(m_threadMutex);

// notify main to continue: it's blocked on the same condition var
m_startCondition.notify_one();
// No error
return boost::system::error_code();
}

/// This method stops the non-blocking service
boost::system::error_code stop()
{
// trigger the stopping of the event loop
//boost::system::error_code ec = m_serviceObject.stop();
assert(m_pServiceObject);
boost::system::error_code ec = m_pServiceObject->stop();
if (ec)
{
m_ecService = ec;
return ec;
}

// The service implementation can return an error code here for more information
// However it is the responsibility of the implementation to stop the service event loop (if running)
// Failure to do so, will result in a block
// If this occurs in practice, we may consider a timed join?
join();

// If exception was thrown in new thread, rethrow it.
// Should the template implementation class want to avoid this, it should catch the exception
// in its start method and then return an error code instead
if( m_exception )
boost::rethrow_exception(m_exception);

return ec;
}

private:
/// runs in its own thread
void main()
{
try
{
boost::mutex::scoped_lock lock(m_threadMutex);
// notify main thread that it can continue
m_startCondition.notify_one();
// Try Dummy wait to allow 1st thread to resume
m_startCondition.wait(m_threadMutex);

// call implementation of event loop
// This will block
// In scenarios where the service fails to start, the implementation can return an error code
m_ecService = m_pServiceObject->start();

m_exception = boost::exception_ptr();
}
catch (...)
{
m_exception = boost::current_exception();
}
}

/// Service thread
thread_ptr m_pServiceThread;
/// Thread mutex
mutable boost::mutex m_threadMutex;
/// Join mutex
mutable boost::mutex m_joinMutex;
/// Condition for signaling start of thread
boost::condition m_startCondition;

/// T must satisfy the implicit service interface and provide a start and a stop method
T m_serviceObject;
T* m_pServiceObject;
// Error code for service implementation errors
boost::system::error_code m_ecService;

// Exception ptr to transport exception across different threads
boost::exception_ptr m_exception;
};
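For comparison, here is a minimal sketch of the same two ideas (start handshake and exception transport) using standard C++11 primitives instead of Boost. It is not the code above: `active_service`, `waiting_impl` and `throwing_impl` are hypothetical names, the implementation type is assumed to offer a blocking start() and a stop(), and start()/stop() are assumed to be called from one controlling thread. A boolean predicate replaces the second "dummy" wait, so spurious wakeups cannot confuse the handshake.

```cpp
#include <cassert>
#include <condition_variable>
#include <exception>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>

template <typename T>
class active_service
{
public:
  active_service() : started_(false) {}
  ~active_service()
  {
    if (thread_.joinable()) { impl_.stop(); thread_.join(); }
  }

  // Launch the service thread; returns false if it is already running.
  bool start()
  {
    std::unique_lock<std::mutex> lock(mutex_);
    if (thread_.joinable()) return false;
    started_ = false;
    thread_ = std::thread(&active_service::run, this);
    // Block until run() has set the flag; the predicate guards the wait.
    started_cv_.wait(lock, [this] { return started_; });
    return true;
  }

  // Stop the implementation, join, then rethrow what the thread caught.
  void stop()
  {
    impl_.stop();
    if (thread_.joinable()) thread_.join();
    if (error_)
    {
      std::exception_ptr error = error_;
      error_ = nullptr;
      std::rethrow_exception(error);
    }
  }

private:
  void run()
  {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      started_ = true;
    }
    started_cv_.notify_one();
    try { impl_.start(); }                       // blocks until stopped
    catch (...) { error_ = std::current_exception(); }
  }

  T impl_;
  std::mutex mutex_;
  std::condition_variable started_cv_;
  bool started_;
  std::exception_ptr error_;  // read by stop() only after join()
  std::thread thread_;
};

// Hypothetical implementation whose start() blocks until stop() is called.
struct waiting_impl
{
  std::mutex m;
  std::condition_variable cv;
  bool stop_requested = false;
  void start()
  {
    std::unique_lock<std::mutex> lock(m);
    cv.wait(lock, [this] { return stop_requested; });
  }
  void stop()
  {
    { std::lock_guard<std::mutex> lock(m); stop_requested = true; }
    cv.notify_all();
  }
};

// Hypothetical implementation whose event loop throws.
struct throwing_impl
{
  void start() { throw std::runtime_error("service failed"); }
  void stop() {}
};
```

With this shape, stop() on an active_service<throwing_impl> rethrows "service failed" in the calling thread, which mirrors the rethrow in the stop() method above.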

Further feedback/criticism would of course be welcome.

Synchronized and concurrent data structure patterns with Boost.Asio

I am looking for a systematic way to apply strand synchronization to:

  • STL (or STL-like) containers (e.g., std::deque, std::unordered_map); and

You're looking for something that doesn't exist. The closest thing is Active Objects, and you hardly need strands for that unless you have asynchronous operations on them. That makes almost zero sense, because no operation on an STL container should take long enough to warrant asynchrony. The operations, on the other hand, are so cheap that adding any kind of synchronization to each of them would be very suboptimal:

Instead of fine-grained locking [which you automatically opt for when treating STL data structures as Active Objects] you will always find better performance with coarse-grained locking.

Even sooner in the design, you will always have more performance by reducing sharing than by "optimizing synchronization" (those are a contradiction).

  • wait-free containers such as boost::lockfree::spsc_queue or folly::ProducerConsumerQueue.

Why would you even synchronize access to wait-free containers? Wait-free implies no synchronization.
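To make that concrete, here is a minimal sketch of a single-producer/single-consumer ring buffer in standard C++11. It is not boost::lockfree::spsc_queue or folly::ProducerConsumerQueue; `spsc_ring` and `transfer_sum` are hypothetical names. The only coordination is two atomic indices, with no mutex and no strand, and it is only valid with exactly one producer thread and one consumer thread.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>

// Ring buffer holding up to N - 1 elements.
template <typename T, std::size_t N>
class spsc_ring
{
public:
  spsc_ring() : head_(0), tail_(0) {}

  // Producer side only. Returns false when the ring is full.
  bool push(const T& value)
  {
    const std::size_t tail = tail_.load(std::memory_order_relaxed);
    const std::size_t next = (tail + 1) % N;
    if (next == head_.load(std::memory_order_acquire)) return false;
    buffer_[tail] = value;
    tail_.store(next, std::memory_order_release);  // publish the element
    return true;
  }

  // Consumer side only. Returns false when the ring is empty.
  bool pop(T& out)
  {
    const std::size_t head = head_.load(std::memory_order_relaxed);
    if (head == tail_.load(std::memory_order_acquire)) return false;
    out = buffer_[head];
    head_.store((head + 1) % N, std::memory_order_release);  // free the slot
    return true;
  }

private:
  std::array<T, N> buffer_;
  std::atomic<std::size_t> head_;  // next slot to read
  std::atomic<std::size_t> tail_;  // next slot to write
};

// One producer thread pushes 1..count; the caller consumes and sums.
long transfer_sum(int count)
{
  spsc_ring<int, 64> ring;
  std::thread producer([&ring, count] {
    for (int i = 1; i <= count; ++i)
      while (!ring.push(i)) std::this_thread::yield();
  });

  long sum = 0;
  int value = 0;
  for (int received = 0; received < count; )
  {
    if (ring.pop(value)) { sum += value; ++received; }
    else std::this_thread::yield();
  }
  producer.join();
  return sum;
}
```

Wrapping push() and pop() in a strand would add queueing cost without making this any safer; the constraint that matters is which thread calls which operation.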

The Bullets

  1. To adapt an arbitrary STL container for safe synchronized use, is it sufficient to perform all its operations through a strand instance?

    Yes. Only that's not something that exists. Strands wrap async tasks (and their completion handlers, which are just tasks from the POV of the executor).

    See the rant above.

  2. To adapt a wait-free read-write container for synchronized, concurrent use, is it sufficient to wrap its operations through two distinct strands, one for read operations and one for write operations?

    As mentioned, it's silly to synchronize access to lock-free constructs.

    This question hints at a "yes", although in that use case the author describes using a strand to coordinate producers from several threads, while presumably only reading from one thread.

    That's specifically related to the SPSC queue, i.e. where additional constraints are placed on threads performing read/write operations.

    While the solution here is indeed to create logical threads of execution with exclusive access to either set of operations, notice that you are constraining the tasks, which is a fundamentally different angle from constraining the data.

  3. If the answer to 1-2 above is yes, should the strand just manage operations on the data structure through calls to boost::asio::post?

    So, the answer wasn't "yes". The idea of posting all operations through post would come down to implementing the Active Object pattern, as mentioned in my introduction. Yes, you can do that, and no, that's not gonna be smart. (I'm pretty sure that if you do, by definition you can forget about using lock-free containers)

    [....]

    Then should MyDeque::push_back(const T& t) just call

    boost::asio::post(_strand, [&_deque]{ _deque.push_back(t); })

    Yes, that's the ActiveObject pattern. However, consider how you would implement top(). Consider what you'd do if you had two MyDeque instances (a and b) and wanted to move items from one to another:

    if (!a.empty()) {
    auto value = a.top(); // synchronizes on the strand to have the return value
    a.pop(); // operation on the strand of a
    b.push(std::move(value)); // operation on the strand of b
    }

    Since queue b is not on the strand of a, b.push() could actually commit before a.pop(), which may not be what you expect. Also, it is blindingly obvious that all the fine-grained synchronization steps are going to be far less efficient than having a strand for all operations that work on a set of data structures.

  4. [...] But it seems that [...] that the force of a fully concurrent vector or hash map may be a bit overkill

    There's no "force" in fully concurrent vectors or hash maps. There's a cost to them (in terms of processor load) and a gain (in terms of lower latency). In the cases you mention, latency is rarely the issue (registering a session is an infrequent event, and dominated by actual IO speeds), so you'd be best off using the simplest thing (IMO that would be a single-threaded server for those data structures). Have workers for any non-trivial operations; they could run on a pool of threads (e.g. if you decide to implement a chess-playing chat-bot).


You want strands to form logical threads of execution. You want to synchronize access to your data structures, not the data structures per se. Sometimes lock-free data structures are a simple choice to avoid having to design things well, but don't expect them to magically perform well.
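A rough sketch of "constrain the tasks, not the data" in standard C++ (no Asio): a hypothetical `serial_executor` stands in for a strand, and two plain std::deque objects are only ever touched by tasks posted to it. The move from a to b is a single task, so no other task can observe the intermediate state. All names here are assumptions made for the illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

// Stand-in for a strand: runs posted tasks one at a time, in post order.
class serial_executor
{
public:
  serial_executor() : done_(false), worker_([this] { loop(); }) {}
  ~serial_executor()
  {
    { std::lock_guard<std::mutex> lock(mutex_); done_ = true; }
    cv_.notify_one();
    worker_.join();  // remaining tasks are drained before the thread exits
  }

  void post(std::function<void()> task)
  {
    { std::lock_guard<std::mutex> lock(mutex_); tasks_.push_back(std::move(task)); }
    cv_.notify_one();
  }

private:
  void loop()
  {
    for (;;)
    {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return done_ || !tasks_.empty(); });
        if (tasks_.empty()) return;  // shutting down and nothing left
        task = std::move(tasks_.front());
        tasks_.pop_front();
      }
      task();  // run outside the lock
    }
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<std::function<void()> > tasks_;
  bool done_;
  std::thread worker_;  // declared last so it starts after the other members
};

// Plain, unsynchronized containers owned by the logical thread of execution.
struct shared_state
{
  std::deque<int> a;
  std::deque<int> b;
};

// Move the front of a to b as ONE task, then report b's size via a future.
std::size_t move_front_and_count(serial_executor& executor, shared_state& state)
{
  executor.post([&state] {
    if (!state.a.empty())
    {
      state.b.push_back(state.a.front());
      state.a.pop_front();
    }
  });

  std::shared_ptr<std::promise<std::size_t> > result =
      std::make_shared<std::promise<std::size_t> >();
  std::future<std::size_t> size = result->get_future();
  executor.post([&state, result] { result->set_value(state.b.size()); });
  return size.get();
}
```

Compared with one strand per container, there is exactly one hop onto the serialized context per logical operation, and the ordering problem between a.pop() and b.push() cannot arise.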

Some links:

  • boost::asio and Active Object (Tanner Sansbury on ActiveObject with Asio) has a lot of thoughts that overlap with your questions
  • How to design proper release of a boost::asio socket or wrapper thereof is my example of maintaining a list of connections

Boost asio async_resolve object lifetime

Just running the service (io_service::run()) already ensures that all asynchronous operations have completed (see the documentation).

You already do this on the worker thread, and you join that thread, so you should be fine!

The only exception would be if a handler throws, so to be extra precise you should handle exceptions from run() (see: Should the exception thrown by boost::asio::io_service::run() be caught?):

void io() { 
// http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
for (;;) {
try {
io_service.run();
break; // exited normally
} catch (std::exception const &e) {
std::cerr << "[Resolver] An unexpected error occurred: " << e.what();
} catch (...) {
std::cerr << "[Resolver] An unexpected error occurred";
}
}
}

So... Where's The Problem?

The problem is quite finicky and hides between threads and shared_ptr.

The shared pointer causes ~Resolver to run on the worker thread. This means that you cannot join() the worker thread (since a thread can never join itself). A good implementation will throw an exception, which causes the process to terminate.

And there's more: if you just exit main() while the worker thread is processing the asynchronous tasks, the completion handlers may run after globals like std::cout have been torn down. So to actually see that Resolver completes the work and destructs, you need to make sure that main doesn't exit too quickly.
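One common way to sidestep the self-join problem described above is to compare thread ids before joining. The sketch below uses std::thread only; `join_or_detach`, `joined_from_main` and `joined_from_worker` are hypothetical helpers, not part of Resolver. Detaching merely avoids the termination: after a detach the thread must not touch the object that is being destroyed.

```cpp
#include <cassert>
#include <future>
#include <thread>
#include <utility>

// Join unless called from the thread itself, in which case detach instead.
// Returns true only when a join actually happened.
bool join_or_detach(std::thread& t)
{
  if (!t.joinable()) return false;
  if (t.get_id() == std::this_thread::get_id())
  {
    t.detach();  // a thread cannot join itself
    return false;
  }
  t.join();
  return true;
}

// The usual case: another thread (here the caller) does the join.
bool joined_from_main()
{
  std::thread worker([] {});
  return join_or_detach(worker);
}

// The problematic case: the helper runs on the very thread it refers to.
bool joined_from_worker()
{
  std::thread worker;
  std::promise<void> go;
  std::promise<bool> result;
  std::future<bool> outcome = result.get_future();

  worker = std::thread(
      [&worker](std::shared_future<void> ready, std::promise<bool> reply) {
        ready.wait();  // wait until `worker` has been assigned
        reply.set_value(join_or_detach(worker));
      },
      go.get_future().share(), std::move(result));

  go.set_value();
  // The worker has detached itself by the time the result is available,
  // so destroying `worker` on return is safe.
  return outcome.get();
}
```

In a destructor, the same check would decide between join() and detach(), at the price of having to guarantee that the detached thread no longer uses any members.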

Simplifying:

The following simplified example shows that the asynchronous operations do complete (there are still issues):

#define BOOST_ASIO_ENABLE_HANDLER_TRACKING 1
#include <boost/asio.hpp>
#include <boost/optional.hpp>
#include <thread>
#include <iostream>

class Resolver : public std::enable_shared_from_this<Resolver> {
using tcp = boost::asio::ip::tcp;
using io_service = boost::asio::io_service;

io_service _svc;
tcp::resolver resolver { _svc };

boost::optional<io_service::work> work { _svc };
std::thread _worker { [this] { event_loop(); } };

void event_loop() {
// http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
for (;;) {
std::cout << __PRETTY_FUNCTION__ << "\n";
try {
_svc.run();
break; // exited normally
} catch (std::exception const &e) {
std::cerr << "[Resolver] An unexpected error occurred: " << e.what() << "\n";
} catch (...) {
std::cerr << "[Resolver] An unexpected error occurred\n";
}
}
std::cout << "EXIT " << __PRETTY_FUNCTION__ << "\n";
}

public:
~Resolver() {
std::cout << __PRETTY_FUNCTION__ << "\n";
work.reset();
}

