How to Pass a Boost Asio Tcp Socket to a Thread for Sending Heartbeat to Client or Server

Instead of this:

try
{
    boost::asio::ip::tcp::acceptor acceptor(io, ep.protocol());
    acceptor.bind(ep);
    acceptor.listen();
    auto sock = acceptor.accept();

    std::thread t([&sock]() {
        hearbeatSender(sock);
    });
    process(sock);
    t.join();
}

Use this instead:

try
{
    boost::asio::ip::tcp::acceptor acceptor(io, ep.protocol());
    acceptor.bind(ep);
    acceptor.listen();
    boost::asio::ip::tcp::socket sock(io);
    acceptor.accept(sock);

    std::thread t([&sock]() {
        hearbeatSender(sock);
    });
    process(sock);
    t.join();
}

Also include these header files:

#include <thread>
#include <chrono>

(Optional) You can also use std::this_thread::sleep_for instead of sleep():
std::this_thread::sleep_for(std::chrono::seconds(10));

The problem of passing a socket to the thread is solved.
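
For completeness, here is a minimal sketch of what the hearbeatSender referenced above could look like. Only the signature is implied by the snippets; the body, the message text, and the interval are assumptions, and in a real program you would still need to make sure the heartbeat writer and process(sock) do not touch the socket at the same time.

#include <boost/asio.hpp>
#include <chrono>
#include <string>
#include <thread>

// Hypothetical body for hearbeatSender: runs in its own thread and writes a
// small message on the shared socket every 10 seconds until a write fails
// (e.g. because the peer disconnected).
void hearbeatSender(boost::asio::ip::tcp::socket& sock)
{
    std::string const msg = "HEARTBEAT\n"; // assumed message format
    boost::system::error_code ec;
    for (;;) {
        std::this_thread::sleep_for(std::chrono::seconds(10));
        boost::asio::write(sock, boost::asio::buffer(msg), ec);
        if (ec) // peer went away or the socket was closed
            break;
    }
}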

Now, for exchanging a HEARTBEAT between a client and a server, the complete code can be found here:

Client code: sends a HEARTBEAT every 5 seconds

Server code: responds to the client

I have a question about C++ boost::asio and std::async

Yes. In fact I have many examples of that on this site.

I wrote one yesterday that starts out with true single-threading:

  • How to pass a boost asio tcp socket to a thread for sending heartbeat to client or server

Note that you can easily also do multi-client servers on a single thread:

  • Boost.Asio: Is it a good thing to use a `io_service` per connection/socket?
  • With a twist (because it wants to synchronize writing to all clients): Thread-safety when accessing data from N-threads in context of an async TCP-server

There must be many more, but these are the first hits I see.

boost::asio::ip::tcp::socket is connected?

TCP is meant to be robust in the face of a harsh network; even though TCP provides what looks like a persistent end-to-end connection, it's all just a lie: each packet is really just a unique, unreliable datagram.

The connections are really just virtual conduits created with a little state tracked at each end of the connection (Source and destination ports and addresses, and local socket). The network stack uses this state to know which process to give each incoming packet to and what state to put in the header of each outgoing packet.

Virtual TCP Conduit

Because of the underlying — inherently connectionless and unreliable — nature of the network, the stack will only report a severed connection when the remote end sends a FIN packet to close the connection, or if it doesn't receive an ACK response to a sent packet (after a timeout and a couple of retries).

Because of the asynchronous nature of asio, the easiest way to be notified of a graceful disconnection is to have an outstanding async_read which will return error::eof immediately when the connection is closed. But this alone still leaves the possibility of other issues like half-open connections and network issues going undetected.
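
For illustration, a minimal sketch of such an outstanding read; the function and buffer names are made up for the example, and the buffer is assumed to outlive the operation.

#include <boost/asio.hpp>
#include <array>
#include <iostream>

// Minimal sketch: keep one read outstanding so a graceful close by the peer
// is reported as error::eof in the completion handler.
void await_disconnect(boost::asio::ip::tcp::socket& sock,
                      std::array<char, 1024>& buf)
{
    sock.async_read_some(boost::asio::buffer(buf),
        [&sock, &buf](boost::system::error_code ec, std::size_t n) {
            if (ec == boost::asio::error::eof) {
                std::cout << "peer closed the connection gracefully\n";
                return;
            }
            if (!ec) {
                // consume the n bytes received, then re-arm the read
                await_disconnect(sock, buf);
            }
        });
}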

The most effective way to work around unexpected connection interruption is to use some sort of keep-alive or ping. This occasional attempt to transfer data over the connection allows expedient detection of an unintentionally severed connection.

The TCP protocol actually has a built-in keep-alive mechanism, which can be enabled in asio via the asio::socket_base::keep_alive socket option. The nice thing about TCP keep-alive is that it's transparent to the user-mode application, and only the peers interested in keep-alive need to configure it. The downside is that you need OS-level access/knowledge to configure the timeout parameters; they're unfortunately not exposed via a simple socket option and usually have default timeout values that are quite large (7200 seconds on Linux).
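
Enabling it looks roughly like this (a sketch; the option only turns the mechanism on, the timing parameters stay at the OS defaults):

#include <boost/asio.hpp>

// Minimal sketch: turn on OS-level TCP keep-alive for an asio socket.
// The probe interval and timeout are not set here; as noted above, those are
// OS-level settings (e.g. net.ipv4.tcp_keepalive_time on Linux).
void enable_keepalive(boost::asio::ip::tcp::socket& sock)
{
    sock.set_option(boost::asio::socket_base::keep_alive(true));
}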

Probably the most common method of keep-alive is to implement it at the application layer, where the application has a special noop or ping message and does nothing but respond when tickled. This method gives you the most flexibility in implementing a keep-alive strategy.

Asio Peer to Peer Network programming

Yes, each process will need a server side (to receive messages from any of the n participants) and one client side (to send messages to any of the n participants). However, as far as I could find in Asio, the only way to send messages to k of the n participants is by creating k threads with k connections

Then you must not have looked in the right place, or not very far at all.

A core tenet of async IO is multiplexing IO on a single thread (all of the kqueue/epoll/select/IO completion ports etc. abstractions are geared towards that goal).

Here's an absolutely lazy-coded demonstration that shows:

  • single threaded everything
  • a listener that accepts unbounded clients (we could easily add additional listeners)
  • we connect to a collection of "peers"
  • on a heartbeat interval we send all the peers a heartbeat message

        for (auto& peer : peers)
            async_write(peer, buffer(message), [ep=peer.remote_endpoint(ec)](error_code ec, size_t xfr) {
                std::cout << "(sent " << xfr << " bytes to " << ep << "(" << ec.message() << ")" << std::endl;
            });
  • additionally it handles asynchronous process signals (INT, TERM) to shutdown all the async operations

"Live¹" On Coliru

#include <boost/asio.hpp>
#include <list>
#include <iostream>
using std::tuple;
using namespace std::literals;

template <typename T>
static auto reference_eq(T const& obj) {
    return [p=&obj](auto& ref) { return &ref == p; };
}

int main() {
    using namespace boost::asio; // don't be this lazy please
    using boost::system::error_code;
    using ip::tcp;

    io_context ioc;
    tcp::acceptor listener(ioc, {{}, 6868});
    listener.set_option(tcp::acceptor::reuse_address(true));
    listener.listen();

    using Loop = std::function<void()>;

    std::list<tcp::socket> clients, peers;

    // accept unbounded clients
    Loop accept_loop = [&] {
        listener.async_accept([&](error_code const& ec, tcp::socket s) {
            if (!ec) {
                std::cout << "New session " << s.remote_endpoint() << std::endl;
                clients.push_back(std::move(s));
                accept_loop();
            }
        });
    };

    tcp::resolver resoler(ioc);
    for (auto [host,service] : {
             tuple{"www.example.com", "http"},
             {"localhost", "6868"},
             {"::1", "6868"},
             // ...
         })
    {
        auto& p = peers.emplace_back(ioc);
        async_connect(p, resoler.resolve(host,service),
            [&,spec=(host+":"s+service)](error_code ec, auto...) {
                std::cout << "For " << spec << " (" << ec.message() << ")";
                if (!ec)
                    std::cout << " " << p.remote_endpoint();
                else
                    peers.remove_if(reference_eq(p));
                std::cout << std::endl;
            });
    }

    std::string const& message = "heartbeat\n";
    high_resolution_timer timer(ioc);
    Loop heartbeat = [&]() mutable {
        timer.expires_from_now(2s);
        timer.async_wait([&](error_code ec) {
            std::cout << "heartbeat " << ec.message() << std::endl;
            if (ec)
                return;
            for (auto& peer : peers)
                async_write(peer, buffer(message), [ep=peer.remote_endpoint(ec)](error_code ec, size_t xfr) {
                    std::cout << "(sent " << xfr << " bytes to " << ep << "(" << ec.message() << ")" << std::endl;
                });
            heartbeat();
        });
    };

    signal_set sigs(ioc, SIGINT, SIGTERM);
    sigs.async_wait([&](error_code ec, int sig) {
        if (!ec) {
            std::cout << "signal: " << strsignal(sig) << std::endl;
            listener.cancel();
            timer.cancel();
        }
    });

    accept_loop();
    heartbeat();

    ioc.run_for(10s); // max time for Coliru, or just `run()`
}

Prints (on my system):

New session 127.0.0.1:46730
For localhost:6868 (Success) 127.0.0.1:6868
For ::1:6868 (Connection refused)
For www.example.com:http (Success) 93.184.216.34:80
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
^Csignal: Interrupt
heartbeat Operation canceled

Note how the one client ("New session") is our own peer connection on localhost:6868 :)

Of course, in real life you would have a class to represent a client session, perhaps have queues for messages pending sending, and optionally run on multiple threads (using strands to synchronize access to shared objects).

OTHER SAMPLES

If you really wish to avoid an explicit collection of clients, see this very similar demo: How to pass a boost asio tcp socket to a thread for sending heartbeat to client or server which

  • also starts out single-threaded, but adds a thread pool for strand demonstration purposes
  • has a heartbeat timer per session, meaning that each session can have its own frequency

¹ It's not working on Coliru because of limited network access. A loopback-only version without resolver use works: Live On Coliru

boost::asio sync server is not accepting connections after first one

The general pattern for an asio-based listener is:

// This only happens once!
create an asio_service
create a socket into which a new connection will be accepted
call asio_service->async_accept, passing
    the accept socket and
    a handler (function object) [see below]
start new threads if desired (you can use the main thread if it has nothing else to do)

Each thread should:
    call asio_service->run [or any of the variations -- run_one, poll, etc.]

Unless the main thread called asio_service->run(), it ends up here
"immediately". It should do something to pass the time (like read from
the console or...). If it doesn't have anything to do, it probably should
have called run() to make itself available in asio's thread pool.

In the handler function:

    do something with the socket that is now connected
    create a new socket for the next accept
    call asio_service->async_accept, passing
        the new accept socket and
        the same handler

Notice in particular that each accept call only accepts one connection, and you should not have more than one accept at a time listening on the same port, so you need to call async_accept again in the handler from the previous call.
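
A minimal sketch of that re-arming pattern, using the move-accepting overload available in recent Asio versions (so the "create a new socket" step happens implicitly); the names handle_client and start_accept are illustrative, not from the question's code.

#include <boost/asio.hpp>
#include <iostream>
#include <utility>

namespace asio = boost::asio;
using asio::ip::tcp;

// Hypothetical per-connection work; stands in for "do something with the socket".
void handle_client(tcp::socket s)
{
    std::cout << "connected: " << s.remote_endpoint() << "\n";
}

// Accept one connection, then immediately arm the next accept from the handler,
// so only one accept is ever outstanding at a time.
void start_accept(tcp::acceptor& acceptor)
{
    acceptor.async_accept([&acceptor](boost::system::error_code ec, tcp::socket s) {
        if (!ec)
            handle_client(std::move(s));
        start_accept(acceptor); // re-arm for the next connection
    });
}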

Boost ASIO has some very good tutorial examples like this one

How to safely write to a socket from multiple threads?

You only have 1 thread running the IO service. Everything is on an implicit strand (Why do I need strand per connection when using boost::asio?), no need to worry UNTIL you start using a new thread.

The simplest fix, then, would seem to be making sure that sending the replies happens on the IO service as well:

void process_message(std::string const& message) {
    std::string response = handler.processMessage(message);
    post(socket_.get_executor(),
         std::bind(&session::do_write, shared_from_this(), response));
}

Now if you wanted to be able to run the IO services on multiple threads, you would just make sure that the socket uses a strand executor.

HOWEVER

This doesn't guarantee that you won't see overlapping async_write operations, because the speed at which incoming messages are processed might be higher than the speed at which they are sent. Therefore the customary solution is

Queueing

In my examples I typically call this FIFO queue "outbox_" and I prefer to use deque for reasons of iterator/reference stability (see Iterator invalidation rules for C++ containers):

void do_write(std::string message)
{
    outbox_.push_back(std::move(message)); // assumed on (implicit) strand
    if (outbox_.size() == 1) {
        write_loop();
    }
}

void write_loop() {
    if (outbox_.empty())
        return;

    asio::async_write( //
        socket_, asio::buffer(outbox_.front()),
        [this, self = shared_from_this()] //
        (error_code ec, std::size_t /*length*/) {
            if (!ec) {
                outbox_.pop_front();
                write_loop();
            } else if (ec != asio::error::eof) {
                std::cerr << "Write error: " << ec.message() << '\n';
            }
        });
}

Demo

Here's a fixed listing with a stub messages.h.

It also greatly simplifies the reading/buffer handling by using the existing async_read_until composed operation, which does everything you had manually written.

Live On Coliru

#include <boost/asio.hpp>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <thread>
#include <utility>
#if 0
#include "messages.h"
#else // mock messages.h
#include <boost/lexical_cast.hpp>
#include <iomanip>
struct MessageHandler {
    std::string initialMessage() const { return "Initial\n"; }
    std::string processMessage(std::string const& req) const {
        return "Processed " +
            boost::lexical_cast<std::string>(std::quoted(req)) + "\n";
    }
};
#endif

namespace asio = boost::asio;
using boost::system::error_code;
using asio::ip::tcp;

class session : public std::enable_shared_from_this<session> {
  public:
    session(tcp::socket socket) : socket_(std::move(socket)) {}

    void start() {
        handler = MessageHandler();
        asio::write(socket_, asio::buffer(handler.initialMessage()));
        do_read();
    }

  private:
    void do_read() {
        async_read_until(
            socket_, asio::dynamic_buffer(buffer_), '\0',
            [this, self = shared_from_this()] //
            (error_code ec, std::size_t length) {
                if (!ec) {
                    std::thread(&session::process_message, this,
                                buffer_.substr(0, length - 1))
                        .detach();
                    buffer_.erase(0, length);

                    do_read();
                } else if (ec != asio::error::eof) {
                    std::cerr << "Read error: " << ec.message() << std::endl;
                }
            });
    }

    void do_write(std::string message)
    {
        outbox_.push_back(std::move(message)); // assumed on (implicit) strand
        if (outbox_.size() == 1) {
            write_loop();
        }
    }

    void write_loop() {
        if (outbox_.empty())
            return;

        asio::async_write( //
            socket_, asio::buffer(outbox_.front()),
            [this, self = shared_from_this()] //
            (error_code ec, std::size_t /*length*/) {
                if (!ec) {
                    outbox_.pop_front();
                    write_loop();
                } else if (ec != asio::error::eof) {
                    std::cerr << "Write error: " << ec.message() << std::endl;
                }
            });
    }

    void process_message(std::string const& message) {
        std::string response = handler.processMessage(message);
        // dispatch/post to executor because we are on a different thread
        post(socket_.get_executor(),
             std::bind(&session::do_write, shared_from_this(), response));
    }

    tcp::socket socket_;
    std::string buffer_;
    std::deque<std::string> outbox_;
    MessageHandler handler;
};

class server
{
  public:
    server(asio::io_context& io_context, unsigned short port)
        : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
          socket_(io_context)
    {
        do_accept();
    }

  private:
    void do_accept()
    {
        acceptor_.async_accept(socket_, [this](error_code ec) {
            if (!ec) {
                std::cout << "Accepted " << socket_.remote_endpoint() << std::endl;
                std::make_shared<session>(std::move(socket_))->start();
            }

            do_accept();
        });
    }

    tcp::acceptor acceptor_;
    tcp::socket socket_;
};

void serverInit() {
    try {
        asio::io_context io_context;

        server s(io_context, 8989);

        io_context.run();
    } catch (std::exception const& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
    }
}

int main() { serverInit(); }

When sending a burst of requests:

printf 'Message%d\0' {1..100} | nc 127.0.0.1 8989 -w1

Prints correctly e.g.:

Accepted 127.0.0.1:34862

And the client receives, e.g.:

Initial
Processed "Message2"
Processed "Message1"
Processed "Message4"
Processed "Message3"
Processed "Message5"
Processed "Message6"
Processed "Message7"
Processed "Message8"
Processed "Message9"
Processed "Message10"
Processed "Message11"
Processed "Message12"
Processed "Message13"
Processed "Message15"
Processed "Message16"
Processed "Message14"
Processed "Message18"
Processed "Message19"
Processed "Message20"
Processed "Message21"
Processed "Message22"
Processed "Message23"
Processed "Message24"
Processed "Message25"
Processed "Message26"
Processed "Message27"
Processed "Message28"
Processed "Message29"
Processed "Message30"
Processed "Message31"
Processed "Message32"
Processed "Message33"
Processed "Message34"
Processed "Message35"
Processed "Message17"
Processed "Message36"
Processed "Message38"
Processed "Message39"
Processed "Message40"
Processed "Message41"
Processed "Message42"
Processed "Message43"
Processed "Message44"
Processed "Message45"
Processed "Message46"
Processed "Message47"
Processed "Message48"
Processed "Message49"
Processed "Message50"
Processed "Message51"
Processed "Message52"
Processed "Message53"
Processed "Message54"
Processed "Message55"
Processed "Message56"
Processed "Message57"
Processed "Message58"
Processed "Message59"
Processed "Message60"
Processed "Message61"
Processed "Message62"
Processed "Message63"
Processed "Message64"
Processed "Message65"
Processed "Message66"
Processed "Message67"
Processed "Message68"
Processed "Message69"
Processed "Message70"
Processed "Message71"
Processed "Message72"
Processed "Message73"
Processed "Message74"
Processed "Message75"
Processed "Message76"
Processed "Message77"
Processed "Message78"
Processed "Message79"
Processed "Message80"
Processed "Message81"
Processed "Message82"
Processed "Message83"
Processed "Message84"
Processed "Message85"
Processed "Message86"
Processed "Message87"
Processed "Message88"
Processed "Message89"
Processed "Message90"
Processed "Message91"
Processed "Message92"
Processed "Message93"
Processed "Message94"
Processed "Message95"
Processed "Message96"
Processed "Message97"
Processed "Message98"
Processed "Message99"
Processed "Message100"
Processed "Message37"

BONUS: Adding the strand

Minimal changes:

class server
{
  public:
    server(asio::any_io_executor ex, unsigned short port)
        : acceptor_(ex, tcp::endpoint(tcp::v4(), port)) {
        do_accept();
    }

  private:
    void do_accept()
    {
        acceptor_.async_accept(
            make_strand(acceptor_.get_executor()),
            [this](error_code ec, tcp::socket&& s) {
                if (!ec) {
                    std::cout << "Accepted " << s.remote_endpoint() << std::endl;
                    std::make_shared<session>(std::move(s))->start();
                }

                do_accept();
            });
    }

    tcp::acceptor acceptor_;
};

void serverInit() {
    try {
        asio::thread_pool io_context;

        server s(io_context.get_executor(), 8989);

        io_context.join();
    } catch (std::exception const& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
    }
}

Live demo:


Why in asio's example the tcp acceptor pattern uses shared_pointer model wrapping heap socket, while udp use stack socket?

The shared pointer is there to manage the lifetime of the connection.

In the case of TCP it is more common to have multiple connections, which leads to a situation where you will likely have multiple connection instances with unrelated lifetimes.

UDP is connectionless and is often used for one-shot messages.

In fact you can come up with scenarios where you'd use shared pointers with UDP (e.g. with "logical connections", such as streaming audio over UDP).
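
The essence of the pattern: every pending handler captures a shared_ptr to the connection, so the object lives exactly as long as operations are in flight. A minimal sketch (class and member names are illustrative):

#include <boost/asio.hpp>
#include <memory>

namespace asio = boost::asio;
using asio::ip::tcp;

// Minimal sketch: capturing `self` is what keeps the connection object alive
// for as long as an asynchronous operation is outstanding on it.
class connection : public std::enable_shared_from_this<connection> {
public:
    explicit connection(tcp::socket s) : socket_(std::move(s)) {}

    void start()
    {
        auto self = shared_from_this();
        socket_.async_read_some(asio::buffer(buf_),
            [this, self](boost::system::error_code ec, std::size_t /*n*/) {
                if (!ec)
                    start(); // re-arm; the object dies once no handler holds `self`
            });
    }

private:
    tcp::socket socket_;
    char buf_[512];
};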

Also, conversely, you CAN solve the lifetime puzzle differently (regardless of TCP/UDP). For example, here I used a std::list (for reference stability) and judicious single-threaded access to it: How to pass a boost asio tcp socket to a thread for sending heartbeat to client or server ¹


¹ I previously compared that to a shared_ptr approach in this answer: How to eliminate crashes when destroying boost::asio entities on fly?, which might interest you as well

boost::asio::write does not seem to work while boost::asio::read is outstanding

An Asio socket is not thread-safe, so you may not access it from different threads.
Use async_read and async_write instead.
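
A minimal sketch of that advice (names are illustrative): rather than calling a blocking write on the socket from a second thread, hand the work to the socket's executor and issue the async_write there. This assumes the io_context is run on a single thread or the socket uses a strand executor, and that the socket outlives the operation.

#include <boost/asio.hpp>
#include <memory>
#include <string>

namespace asio = boost::asio;
using asio::ip::tcp;

// Minimal sketch: callable from any thread, because the actual socket access
// happens on the socket's executor via post().
void send_from_any_thread(tcp::socket& sock, std::string msg)
{
    auto m = std::make_shared<std::string>(std::move(msg)); // keep the buffer alive
    asio::post(sock.get_executor(), [&sock, m] {
        asio::async_write(sock, asio::buffer(*m),
            [m](boost::system::error_code /*ec*/, std::size_t /*n*/) {
                // `m` is kept alive until the write completes
            });
    });
}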

How to eliminate crashes when destroying boost::asio entities on fly?

Elementary, my dear Watson

The key to the problem: I am a very trusting person.

I should mention that I use non-Boost Asio ver. 1.18.0, with VS2017 and Win10. Thus, all the explanation below relates to the Windows part of Asio. With some probability, the POSIX implementation works a little bit differently.

The main idea of the initial implementation was to have the ability to control the population of server/connection objects just by adding/removing them from an appropriate set<> collection.

The text below describes why it does not work without an additional effort.

According to the Asio documentation:

~basic_stream_socket(); This function destroys the socket, cancelling any outstanding asynchronous operations associated with the socket as if by calling cancel.

My mistake was to think that the cancellation of asynchronous operations would be performed within the scope of the destructor, with the async handlers being called there as well.

It's funny: I wondered why they use those self pointers inside async handlers if the async handlers are supposed to be rejected during the object's destruction. The right answer: the async handlers will not be rejected.

In fact, the async handlers will be called afterwards, and the class instance will already be destroyed by that time.

What is happening:

  1. When destroying the server or connection object, WinSock2's ::closesocket() is called for the socket handle in ~basic_stream_socket().
  2. On the next iteration inside io_context.run(), win_iocp_io_context::do_one() calls ::GetQueuedCompletionStatus() to get the async operation result and to start the async handler that was associated with the destroyed socket.

There are two scenarios which are interesting for us:

  1. The socket waits for data.
  2. The socket is destroyed (e.g. inside the connection class destructor).
  3. The async handler is called with an error.

In this scenario we may check the error code and bail out of the async handler, even though the class was already destroyed. A bad but working solution, which I demonstrated in the code in my question.

  1. The socket receives some data; the async handler has not started yet.
  2. The socket is destroyed (e.g. inside the connection class destructor).
  3. The async handler is started WITHOUT ERRORS!!! Disaster.

In this scenario the error code can't save us; the crash happens.
Thus, the approach of checking error codes inside the async handlers does not work.

The code below solves all the problems by introducing a hasta_la_vista() method for the server and connection classes. Not super elegant, but a reinforced-concrete solution:

#include <map>
#include <array>
#include <set>
#include <vector>
#include <deque>
#include <thread>
#include <iostream>
#include <asio.hpp>
#include <iomanip>

class CAsioConnection
    : public std::enable_shared_from_this<CAsioConnection>
{
public:
    using PtrType = std::shared_ptr<CAsioConnection>;

    CAsioConnection(asio::ip::tcp::socket socket, std::set<CAsioConnection::PtrType>& connections)
        : socket_(std::move(socket)), connections_(connections), destroying_in_progress(false)
    {
        std::cout << "-- CAsioConnection is creating\n";
    }

    virtual ~CAsioConnection()
    {
        std::cout << "-- CAsioConnection is destroying\n";
    }

    void read() { do_read(); }

    void hasta_la_vista(void)
    {
        destroying_in_progress = true;
        std::error_code ec;
        socket_.cancel(ec);
    }

private:
    void do_read(void)
    {
        auto self(shared_from_this());
        asio::async_read(socket_, asio::buffer(buff),
            [this, self](std::error_code ec, std::size_t /*length*/) {

                if (destroying_in_progress)
                    return;

                if (!ec)
                {
                    do_read();
                }
                else
                {
                    std::cout << "-- CAsioConnection::do_read() error : (" << ec.value() << ") " << ec.message() << "\n";
                    hasta_la_vista();
                    connections_.erase(shared_from_this());
                }
            });
    }

    uint8_t buff[3];
    asio::ip::tcp::socket socket_;
    bool destroying_in_progress;
    std::set<CAsioConnection::PtrType>& connections_;
};

//*****************************************************************************

class CAsioServer
    : public std::enable_shared_from_this<CAsioServer>
{
public:
    using PtrType = std::shared_ptr<CAsioServer>;

    CAsioServer(int port, asio::io_context& io, const asio::ip::tcp::endpoint& endpoint)
        : port_(port), destroying_in_progress(false), acceptor_(io, endpoint)
    {
        std::cout << "-- CAsioServer is creating, port: " << port_ << "\n";
    }

    virtual ~CAsioServer()
    {
        for (auto c : connections_)
        {
            c->hasta_la_vista();
        }

        std::cout << "-- CAsioServer is destroying , port: " << port_ << "\n";
    }

    int port(void) { return port_; }

    void accept(void) { do_accept(); }

    void hasta_la_vista(void)
    {
        destroying_in_progress = true;
        std::error_code ec;
        acceptor_.cancel(ec);
    }

private:
    void do_accept()
    {
        auto self(shared_from_this());
        acceptor_.async_accept([this, self](std::error_code ec, asio::ip::tcp::socket socket) {

            if (destroying_in_progress)
                return;

            if (!ec)
            {
                std::cout << "-- CAsioServer::do_accept() connection to socket: " << socket.native_handle() << "\n";
                auto c = std::make_shared<CAsioConnection>(std::move(socket), connections_);
                connections_.insert(c);
                c->read();
            }
            else
            {
                std::cout << "-- CAsioServer::do_accept() error : (" << ec.value() << ") " << ec.message() << "\n";
            }
            do_accept();
        });
    }

    int port_;
    bool destroying_in_progress;
    asio::ip::tcp::acceptor acceptor_;
    std::set<CAsioConnection::PtrType> connections_;
};

