Why Do We Need to Use boost::asio::io_service::work

Why do we need to use boost::asio::io_service::work?

When io_service::run is called with no pending work and no work object, it returns right away. Typically, that is not the behavior most developers are looking for. There are of course some exceptions, but most developers want to dedicate a thread to all of the asynchronous processing and don't want that thread to exit until told to do so. That is what your code example does.

The io_service::run method is passed as a delegate (function pointer) to the create_thread method. So, when the thread is created by create_thread, it calls io_service::run with the io_service object as its argument. Typically, one io_service object can be used with multiple socket objects.

The stop method is usually called when shutting down the application or when communication between all clients/servers is no longer required and it is not anticipated that any new connections will need to be initiated.

What does boost::asio::io_service::run() actually do?

io_service::run processes handlers. Handlers are created by functions that work
in async mode, such as async_read and async_write. These functions return immediately:
the handler is queued, the function returns, and io_service::run processes the handler later.
In your case, asio::read doesn't create any handler; it is a blocking function, so there is no need to call io_service::run.

Why should I use io_service::work?

The io_service::run() will run operations as long as there are asynchronous operations to perform. If, at any time, there are no asynchronous operations pending (or handlers being invoked), the run() call will return.

However, there are some designs that would prefer that the run() call not exit until all work is done AND the io_service has explicitly been instructed that it's okay to exit. That's what io_service::work is used for. By creating the work object (I usually create it on the heap, owned by a shared_ptr), the io_service considers itself to always have something pending, and therefore the run() method will not return. Once I want the service to be able to exit (usually during shutdown), I destroy the work object.

Why do we need io_service in boost?

Are you reverse-discovering why singletons are bad? That's your answer.

As it is done, you are in control and get to decide how many resources are shared between services in Asio.

Because of that, you can now

  • use Asio in your application even though one of the libraries you link to also uses it
  • use Asio with a service per thread (so there will be no shared state) or with many threads per service

etc.

Boost.Asio: Is it a good thing to use an `io_service` per connection/socket?

No. Using an io_service object per connection is definitely a smell. Especially since you're also running each connection on a dedicated thread.

At this point you have to ask yourself what did asynchrony buy you? You can have all the code synchronous and have exactly the same number of threads etc.

Clearly you want to multiplex the connections onto a far smaller number of services. In practice there are a few sensible models like

  1. a single io_service with a single service thread (this is usually good). No tasks queued on the service may ever block for significant time or the latency will suffer

  2. a single io_service with a number of threads executing handlers. The number of threads in the pool should be enough to service the max. number of simultaneous CPU intensive tasks supported (or again, the latency will start to go up)

  3. an io_service per thread, usually one thread per logical core and with thread affinity so that it "sticks" to that core. This can be ideal for cache locality

UPDATE: Demo

Here's a demo that shows the idiomatic style using option 1. from above:

#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/optional.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <istream>
#include <list>
#include <string>

namespace ba = boost::asio;
namespace bs = boost::system;
namespace b = boost;

typedef ba::ip::tcp::acceptor acceptor_type;
typedef ba::ip::tcp::socket socket_type;

const short PORT = 11235;

// A connection owns its socket; the io_service is shared and passed in
class Connection : public b::enable_shared_from_this<Connection>
{
public:
    typedef boost::shared_ptr<Connection> Ptr;
protected:
    socket_type sock;
    ba::streambuf stream_buffer; // for reading etc
    std::string message;

    void AsyncReadString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        ba::async_read_until(
            sock,
            stream_buffer,
            '\0', // null-char is a delimiter
            b::bind(&Connection::ReadHandler, shared_from_this(),
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    void AsyncWriteString(const std::string &s) {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        message = s; // keep the data alive until the write completes

        ba::async_write(
            sock,
            ba::buffer(message.c_str(), message.size()+1),
            b::bind(&Connection::WriteHandler, shared_from_this(),
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    std::string ExtractString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        std::istream is(&stream_buffer);
        std::string s;
        std::getline(is, s, '\0');
        return s;
    }
    void ReadHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred)
    {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        if (!ec) {
            std::cout << (ExtractString() + "\n");
            std::cout.flush();
            AsyncReadString(); // read again
        }
        else {
            // do nothing, "this" will be deleted later
        }
    }
    void WriteHandler(const bs::error_code &ec, std::size_t bytes_transferred) {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }
public:
    Connection(ba::io_service& svc) : sock(svc) { }

    virtual ~Connection() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }

    socket_type& Socket() { return sock; }
    void Session() { AsyncReadString(); }
    void Stop() { sock.cancel(); }
};

// The server owns the single io_service, shared by the acceptor and all connections
class Server {
public:
    std::list<boost::weak_ptr<Connection> > m_connections;
protected:
    ba::io_service _service;
    boost::optional<ba::io_service::work> _work;
    acceptor_type _acc;
    b::thread thread;

    void AcceptHandler(const bs::error_code &ec, Connection::Ptr accepted) {
        if (!ec) {
            accepted->Session();
            DoAccept();
        }
        else {
            // do nothing the new session will be deleted automatically by the
            // destructor
        }
    }

    void DoAccept() {
        auto newaccept = boost::make_shared<Connection>(_service);

        _acc.async_accept(
            newaccept->Socket(),
            b::bind(&Server::AcceptHandler,
                this,
                ba::placeholders::error,
                newaccept
            ));
    }

public:
    Server():
        _service(),
        _work(ba::io_service::work(_service)),
        _acc(_service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
        thread(b::bind(&ba::io_service::run, &_service))
    { }

    ~Server() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        Stop();
        _work.reset(); // let run() return once pending handlers are done
        if (thread.joinable()) thread.join();
    }

    void Start() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        DoAccept();
    }

    void Stop() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        _acc.cancel();
    }

    void StopAllConnections() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        // note: nothing in this listing adds to m_connections,
        // so the loop has nothing to visit as written
        for (auto c : m_connections) {
            if (auto p = c.lock())
                p->Stop();
        }
    }
};

int main() {
    try {
        Server s;
        s.Start();

        std::cerr << "Shutdown in 2 seconds...\n";
        b::this_thread::sleep_for(b::chrono::seconds(2));

        std::cerr << "Stop accepting...\n";
        s.Stop();

        std::cerr << "Shutdown...\n";
        s.StopAllConnections(); // interrupt ongoing connections
    } // destructor of Server will join the service thread
    catch (std::exception &e) {
        std::cerr << __FUNCTION__ << ":" << __LINE__ << "\n";
        std::cerr << "Exception: " << e.what() << std::endl;
        return 1;
    }

    std::cerr << "Byebye\n";
}

I modified the main() to run for 2 seconds without user intervention. This is so I can demo it Live On Coliru (of course, it's limited w.r.t the number of client processes).

If you run it with a lot (a lot) of clients, using e.g.

$ time (for a in {1..1000}; do (sleep 1.$RANDOM; echo -e "hello world $RANDOM\\0" | netcat localhost 11235)& done; wait)

You will find that the two second window handles them all:

$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
2 hello world 28214
2 hello world 4554
2 hello world 6216
2 hello world 7864
2 hello world 9966
2 void Server::Stop()
1000 std::string Connection::ExtractString()
1001 virtual Connection::~Connection()
2000 void Connection::AsyncReadString()
2000 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)

If you really go berserk and raise 1000 to e.g. 100000 there, you'll get things similar to:

sehe@desktop:/tmp$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
2 hello world 5483
2 hello world 579
2 hello world 5865
2 hello world 938
2 void Server::Stop()
3 hello world 9613
1741 std::string Connection::ExtractString()
1742 virtual Connection::~Connection()
3482 void Connection::AsyncReadString()
3482 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)

On repeated 2-second runs of the server.

asio::io_service::run doesn't return after boost::asio::io_service::work is destroyed

I cannot reproduce the error on either compiler.
For example, it works as expected with gcc 9.3 and Boost 1.73.

Normally the work destructor will use something like InterlockedDecrement on Windows to decrement the count of outstanding work.

It looks like some compiler or io_service/work implementation issue.

As stated in the comments, io_service and io_service::work are deprecated in favor of io_context and executor_work_guard.

Confused when boost::asio::io_service run method blocks/unblocks

Foundation

Let's start with a simplified example and examine the relevant Boost.Asio pieces:

void handle_async_receive(...) { ... }
void print() { ... }

...

boost::asio::io_service io_service;
boost::asio::ip::tcp::socket socket(io_service);

...

io_service.post(&print); // 1
socket.connect(endpoint); // 2
socket.async_receive(buffer, &handle_async_receive); // 3
io_service.post(&print); // 4
io_service.run(); // 5

What Is A Handler?

A handler is nothing more than a callback. In the example code, there are 3 handlers:

  • The print handler (1).
  • The handle_async_receive handler (3).
  • The print handler (4).

Even though the same print() function is used twice, each use is considered to create its own uniquely identifiable handler. Handlers can come in many shapes and sizes, ranging from basic functions like the ones above to more complex constructs such as functors generated from boost::bind() and lambdas. Regardless of the complexity, the handler still remains nothing more than a callback.

What Is Work?

Work is some processing that Boost.Asio has been requested to do on behalf of the application code. Sometimes Boost.Asio may start some of the work as soon as it has been told about it, and other times it may wait to do the work at a later point in time. Once it has finished the work, Boost.Asio will inform the application by invoking the supplied handler.

Boost.Asio guarantees that handlers will only run within a thread that is currently calling run(), run_one(), poll(), or poll_one(). These are the threads that will do work and call handlers. Therefore, in the above example, print() is not invoked when it is posted into the io_service (1). Instead, it is added to the io_service and will be invoked at a later point in time. In this case, it is invoked within io_service.run() (5).

What Are Asynchronous Operations?

An asynchronous operation creates work and Boost.Asio will invoke a handler to inform the application when the work has completed. Asynchronous operations are created by calling a function that has a name with the prefix async_. These functions are also known as initiating functions.

Asynchronous operations can be decomposed into three unique steps:

  • Initiating, or informing, the associated io_service that work needs to be done. The async_receive operation (3) informs the io_service that it will need to asynchronously read data from the socket, then async_receive returns immediately.
  • Doing the actual work. In this case, when the socket receives data, bytes will be read and copied into buffer. The actual work will be done in either:

    • The initiating function (3), if Boost.Asio can determine that it will not block.
    • io_service.run() (5), when the application explicitly runs the io_service.
  • Invoking the handle_async_receive ReadHandler. Once again, handlers are only invoked within threads running the io_service. Thus, regardless of when the work is done (3 or 5), it is guaranteed that handle_async_receive() will only be invoked within io_service.run() (5).

The separation in time and space between these three steps is known as control flow inversion. It is one of the complexities that makes asynchronous programming difficult. However, there are techniques that can help mitigate this, such as by using coroutines.

What Does io_service.run() Do?

When a thread calls io_service.run(), work and handlers will be invoked from within this thread. In the above example, io_service.run() (5) will block until either:

  • It has invoked and returned from both print handlers, the receive operation completes with success or failure, and its handle_async_receive handler has been invoked and returned.
  • The io_service is explicitly stopped via io_service::stop().
  • An exception is thrown from within a handler.

One potential pseudo-ish flow could be described as the following:


create io_service
create socket
add print handler to io_service (1)
wait for socket to connect (2)
add an asynchronous read work request to the io_service (3)
add print handler to io_service (4)
run the io_service (5)
  is there work or handlers?
    yes, there is 1 work and 2 handlers
      does socket have data? no, do nothing
      run print handler (1)
  is there work or handlers?
    yes, there is 1 work and 1 handler
      does socket have data? no, do nothing
      run print handler (4)
  is there work or handlers?
    yes, there is 1 work
      does socket have data? no, continue waiting
  -- socket receives data --
      socket has data, read it into buffer
      add handle_async_receive handler to io_service
  is there work or handlers?
    yes, there is 1 handler
      run handle_async_receive handler (3)
  is there work or handlers?
    no, set io_service as stopped and return

Notice how when the read finished, it added another handler to the io_service. This subtle detail is an important feature of asynchronous programming. It allows for handlers to be chained together. For instance, if handle_async_receive did not get all the data it expected, then its implementation could post another asynchronous read operation, resulting in io_service having more work, and thus not returning from io_service.run().

Do note that when the io_service has run out of work, the application must reset() the io_service before running it again.


Example Question and Example 3a code

Now, let's examine the two pieces of code referenced in the question.

Question Code

socket->async_receive adds work to the io_service. Thus, io_service->run() will block until the read operation completes with success or error, and ClientReceiveEvent has either finished running or throws an exception.

Example 3a Code

In hopes of making it easier to understand, here is a smaller annotated Example 3a:

void CalculateFib(std::size_t n);

int main()
{
  boost::asio::io_service io_service;
  boost::optional<boost::asio::io_service::work> work =       // '. 1
      boost::in_place(boost::ref(io_service));                // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  work = boost::none;                                         // 4
  worker_threads.join_all();                                  // 5
}

At a high-level, the program will create 2 threads that will process the io_service's event loop (2). This results in a simple thread pool that will calculate Fibonacci numbers (3).

The one major difference between the Question Code and this code is that this code invokes io_service::run() (2) before actual work and handlers are added to the io_service (3). To prevent the io_service::run() from returning immediately, an io_service::work object is created (1). This object prevents the io_service from running out of work; therefore, io_service::run() will not return as a result of no work.

The overall flow is as follows:

  1. Create the io_service::work object, which is added to the io_service.
  2. Create a thread pool that invokes io_service::run(). These worker threads will not return from io_service::run() because of the io_service::work object.
  3. Add 3 handlers that calculate Fibonacci numbers to the io_service, and return immediately. The worker threads, not the main thread, may start running these handlers immediately.
  4. Delete the io_service::work object.
  5. Wait for worker threads to finish running. This will only occur once all 3 handlers have finished execution, as the io_service neither has handlers nor work.

The code could be written differently, in the same manner as the Question Code, where handlers are added to the io_service, and then the io_service event loop is processed. This removes the need to use io_service::work, and results in the following code:

int main()
{
  boost::asio::io_service io_service;

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'
  worker_threads.join_all();                                  // 5
}

Synchronous vs. Asynchronous

Although the code in the question is using an asynchronous operation, it is effectively functioning synchronously, as it is waiting for the asynchronous operation to complete:

socket.async_receive(buffer, handler);
io_service.run();

is equivalent to:

boost::system::error_code error;
std::size_t bytes_transferred = socket.receive(buffer, 0, error);
handler(error, bytes_transferred);

As a general rule of thumb, try to avoid mixing synchronous and asynchronous operations. Oftentimes, it can turn a complex system into a complicated system. This answer highlights advantages of asynchronous programming, some of which are also covered in the Boost.Asio documentation.

C++ boost::asio::io_service: how can I safely destroy io_service resources when the program finishes

C++ is unlike Java or C# - basically any garbage collecting language runtime. It has deterministic destruction. Lifetimes of objects are very tangible and reliable.

async_service.~io_service();

This is explicitly invoking a destructor without deleting the object, or before the lifetime of the automatic-storage variable ends.

The consequence is that the language will still invoke the destructor when the lifetime does end.

This is not what you want.

If you need to clear the work, make it a unique_ptr<io_service::work> so you can work_p.reset() instead (which does call its destructor, once).

After that, just wait for the threads to complete io_service::run(), meaning you should thread::join() them before the thread object gets destructed.

Member objects of a class are destructed automatically when leaving the destructor body. They are destructed in the reverse of the order in which they are declared.

Sample

struct MyDemo {
    boost::asio::io_service _ios;
    std::unique_ptr<boost::asio::io_service::work> _work_p { new boost::asio::io_service::work(_ios) };

    // declared last, so _ios and _work_p already exist when the thread starts
    std::thread _thread { [this] { _ios.run(); } };

    ~MyDemo() {
        _work_p.reset(); // let run() return once pending handlers are done
        if (_thread.joinable())
            _thread.join();
    } // members are destructed by the language
};

Alternative of boost::asio::executor_work_guard for older boost version (1.57)

In older versions you'd use an io_service::work object:

boost::asio::io_service io;
boost::asio::io_service::work work(io);

Note that to get reset()-like functionality you'd wrap that in boost::optional<> or std::unique_ptr<>.

This is actually still in the documentation for the 1.57.0 version in the same place(s) where you'd find executor_work_guard in newer versions, e.g. https://www.boost.org/doc/libs/1_57_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.stopping_the_io_service_from_running_out_of_work


