When to Use `asio_handler_invoke`

When to use `asio_handler_invoke`?

In short, wrapping a handler and defining an `asio_handler_invoke` hook accomplish two different tasks:

  • Wrap a handler to customize the invocation of the handler itself.
  • Define an asio_handler_invoke hook to customize the invocation of other handlers within the context of a handler.

template <typename Handler>
struct custom_handler
{
  void operator()(...); // Customize invocation of handler_.
  Handler handler_;
};

// Customize invocation of Function within the context of custom_handler.
template <typename Function>
void asio_handler_invoke(Function function, custom_handler* context);

// Invoke 'perform' with the custom invocation strategy within the context
// of custom_handler.
void perform() {...}
custom_handler handler;
using boost::asio::asio_handler_invoke;
asio_handler_invoke(std::bind(&perform), &handler);

The primary reason for the asio_handler_invoke hook is to allow one to customize the invocation strategy of handlers to which the application may not have direct access. For instance, consider composed operations, which consist of zero or more intermediate operations. For each intermediate operation, an intermediate handler is created on behalf of the application, but the application does not have direct access to these handlers. When using custom handlers, the asio_handler_invoke hook provides a way to customize the invocation strategy of these intermediate handlers within a given context. The documentation states:

When asynchronous operations are composed from other asynchronous operations, all intermediate handlers should be invoked using the same method as the final handler. This is required to ensure that user-defined objects are not accessed in a way that may violate the guarantees. This [asio_handler_invoke] hooking function ensures that the invoked method used for the final handler is accessible at each intermediate step.


asio_handler_invoke

Consider a case where we wish to count the number of asynchronous operations executed, including each intermediate operation in composed operations. To do this, we need to create a custom handler type, counting_handler, and count the number of times functions are invoked within its context:

template <typename Handler>
class counting_handler
{
  void operator()(...)
  {
    // invoke handler
  }
  Handler handler_;
};

template <typename Function>
void asio_handler_invoke(Function function, counting_handler* context)
{
  // increment counter
  // invoke function
}

counting_handler handler(&handle_read);
boost::asio::async_read(socket, buffer, handler);

In the above snippet, the function handle_read is wrapped by counting_handler. As counting_handler is not interested in counting the number of times the wrapped handler itself is invoked, its operator() does not increment the count and simply invokes handle_read. However, counting_handler is interested in the number of handlers invoked within its context during the async_read operation, so the custom invocation strategy in asio_handler_invoke increments the count.


Example

Here is a concrete example based on the above counting_handler type. The operation_counter class provides a way to easily wrap application handlers with a counting_handler:

namespace detail {

/// @brief counting_handler is a handler that counts the number of
/// times a handler is invoked within its context.
template <class Handler>
class counting_handler
{
public:
  counting_handler(Handler handler, std::size_t& count)
    : handler_(handler),
      count_(count)
  {}

  template <class... Args>
  void operator()(Args&&... args)
  {
    handler_(std::forward<Args>(args)...);
  }

  template <typename Function>
  friend void asio_handler_invoke(
    Function intermediate_handler,
    counting_handler* my_handler)
  {
    ++my_handler->count_;
    // Support chaining custom strategies in case the wrapped handler
    // has a custom strategy of its own.
    using boost::asio::asio_handler_invoke;
    asio_handler_invoke(intermediate_handler, &my_handler->handler_);
  }

private:
  Handler handler_;
  std::size_t& count_;
};

} // namespace detail

/// @brief Auxiliary class used to wrap handlers that will count
/// the number of functions invoked in their context.
class operation_counter
{
public:
  template <class Handler>
  detail::counting_handler<Handler> wrap(Handler handler)
  {
    return detail::counting_handler<Handler>(handler, count_);
  }

  std::size_t count() { return count_; }

private:
  std::size_t count_ = 0;
};

...

operation_counter counter;
boost::asio::async_read(socket, buffer, counter.wrap(&handle_read));
io_service.run();
std::cout << "Count of async_read_some operations: " <<
counter.count() << std::endl;

The async_read() composed operation is implemented in terms of zero or more intermediate stream.async_read_some() operations. For each of these intermediate operations, a handler with an unspecified type is created and invoked. If the above async_read() operation is implemented in terms of 2 intermediate async_read_some() operations, then counter.count() will be 2, and the handler returned from counter.wrap() will have been invoked once.

On the other hand, if one did not provide an asio_handler_invoke hook and instead only incremented the count within the wrapped handler's invocation, then the count would be 1, reflecting only the number of times the wrapped handler itself was invoked:

template <class Handler>
class counting_handler
{
public:
  ...

  template <class... Args>
  void operator()(Args&&... args)
  {
    ++count_;
    handler_(std::forward<Args>(args)...);
  }

  // No asio_handler_invoke implemented.
};

Here is a complete example demonstrating counting the number of asynchronous operations that get executed, including intermediate operations from a composed operation. The example only initiates three async operations (async_accept, async_connect, and async_read), but the async_read operation will be composed of 2 intermediate async_read_some operations:

#include <cassert>    // assert
#include <functional> // std::bind
#include <iostream>   // std::cout, std::endl
#include <string>     // std::string
#include <utility>    // std::forward
#include <vector>     // std::vector
#include <boost/asio.hpp>

// This example is not interested in the handlers, so provide a noop function
// that will be passed to bind to meet the handler concept requirements.
void noop() {}

namespace detail {

/// @brief counting_handler is a handler that counts the number of
/// times a handler is invoked within its context.
template <class Handler>
class counting_handler
{
public:
  counting_handler(Handler handler, std::size_t& count)
    : handler_(handler),
      count_(count)
  {}

  template <class... Args>
  void operator()(Args&&... args)
  {
    handler_(std::forward<Args>(args)...);
  }

  template <typename Function>
  friend void asio_handler_invoke(
    Function function,
    counting_handler* context)
  {
    ++context->count_;
    // Support chaining custom strategies in case the wrapped handler
    // has a custom strategy of its own.
    using boost::asio::asio_handler_invoke;
    asio_handler_invoke(function, &context->handler_);
  }

private:
  Handler handler_;
  std::size_t& count_;
};

} // namespace detail

/// @brief Auxiliary class used to wrap handlers that will count
/// the number of functions invoked in their context.
class operation_counter
{
public:
  template <class Handler>
  detail::counting_handler<Handler> wrap(Handler handler)
  {
    return detail::counting_handler<Handler>(handler, count_);
  }

  std::size_t count() { return count_; }

private:
  std::size_t count_ = 0;
};

int main()
{
  using boost::asio::ip::tcp;
  operation_counter all_operations;

  // Create all I/O objects.
  boost::asio::io_service io_service;
  tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 0));
  tcp::socket socket1(io_service);
  tcp::socket socket2(io_service);

  // Connect the sockets.
  // operation 1: acceptor.async_accept
  acceptor.async_accept(socket1, all_operations.wrap(std::bind(&noop)));
  // operation 2: socket2.async_connect
  socket2.async_connect(acceptor.local_endpoint(),
      all_operations.wrap(std::bind(&noop)));
  io_service.run();
  io_service.reset();

  // socket1 and socket2 are connected. The scenario below will:
  // - write bytes to socket1.
  // - initiate a composed async_read operation to read more bytes
  //   than are currently available on socket2. This will cause
  //   the operation to complete with multiple async_read_some
  //   operations on socket2.
  // - write more bytes to socket1.

  // Write to socket1.
  std::string write_buffer = "demo";
  boost::asio::write(socket1, boost::asio::buffer(write_buffer));

  // Guarantee socket2 has received the data.
  assert(socket2.available() == write_buffer.size());

  // Initiate a composed operation to read more data than is immediately
  // available. As some data is available, an intermediate async_read_some
  // operation (operation 3) will be executed, and another async_read_some
  // operation (operation 4) will eventually be initiated.
  std::vector<char> read_buffer(socket2.available() + 1);
  operation_counter read_only;
  boost::asio::async_read(socket2, boost::asio::buffer(read_buffer),
      all_operations.wrap(read_only.wrap(std::bind(&noop))));

  // Write more to socket1. This will cause the async_read operation
  // to complete.
  boost::asio::write(socket1, boost::asio::buffer(write_buffer));

  io_service.run();
  std::cout << "total operations: " << all_operations.count() << "\n"
               "read operations: " << read_only.count() << std::endl;
}

Output:

total operations: 4
read operations: 2

Composed Handlers

In the above example, the async_read() handler was composed of a handler wrapped twice: first by the operation_counter counting only read operations, and then the resulting functor was wrapped by the operation_counter counting all operations:

boost::asio::async_read(..., all_operations.wrap(read_only.wrap(...)));

The counting_handler's asio_handler_invoke implementation is written to support composition by invoking the Function within the wrapped handler's context. This results in the appropriate count being incremented for each operation_counter:

template <typename Function>
void asio_handler_invoke(
  Function function,
  counting_handler* context)
{
  ++context->count_;
  // Support chaining custom strategies in case the wrapped handler
  // has a custom strategy of its own.
  using boost::asio::asio_handler_invoke;
  asio_handler_invoke(function, &context->handler_);
}

On the other hand, if asio_handler_invoke explicitly called function(), then only the outermost wrapper's invocation strategy would be applied. In this case, all_operations.count() would be 4 and read_only.count() would be 0:

template <typename Function>
void asio_handler_invoke(
  Function function,
  counting_handler* context)
{
  ++context->count_;
  function(); // No chaining.
}

When composing handlers, be aware that the asio_handler_invoke hook that gets invoked is located through argument-dependent lookup, so it is based on the exact handler type. Composing handlers with types that are not asio_handler_invoke aware will prevent the chaining of invocation strategies. For instance, using std::bind() or std::function will result in the default asio_handler_invoke being called, preventing custom invocation strategies from being invoked:

// Operations will not be counted.
boost::asio::async_read(..., std::bind(all_operations.wrap(...)));

Properly chaining invocation strategies for composed handlers can be very important. For example, the unspecified handler type returned from strand.wrap() provides the guarantee that the initial handler wrapped by the strand, as well as functions invoked within the context of the returned handler, will not run concurrently. This allows one to meet the thread-safety requirements of many of the I/O objects when using composed operations, as the strand can be used to synchronize with the intermediate operations to which the application does not have access.

When the io_service is run by multiple threads, the snippet below may invoke undefined behavior, as the intermediate operations for both composed operations may run concurrently; std::bind() will not invoke the appropriate asio_handler_invoke hook:

boost::asio::async_read(socket, ..., std::bind(strand.wrap(&handle_read)));
boost::asio::async_write(socket, ..., std::bind(strand.wrap(&handle_write)));
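A sketch of the corrected form: keep the handler returned from strand.wrap() as the outermost layer (do not wrap it in std::bind or std::function afterwards), so that its asio_handler_invoke hook remains visible through argument-dependent lookup:

// The strand's handler type stays intact, so its asio_handler_invoke hook
// is found via ADL for every intermediate handler of the composed
// operations. If binding is needed, bind first and wrap with the strand
// last: strand.wrap(std::bind(...)).
boost::asio::async_read(socket, ..., strand.wrap(&handle_read));
boost::asio::async_write(socket, ..., strand.wrap(&handle_write));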

boost.asio composed operation run in strand

In short, asio_handler_invoke enables one to customize the invocation of handlers in the context of a different handler. In this case, the object returned from strand.wrap() has a custom asio_handler_invoke strategy associated with it that will dispatch handlers into the strand that wrapped the initial handler. Conceptually, it is as follows:

template <typename Handler>
struct strand_handler
{
  void operator()();
  Handler handler_;
  boost::asio::strand dispatcher_;
};

// Customize invocation of Function within the context of strand_handler.
template <typename Function>
void asio_handler_invoke(Function function, strand_handler* context)
{
  context->dispatcher_.dispatch(function);
}

strand_handler wrapped_completion_handler = strand.wrap(completion_handler);
using boost::asio::asio_handler_invoke;
asio_handler_invoke(intermediate_handler, &wrapped_completion_handler);

The custom asio_handler_invoke hook is located via argument-dependent lookup. This detail is documented in the Handler requirement:

Causes the function object f to be executed as if by calling f().

The asio_handler_invoke() function is located using argument-dependent lookup. The function boost::asio::asio_handler_invoke() serves as a default if no user-supplied function is available.

For more details on asio_handler_invoke, consider reading this answer.


Be aware that an operation may be attempted within the initiating function. The documentation is specific that intermediate handlers will be invoked within the same context as the final completion handler. Therefore, given:

assert(strand.running_in_this_thread());
boost::asio::async_read(socket, buffer, strand.wrap(read_handler));

the boost::asio::async_read call itself must be invoked within the context of strand to be thread-safe. See this answer for more details on thread-safety and strands.
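One way to meet that requirement, sketched below, is to dispatch the initiating call itself through the strand; socket, buffer, and read_handler mirror the snippet above:

// Dispatch the initiation through the strand so that any read the
// initiating function attempts immediately also runs under the strand.
strand.dispatch([&]
  {
    boost::asio::async_read(socket, buffer, strand.wrap(read_handler));
  });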

Why do I need strand per connection when using boost::asio?

The documentation is correct. With a half-duplex protocol implementation, such as HTTP Server 3, the strand is not necessary. The call chains can be illustrated as follows:

void connection::start()
{
  socket.async_receive_from(..., &handle_read); ---.
}                                                  |
    .----------------------------------------------'
    |    .---------------------------------------------.
    V    V                                             |
void connection::handle_read(...)                     |
{                                                      |
  if (result)                                          |
    boost::asio::async_write(..., &handle_write); -----|---.
  else if (!result)                                    |   |
    boost::asio::async_write(..., &handle_write); -----|---|
  else                                                 |   |
    socket_.async_read_some(..., &handle_read); -------'   |
}                                                          |
    .------------------------------------------------------'
    |
    V
void handle_write(...)

As shown in the illustration, only a single asynchronous operation is outstanding at any point along each path. With no possibility of concurrent execution of the handlers or of operations on socket_, the connection is said to be running in an implicit strand.


Thread Safety

While it does not present itself as an issue in the example, I would like to highlight one important detail of strands and composed operations, such as boost::asio::async_write. Before explaining the details, let's first cover the thread-safety model of Boost.Asio. For most Boost.Asio objects, it is safe to have multiple asynchronous operations pending on an object; it is only specified that concurrent calls on the object are unsafe. In the diagrams below, each column represents a thread and each row represents what a thread is doing at a moment in time.

It is safe for a single thread to make sequential calls while other threads make none:

 thread_1                             | thread_2
--------------------------------------+---------------------------------------
 socket.async_receive(...);           | ...
 socket.async_write_some(...);        | ...

It is safe for multiple threads to make calls, but not concurrently:

 thread_1                             | thread_2
--------------------------------------+---------------------------------------
 socket.async_receive(...);           | ...
 ...                                  | socket.async_write_some(...);

However, it is not safe for multiple threads to make calls concurrently¹:

 thread_1                             | thread_2
--------------------------------------+---------------------------------------
 socket.async_receive(...);           | socket.async_write_some(...);
 ...                                  | ...

Strands

To prevent concurrent invocations, handlers are often invoked from within strands. This is done by either:

  • Wrapping the handler with strand.wrap. This returns a new handler that will dispatch through the strand.
  • Posting or dispatching directly through the strand.

Composed operations are unique in that intermediate calls to the stream are invoked within the handler's strand, if one is present, instead of the strand in which the composed operation is initiated. When compared to other operations, this presents an inversion of where the strand is specified. Here is some example code focusing on strand usage that demonstrates a socket being read from via a non-composed operation and concurrently written to with a composed operation.

void start()
{
  // Start read and write chains. If multiple threads have called run on
  // the service, then they may be running concurrently. To protect the
  // socket, use the strand.
  strand_.post(&read);
  strand_.post(&write);
}

// read always needs to be posted through the strand because it invokes a
// non-composed operation on the socket.
void read()
{
  // async_receive is initiated from within the strand. The handler does
  // not affect the strand in which async_receive is executed.
  socket_.async_receive(read_buffer_, &handle_read);
}

// This is not running within a strand, as read did not wrap it.
void handle_read()
{
  // Need to post read into the strand, otherwise the async_receive would
  // not be safe.
  strand_.post(&read);
}

// The entry into the write loop needs to be posted through a strand.
// All intermediate handlers and the next iteration of the asynchronous write
// loop will be running in a strand due to the handler being wrapped.
void write()
{
  // async_write will make one or more calls to socket_.async_write_some.
  // All intermediate handlers (calls after the first) are executed
  // within the handler's context (strand_).
  boost::asio::async_write(socket_, write_buffer_,
      strand_.wrap(&handle_write));
}

// This will be invoked from within the strand, as it was a wrapped
// handler in write().
void handle_write()
{
  // handle_write() is invoked within a strand, so write() does not
  // have to be dispatched through the strand.
  write();
}

Importance of Handler Types

Also, within composed operations, Boost.Asio uses argument dependent lookup (ADL) to invoke intermediate handlers through the completion handler's strand. As such, it is important that the completion handler's type has the appropriate asio_handler_invoke() hooks. If type erasure occurs to a type that does not have the appropriate asio_handler_invoke() hooks, such as a case where a boost::function is constructed from the return type of strand.wrap, then intermediate handlers will execute outside of the strand, and only the completion handler will execute within the strand. See this answer for more details.

In the following code, all intermediate handlers and the completion handler will execute within the strand:

boost::asio::async_write(stream, buffer, strand.wrap(&handle_write));

In the following code, only the completion handler will execute within the strand. None of the intermediate handlers will execute within the strand:

boost::function<void()> handler(strand.wrap(&handle_write));
boost::asio::async_write(stream, buffer, handler);

1. The revision history documents an exception to this rule: if supported by the OS, synchronous read, write, accept, and connect operations are thread safe. I am including it here for completeness, but suggest using it with caution.

asio::async_write and strand

With the following code:

asio::async_write(stream, ..., custom_alloc(m_strand.wrap(...)));

For this composed operation, all calls to stream.async_write_some() will be invoked within m_strand if all of the following conditions are true:

  • The initiating async_write(...) call is running within m_strand:

    assert(m_strand.running_in_this_thread());
    asio::async_write(stream, ..., custom_alloc(m_strand.wrap(...)));
  • The return type from custom_alloc is either:

    • the exact type returned from strand::wrap()

      template <typename Handler> 
      Handler custom_alloc(Handler) { ... }
    • a custom handler that appropriately chains invocations of asio_handler_invoke():

      template <class Handler>
      class custom_handler
      {
      public:
        custom_handler(Handler handler)
          : handler_(handler)
        {}

        template <class... Args>
        void operator()(Args&&... args)
        {
          handler_(std::forward<Args>(args)...);
        }

        template <typename Function>
        friend void asio_handler_invoke(
          Function intermediate_handler,
          custom_handler* my_handler)
        {
          // Support chaining custom strategies in case the wrapped handler
          // has a custom strategy of its own.
          using boost::asio::asio_handler_invoke;
          asio_handler_invoke(intermediate_handler, &my_handler->handler_);
        }

      private:
        Handler handler_;
      };

      template <typename Handler>
      custom_handler<Handler> custom_alloc(Handler handler)
      {
        return {handler};
      }

See this answer for more details on strands, and this answer for details on asio_handler_invoke.

How strands guarantee correct execution of pending events in boost.asio

strand provides a guarantee of non-concurrency and of the invocation order of handlers; a strand does not control the order in which operations are executed or demultiplexed. Use a strand if you have either:

  • multiple threads accessing a shared object that is not thread safe
  • a need for a guaranteed sequential ordering of handlers
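For the first case, here is a minimal sketch (the shared counter and the two receive operations are hypothetical): every handler that touches the shared state is wrapped by the same strand, so the handlers never run concurrently even when several threads call io_service.run().

std::size_t shared_counter = 0; // not thread safe on its own
boost::asio::io_service::strand strand(io_service);

socket1.async_receive(boost::asio::buffer(buffer1),
    strand.wrap([&](const boost::system::error_code&, std::size_t)
    {
      ++shared_counter; // serialized by the strand
    }));
socket2.async_receive(boost::asio::buffer(buffer2),
    strand.wrap([&](const boost::system::error_code&, std::size_t)
    {
      ++shared_counter; // serialized by the strand
    }));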

The io_service will provide the desired and expected ordering: buffers are filled or used in the order in which the operations were initiated. For instance, if the socket has "Strawberry fields forever." available to be read, then given:

buffer1.resize(11); // buffer1 is a std::vector managed elsewhere
buffer2.resize(7);  // buffer2 is a std::vector managed elsewhere
buffer3.resize(8);  // buffer3 is a std::vector managed elsewhere
socket.async_read_some(boost::asio::buffer(buffer1), handler1);
socket.async_read_some(boost::asio::buffer(buffer2), handler2);
socket.async_read_some(boost::asio::buffer(buffer3), handler3);

When the operations complete:

  • handler1 is invoked, buffer1 will contain "Strawberry "
  • handler2 is invoked, buffer2 will contain "fields "
  • handler3 is invoked, buffer3 will contain "forever."

However, the order in which the completion handlers are invoked is unspecified. This unspecified ordering remains true even with a strand.
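If the application does need handler2 to run after handler1, one option (a sketch, not the only approach) is to keep a single operation outstanding and initiate the next read from within the previous completion handler:

socket.async_read_some(boost::asio::buffer(buffer1),
    [&](const boost::system::error_code& error, std::size_t bytes_transferred)
    {
      handler1(error, bytes_transferred); // buffer1 handled first...
      if (!error)
      {
        // ...and only then is the next read initiated, so handler2 cannot
        // be invoked before handler1.
        socket.async_read_some(boost::asio::buffer(buffer2), handler2);
      }
    });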


Operation Demultiplexing

Asio uses the Proactor design pattern[1] to demultiplex operations. On most platforms, this is implemented in terms of a Reactor. The official documentation mentions the components and their responsibilities. Consider the following example:

socket.async_read_some(buffer, handler);

The caller is the initiator, starting an async_read_some asynchronous operation and creating the handler completion handler. The asynchronous operation is executed by the StreamSocketService operation processor:

  • Within the initiating function, if the socket has no other outstanding asynchronous read operations and data is available, then StreamSocketService will read from the socket and enqueue the handler completion handler into the io_service
  • Otherwise, the read operation is queued onto the socket, and the reactor is informed to notify Asio once data becomes available on the socket. When the io_service is run and data is available on the socket, the reactor will inform Asio. Next, Asio will dequeue an outstanding read operation from the socket, execute it, and enqueue the handler completion handler into the io_service

The io_service proactor will dequeue completion handlers and demultiplex them to the threads that are running the io_service, where the handler completion handler will be executed. The order of invocation of the completion handlers is unspecified.
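For example, completion handlers may be demultiplexed to any thread of a pool running the io_service; a minimal sketch (the thread count of 4 is arbitrary):

#include <thread>
#include <vector>

boost::asio::io_service io_service;
// ... initiate asynchronous operations on I/O objects using io_service ...

std::vector<std::thread> pool;
for (int i = 0; i < 4; ++i)
  pool.emplace_back([&io_service]() { io_service.run(); });

// Which of these threads executes a given completion handler, and in which
// order the completion handlers run, is unspecified.
for (auto& thread : pool)
  thread.join();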

Multiple Operations

If multiple operations of the same type are initiated on a socket, it is currently unspecified as to the order in which the buffers will be used or filled. However, in the current implementation, each socket uses a FIFO queue for each type of pending operation (e.g. a queue for read operations; a queue for write operations; etc). The networking-ts draft, which is based partially on Asio, specifies:

the buffers are filled in the order in which these operations were issued. The order of invocation of the completion handlers for these operations is unspecified.

Given:

socket.async_read_some(buffer1, handler1); // op1
socket.async_read_some(buffer2, handler2); // op2

As op1 was initiated before op2, then buffer1 is guaranteed to contain data that was received earlier in the stream than the data contained in buffer2, but handler2 may be invoked before handler1.

Composed Operations

Composed operations are composed of zero or more intermediate operations. For example, the async_read() composed asynchronous operation is composed of zero or more intermediate stream.async_read_some() operations.

The current implementation uses operation chaining to create a continuation: a single async_read_some() operation is initiated, and within its internal completion handler it is determined whether to initiate another async_read_some() operation or to invoke the user's completion handler. Because of the continuation, the async_read documentation requires that no other reads occur until the composed operation completes:

The program must ensure that the stream performs no other read operations (such as async_read, the stream's async_read_some function, or any other composed operations that perform reads) until this operation completes.

If a program violates this requirement, one may observe interwoven data, because of the aforementioned order in which buffers are filled.
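As an illustration of the chaining described above, a composed read loop might look like the sketch below. This is not Boost.Asio's actual internal implementation, and it omits details such as forwarding the asio_handler_invoke hook to the user's handler:

// Keep issuing async_read_some until `storage` is full, then invoke the
// user's completion handler.
template <typename Handler>
void read_loop(boost::asio::ip::tcp::socket& socket,
    std::vector<char>& storage, std::size_t total_read, Handler handler)
{
  socket.async_read_some(
      boost::asio::buffer(storage.data() + total_read,
                          storage.size() - total_read),
      [&socket, &storage, total_read, handler](
          const boost::system::error_code& error,
          std::size_t bytes_transferred)
      {
        const std::size_t now_read = total_read + bytes_transferred;
        if (!error && now_read < storage.size())
          read_loop(socket, storage, now_read, handler); // continuation
        else
          handler(error, now_read); // done: invoke the user's handler
      });
}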

For a concrete example, consider the case where an async_read() operation is initiated to read 26 bytes of data from a socket.
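One possible sequence is sketched below; the intermediate transfer sizes are illustrative assumptions, not something Asio guarantees:

  • The initiating function starts the composed operation; suppose 12 of the 26 bytes are already available, so the first intermediate async_read_some reads 12 bytes.
  • The internal intermediate handler observes that 12 < 26 bytes have been read and initiates another async_read_some for the remainder.
  • Once more data arrives, the second async_read_some reads the remaining 14 bytes.
  • The internal handler observes that all 26 bytes have been read and invokes the user's completion handler with 26 bytes transferred.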


