How to Design Proper Release of a Boost::Asio Socket or Wrapper Thereof

How to design proper release of a boost::asio socket or wrapper thereof

While others have answered along the lines of the second half of this answer, the most complete answer I could find came from asking the same question on the Boost mailing list.

http://boost.2283326.n4.nabble.com/How-to-design-proper-release-of-a-boost-asio-socket-or-wrapper-thereof-td4693442.html

I will summarize here in order to assist those that arrive here from a search in the future.

There are two options:

1) Close the socket in order to cancel any outstanding I/O, then post a callback for the post-disconnection logic on the io_service and let the server class be called back once the socket has been disconnected. It can then safely release the connection. As long as only one thread has called io_service::run, all other asynchronous operations will already have been resolved by the time the callback is made. However, if multiple threads have called io_service::run, this is not safe.

2) As others have pointed out in their answers, using shared_ptr to manage the connection's lifetime, with outstanding I/O operations keeping it alive, is viable. We can use a collection of weak_ptr to the connections in order to access them if we need to. The latter is the tidbit that had been omitted from other posts on the topic, which confused me.
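The lifetime mechanism behind option 2 can be sketched without Asio at all. In the sketch below, handler_queue is a hypothetical stand-in for the io_service, and session, start_op and run_all are illustrative names, not part of any library. It only shows that a handler holding a shared_ptr keeps the object alive until the handler has run, while a weak_ptr can observe the object without extending its life.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical stand-in for an io_service: a queue of pending completion handlers.
using handler_queue = std::queue<std::function<void()>>;

struct session : std::enable_shared_from_this<session> {
    int completed = 0;

    // "Starts an async operation": the queued handler captures a shared_ptr
    // to the session, so the session lives at least until the handler has run.
    void start_op(handler_queue& q) {
        auto self = shared_from_this();
        q.push([self] { ++self->completed; });
    }
};

// Runs every pending handler; each handler releases its captured owner
// when it is destroyed at the end of the loop iteration.
inline void run_all(handler_queue& q) {
    while (!q.empty()) {
        auto h = std::move(q.front());
        q.pop();
        h();
    }
}
```

A server would keep only a weak_ptr per session: while operations are outstanding, lock() succeeds; once the last handler has run, the session is destroyed and the weak_ptr reports expired().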

Reducing complexity of a design using boost::asio

The typical response to that would be to use co-routines.

Boost Asio has two flavours of them:

  • Stackless Coroutines

    These are entirely cooperative and don't allow switching stacks. Instead they employ an ingenious hack with switch statements and a handful of macros (yield, reenter, fork).

    A downside to this is that coroutines are functors in this design, and the functor needs to be copyable. This invites choices involving shared_ptrs just in order to make it convenient.

    Shared pointers come with their own performance overhead, which may or may not affect your application.

  • Stackful Coroutines

    These are still cooperative, but they leverage Boost Context (through the Boost Coroutine library) to actually switch stacks. This removes quite a bit of the red tape mentioned before, but introduces other trade-offs:

    • it might introduce a slight inefficiency; compared to "flat" asynchrony on a single thread, it does introduce context switching, which ironically makes it resemble multi-threading, albeit without the threads
    • it introduces a dependency on the non-header-only libraries Boost Context and Boost Coroutine, which are not supported on all the target platforms that the Boost libraries support

    Stackful coroutines are typically initiated using boost::asio::spawn

I like to think of Stackful Coroutines as a cooperative multi-tasking abstraction that can run inside a full threading eco-system supplied by the OS.
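To make the "hack with switch statements" behind stackless coroutines more concrete, here is a hand-rolled sketch of the idea. It does not use Asio's actual reenter/yield macros; counter_coro is a hypothetical example type. The resume point and all "locals" live in a copyable functor, and each call jumps back to where the previous call left off.

```cpp
#include <cassert>

// Hypothetical resumable functor: yields 1, 2, 3 and then returns -1 forever.
struct counter_coro {
    int state = 0; // resume point; this replaces a saved stack
    int i = 0;     // "locals" must be members to survive a yield

    int operator()() {
        switch (state) {
        case 0:
            for (i = 1; i <= 3; ++i) {
                state = 1;
                return i; // "yield": remember the resume point and leave
        case 1:;          // the next call jumps back into the loop body here
            }
            state = 2;    // finished: no case label matches from now on
        }
        return -1;
    }
};
```

Because the whole coroutine is just two integers, copying the functor copies the suspended computation, which is why the copyability requirement mentioned above comes up.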

Boost Asio features samples of both styles of coroutines:

  • A single-threaded HTTP server implemented using stackless coroutines
  • A C++11 echo server using spawn
  • The same in C++03 style

Synchronized and concurrent data structure patterns with Boost.Asio

I am looking for a systematic way to apply strand synchronization to:

  • STL (or STL-like) containers (e.g., std::deque, std::unordered_map); and

You're looking for something that doesn't exist. The closest thing is Active Objects, and you hardly need strands for that unless you have asynchronous operations on them. That makes almost zero sense, because no operation on STL containers should have a time complexity high enough to warrant asynchrony. The computational cost, on the other hand, is such that adding any kind of synchronization per operation would be very suboptimal.

Instead of fine-grained locking [which you automatically opt for when wrapping STL data structures as Active Objects] you will almost always find better performance with coarse-grained locking.

Even sooner in the design, you will always have more performance by reducing sharing than by "optimizing synchronization" (those are a contradiction).

  • wait-free containers such as boost::lockfree::spsc_queue or folly::ProducerConsumerQueue.

Why would you even synchronize access to wait-free containers? Wait-free implies no synchronization.
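For context, this is roughly what such a queue looks like inside. The sketch below is a hand-rolled ring buffer, not boost::lockfree::spsc_queue or folly::ProducerConsumerQueue; spsc_ring and its members are illustrative names. It assumes exactly one producer thread calling push and exactly one consumer thread calling pop. Under that assumption the two atomic indices are all the coordination there is, so wrapping the calls in a strand or mutex adds nothing.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <utility>

// Hypothetical single-producer/single-consumer ring buffer holding up to N-1 items.
template <typename T, std::size_t N>
class spsc_ring {
    std::array<T, N> buf_{};
    std::atomic<std::size_t> head_{0}; // next slot to read; written only by the consumer
    std::atomic<std::size_t> tail_{0}; // next slot to write; written only by the producer

public:
    // Producer side. Returns false when the ring is full.
    bool push(T v) {
        auto t = tail_.load(std::memory_order_relaxed);
        auto next = (t + 1) % N;
        if (next == head_.load(std::memory_order_acquire))
            return false;
        buf_[t] = std::move(v);
        tail_.store(next, std::memory_order_release); // publish the element
        return true;
    }

    // Consumer side. Returns false when the ring is empty.
    bool pop(T& out) {
        auto h = head_.load(std::memory_order_relaxed);
        if (h == tail_.load(std::memory_order_acquire))
            return false;
        out = std::move(buf_[h]);
        head_.store((h + 1) % N, std::memory_order_release); // free the slot
        return true;
    }
};
```

The constraint is on who calls what (one producer, one consumer), not on how the calls are locked.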

The Bullets

  1. To adapt an arbitrary STL container for safe synchronized use, is it sufficient to perform all its operations through a strand instance?

    Yes. Only that's not something that exists. Strands wrap async tasks (and their completion handlers, which are just tasks from the POV of the executor).

    See the rant above.

  2. To adapt a wait-free read-write container for synchronized, concurrent use, is it sufficient to wrap its operations through two distinct strands, one for read operations and one for write operations?

    As mentioned, it's silly to synchronize access to lock-free constructs.

    This question hints at a "yes", although in that use case the author describes using a strand to coordinate producers from several threads, while presumably only reading from one thread.

    That's specifically related to the SPSC queue, i.e. where additional constraints are placed on threads performing read/write operations.

    While indeed the solution here is to create logical threads of execution with exclusive access to either set of operations, notice that you are constraining the tasks, which is a fundamentally different angle from constraining the data.

  3. If the answer to 1-2 above is yes, should the strand just manage operations on the data structure through calls to boost::asio::post?

    So, the answer wasn't "yes". The idea of posting all operations through post would come down to implementing the Active Object pattern, as mentioned in my introduction. Yes, you can do that, and no, that's not gonna be smart. (I'm pretty sure that if you do, by definition you can forget about using lock-free containers)

    [....]

    Then should MyDeque::push_back(const T& t) just call

    boost::asio::post(_strand, [&_deque]{ _deque.push_back(t); })

    Yes, that's the ActiveObject pattern. However, consider how you would implement top(). Consider what you'd do if you had two MyDeque instances (a and b) and wanted to move items from one to the other:

    if (!a.empty()) {
        auto value = a.top(); // synchronizes on the strand to have the return value
        a.pop(); // operation on the strand of a
        b.push(std::move(value)); // operation on the strand of b
    }

    Since queue b is not on the strand of a, b.push() could actually commit before a.pop(), which may not be what you expect. Also, it is blindingly obvious that all the fine-grained synchronization steps are going to be far less efficient than having one strand for all operations that work on a set of data structures.

  4. [...] But it seems that [...] that the force of a fully concurrent vector or hash map may be a bit overkill

    There's no "force" in fully concurrent vectors or hash maps. There's a cost to them (in terms of processor load) and a gain (in terms of lower latency). In the cases you mention, latency is rarely the issue (registering a session is an infrequent event, and dominated by actual I/O speeds), so you'd be best off using the simplest thing (IMO that would be a single-threaded server for those data structures). Have workers for any non-trivial operations; they could run on a pool of threads (e.g. if you decide to implement a chess-playing chat-bot).
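The two-queue example from item 3 can be sketched with coarse-grained synchronization. Note that the sketch swaps in a plain std::mutex where the answer talks about a strand, purely to stay free of Boost; queue_pair and its members are hypothetical names. The point is the same: one synchronization domain covers the whole set of data structures, so a transfer is a single indivisible step.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical pair of queues that are always used together.
class queue_pair {
    std::mutex mx_; // one lock for the whole set of data structures
    std::deque<int> a_, b_;

public:
    void push_a(int v) {
        std::lock_guard<std::mutex> lk(mx_);
        a_.push_back(v);
    }

    // Moves the front of a to the back of b; no observer can see the
    // value missing from both queues or present in both.
    bool transfer_one() {
        std::lock_guard<std::mutex> lk(mx_);
        if (a_.empty())
            return false;
        b_.push_back(std::move(a_.front()));
        a_.pop_front();
        return true;
    }

    std::pair<std::size_t, std::size_t> sizes() {
        std::lock_guard<std::mutex> lk(mx_);
        return {a_.size(), b_.size()};
    }
};
```

With Asio the equivalent would be to run transfer_one as one task on a single strand that owns both queues, rather than giving each queue its own strand.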


You want strands to form logical threads of execution. You want to synchronize access to your data structures, not your data structures per se. Sometimes lock-free data structures are a simple choice to avoid having to design things well, but don't expect them to magically perform well.

Some links:

  • boost::asio and Active Object (Tanner Sansbury on ActiveObject with Asio) has a lot of thoughts that overlap with your questions
  • How to design proper release of a boost::asio socket or wrapper thereof is my example of maintaining a list of connections

Boost ASIO: Send message to all connected clients

First off: You can broadcast UDP, but that's not to connected clients. That's just... UDP.

Secondly, that link shows how to have a condition-variable (event)-like interface in Asio. That's only a tiny part of your problem. You forgot about the big picture: you need to know about the set of open connections, one way or the other:

  1. e.g. keeping a container of session pointers (weak_ptr) to each connection
  2. each connection subscribing to a signal slot (e.g. Boost Signals).

Option 1 is great for performance, option 2 is better for flexibility (decoupling the event source from subscribers, making it possible to have heterogeneous subscribers, e.g. ones that are not connections).

Because I think Option 1 is much simpler w.r.t. threading and better w.r.t. efficiency (you can e.g. serve all clients from one buffer without copying), and you probably don't need to doubly decouple the signal/slots, let me refer to an answer where I already showed as much for pure Asio (without Beast):

  • How to design proper release of a boost::asio socket or wrapper thereof

It shows the concept of a "connection pool" - which is essentially a thread-safe container of weak_ptr<connection> objects with some garbage collection logic.
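A minimal sketch of such a pool, using only the standard library, might look as follows. This is not the code from the linked answer; weak_pool, add, collect_live and registered are hypothetical names. Expired entries are pruned whenever the live set is collected, and the strong references are returned so the caller can act on the connections after the lock has been released.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical thread-safe registry of weak references.
template <typename T>
class weak_pool {
    std::mutex mx_;
    std::vector<std::weak_ptr<T>> items_;

public:
    void add(std::weak_ptr<T> w) {
        std::lock_guard<std::mutex> lk(mx_);
        items_.push_back(std::move(w));
    }

    // Garbage-collects expired entries and returns strong references to the rest.
    std::vector<std::shared_ptr<T>> collect_live() {
        std::vector<std::shared_ptr<T>> live;
        std::vector<std::weak_ptr<T>> kept;
        std::lock_guard<std::mutex> lk(mx_);
        for (auto& w : items_) {
            if (auto p = w.lock()) {
                kept.push_back(w);
                live.push_back(std::move(p));
            }
        }
        items_.swap(kept);
        return live;
    }

    std::size_t registered() {
        std::lock_guard<std::mutex> lk(mx_);
        return items_.size();
    }
};
```

Returning shared_ptrs keeps each connection alive for the duration of the broadcast without holding the mutex while messages are being queued.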

Demonstration: Introducing Echo Server

After chatting about things I wanted to take the time to actually demonstrate the two approaches, so it's completely clear what I'm talking about.

First let's present a simple, run-of-the-mill asynchronous TCP server

  • with multiple concurrent connections
  • each connected session reads from the client line-by-line, and echoes the same back to the client
  • stops accepting after 3 seconds, and exits after the last client disconnects

master branch on github

#include <boost/asio.hpp>
#include <cassert>
#include <chrono>
#include <iostream>
#include <iterator>
#include <list>
#include <memory>
#include <string>
#include <thread>

namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;

static bool s_verbose = false;

struct connection : std::enable_shared_from_this<connection> {
    connection(ba::io_context& ioc) : _s(ioc) {}

    void start() { read_loop(); }
    void send(std::string msg, bool at_front = false) {
        // Queue the message from the socket's executor; capturing self keeps
        // the connection alive until the posted task has run
        ba::post(_s.get_executor(),
                 [this, self = shared_from_this(), msg = std::move(msg), at_front]() mutable {
                     if (enqueue(std::move(msg), at_front))
                         write_loop();
                 });
    }

private:
    void do_echo() {
        std::istream is(&_rx);
        std::string line;
        if (getline(is, line)) {
            send(std::move(line) + '\n');
        }
    }

    bool enqueue(std::string msg, bool at_front)
    { // returns true if need to start write loop
        at_front &= !_tx.empty(); // no difference
        if (at_front) // the front element may be mid-write, so queue right behind it
            _tx.insert(std::next(begin(_tx)), std::move(msg));
        else
            _tx.push_back(std::move(msg));

        return (_tx.size() == 1);
    }
    bool dequeue()
    { // returns true if more messages pending after dequeue
        assert(!_tx.empty());
        _tx.pop_front();
        return !_tx.empty();
    }

    void write_loop() {
        ba::async_write(_s, ba::buffer(_tx.front()),
            [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                if (!ec && dequeue()) write_loop();
            });
    }

    void read_loop() {
        ba::async_read_until(_s, _rx, "\n",
            [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                do_echo();
                if (!ec)
                    read_loop();
            });
    }

    friend struct server;
    ba::streambuf _rx;
    std::list<std::string> _tx;
    tcp::socket _s;
};

struct server {
    server(ba::io_context& ioc) : _ioc(ioc) {
        // reuse_address() defaults to false, and the option must be set before bind
        _acc.set_option(tcp::acceptor::reuse_address(true));
        _acc.bind({{}, 6767});
        _acc.listen();
        accept_loop();
    }

    void stop() {
        ba::post(_ioc, [this] {
            _acc.cancel();
            _acc.close();
        });
    }

private:
    void accept_loop() {
        auto session = std::make_shared<connection>(_ioc);
        _acc.async_accept(session->_s, [this,session](error_code ec) {
            auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
            std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;

            if (!ec) { // only start sessions that were actually accepted
                session->start();
                accept_loop();
            }
        });
    }

    ba::io_context& _ioc;
    tcp::acceptor _acc{_ioc, tcp::v4()};
};

int main(int argc, char** argv) {
    s_verbose = argc>1 && argv[1] == "-v"s;

    ba::io_context ioc;

    server s(ioc);

    std::thread th([&ioc] { ioc.run(); }); // todo exception handling

    std::this_thread::sleep_for(3s);
    s.stop(); // active connections will continue

    th.join();
}

Approach 1. Adding Broadcast Messages

So, let's add "broadcast messages" that get sent to all active connections simultaneously. We add two:

  • one at each new connection (saying "Player ## has entered the game")
  • one that emulates a global "server event" (like you described in the question). It gets triggered from within main:

    std::this_thread::sleep_for(1s);

    auto n = s.broadcast("random global event broadcast\n");
    std::cout << "Global event broadcast reached " << n << " active connections\n";

Note how we do this by registering a weak pointer to each accepted connection and operating on each:

    _acc.async_accept(session->_s, [this,session](error_code ec) {
        auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
        std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;

        if (!ec) {
            auto n = reg_connection(session);

            session->start();
            accept_loop();

            broadcast("player #" + std::to_string(n) + " has entered the game\n");
        }
    });

broadcast is also used directly from main and is simply:

    size_t broadcast(std::string const& msg) {
        return for_each_active([msg](connection& c) { c.send(msg, true); });
    }

using-asio-post branch on github

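#include <boost/asio.hpp>
#include <cassert>
#include <chrono>
#include <iostream>
#include <iterator>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>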

namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;

static bool s_verbose = false;

struct connection : std::enable_shared_from_this<connection> {
    connection(ba::io_context& ioc) : _s(ioc) {}

    void start() { read_loop(); }
    void send(std::string msg, bool at_front = false) {
        // Queue the message from the socket's executor; capturing self keeps
        // the connection alive until the posted task has run
        ba::post(_s.get_executor(),
                 [this, self = shared_from_this(), msg = std::move(msg), at_front]() mutable {
                     if (enqueue(std::move(msg), at_front))
                         write_loop();
                 });
    }

private:
    void do_echo() {
        std::istream is(&_rx);
        std::string line;
        if (getline(is, line)) {
            send(std::move(line) + '\n');
        }
    }

    bool enqueue(std::string msg, bool at_front)
    { // returns true if need to start write loop
        at_front &= !_tx.empty(); // no difference
        if (at_front) // the front element may be mid-write, so queue right behind it
            _tx.insert(std::next(begin(_tx)), std::move(msg));
        else
            _tx.push_back(std::move(msg));

        return (_tx.size() == 1);
    }
    bool dequeue()
    { // returns true if more messages pending after dequeue
        assert(!_tx.empty());
        _tx.pop_front();
        return !_tx.empty();
    }

    void write_loop() {
        ba::async_write(_s, ba::buffer(_tx.front()),
            [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                if (!ec && dequeue()) write_loop();
            });
    }

    void read_loop() {
        ba::async_read_until(_s, _rx, "\n",
            [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                do_echo();
                if (!ec)
                    read_loop();
            });
    }

    friend struct server;
    ba::streambuf _rx;
    std::list<std::string> _tx;
    tcp::socket _s;
};

struct server {
    server(ba::io_context& ioc) : _ioc(ioc) {
        // reuse_address() defaults to false, and the option must be set before bind
        _acc.set_option(tcp::acceptor::reuse_address(true));
        _acc.bind({{}, 6767});
        _acc.listen();
        accept_loop();
    }

    void stop() {
        ba::post(_ioc, [this] {
            _acc.cancel();
            _acc.close();
        });
    }

    size_t broadcast(std::string const& msg) {
        return for_each_active([msg](connection& c) { c.send(msg, true); });
    }

private:
    using connptr = std::shared_ptr<connection>;
    using weakptr = std::weak_ptr<connection>;

    std::mutex _mx;
    std::vector<weakptr> _registered;

    size_t reg_connection(weakptr wp) {
        std::lock_guard<std::mutex> lk(_mx);
        _registered.push_back(wp);
        return _registered.size();
    }

    template <typename F>
    size_t for_each_active(F f) {
        // Collect strong references under the lock, then act on them without it
        std::vector<connptr> active;
        {
            std::lock_guard<std::mutex> lk(_mx);
            for (auto& w : _registered)
                if (auto c = w.lock())
                    active.push_back(c);
        }

        for (auto& c : active) {
            std::cout << "(running action for " << c->_s.remote_endpoint() << ")" << std::endl;
            f(*c);
        }

        return active.size();
    }

    void accept_loop() {
        auto session = std::make_shared<connection>(_ioc);
        _acc.async_accept(session->_s, [this,session](error_code ec) {
            auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
            std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;

            if (!ec) {
                auto n = reg_connection(session);

                session->start();
                accept_loop();

                broadcast("player #" + std::to_string(n) + " has entered the game\n");
            }
        });
    }

    ba::io_context& _ioc;
    tcp::acceptor _acc{_ioc, tcp::v4()};
};

int main(int argc, char** argv) {
    s_verbose = argc>1 && argv[1] == "-v"s;

    ba::io_context ioc;

    server s(ioc);

    std::thread th([&ioc] { ioc.run(); }); // todo exception handling

    std::this_thread::sleep_for(1s);

    auto n = s.broadcast("random global event broadcast\n");
    std::cout << "Global event broadcast reached " << n << " active connections\n";

    std::this_thread::sleep_for(2s);
    s.stop(); // active connections will continue

    th.join();
}

Approach 2. The Same Broadcasts, But With Boost Signals2

The Signals approach is a fine example of Dependency Inversion.

Most salient notes:

  • slots get invoked on the thread that invokes the signal ("raising the event")
  • the scoped_connection is there so subscriptions are automatically removed when the connection is destructed
  • there's a subtle difference in the wording of the console message, from "reached # active connections" to "reached # active subscribers".

The difference is key to understanding the added flexibility: the signal owner/invoker does not know anything about the subscribers. That's the decoupling/dependency inversion we're talking about.
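The decoupling can be illustrated with a tiny, single-threaded stand-in for a signal. To be clear, this is not Boost.Signals2 and it has none of its thread-safety; mini_signal, subscription, connect and publish are hypothetical names chosen for the sketch. It shows the two properties listed above: the publisher only sees opaque slots, and a scoped handle removes its slot when it is destroyed.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical, single-threaded stand-in for a signal of void(std::string const&).
class mini_signal {
    using slot = std::function<void(std::string const&)>;
    using slot_map = std::map<int, slot>;
    std::shared_ptr<slot_map> slots_ = std::make_shared<slot_map>();
    int next_id_ = 0;

public:
    // Plays the role of a scoped connection: unsubscribes when destroyed.
    class subscription {
        std::weak_ptr<slot_map> slots_;
        int id_ = 0;

    public:
        subscription(std::weak_ptr<slot_map> s, int id) : slots_(std::move(s)), id_(id) {}
        subscription(subscription&& o) noexcept : slots_(std::move(o.slots_)), id_(o.id_) {}
        subscription(subscription const&) = delete;
        subscription& operator=(subscription const&) = delete;
        ~subscription() {
            if (auto s = slots_.lock())
                s->erase(id_);
        }
    };

    subscription connect(slot f) {
        int id = next_id_++;
        (*slots_)[id] = std::move(f);
        return subscription(slots_, id);
    }

    // Invokes every slot on the calling thread; returns how many were called.
    std::size_t publish(std::string const& msg) {
        auto snapshot = *slots_; // a slot may unsubscribe while being invoked
        for (auto& entry : snapshot)
            entry.second(msg);
        return snapshot.size();
    }
};
```

A logger, a metrics counter and a network connection could all subscribe to the same signal; the publisher counts subscribers, not connections.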

using-signals2 branch on github

#include <boost/asio.hpp>
#include <boost/signals2.hpp>
#include <cassert>
#include <chrono>
#include <iostream>
#include <iterator>
#include <list>
#include <memory>
#include <string>
#include <thread>

namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;

static bool s_verbose = false;

struct connection : std::enable_shared_from_this<connection> {
    connection(ba::io_context& ioc) : _s(ioc) {}

    void start() { read_loop(); }
    void send(std::string msg, bool at_front = false) {
        // Queue the message from the socket's executor; capturing self keeps
        // the connection alive until the posted task has run
        ba::post(_s.get_executor(),
                 [this, self = shared_from_this(), msg = std::move(msg), at_front]() mutable {
                     if (enqueue(std::move(msg), at_front))
                         write_loop();
                 });
    }

private:
    void do_echo() {
        std::istream is(&_rx);
        std::string line;
        if (getline(is, line)) {
            send(std::move(line) + '\n');
        }
    }

    bool enqueue(std::string msg, bool at_front)
    { // returns true if need to start write loop
        at_front &= !_tx.empty(); // no difference
        if (at_front) // the front element may be mid-write, so queue right behind it
            _tx.insert(std::next(begin(_tx)), std::move(msg));
        else
            _tx.push_back(std::move(msg));

        return (_tx.size() == 1);
    }
    bool dequeue()
    { // returns true if more messages pending after dequeue
        assert(!_tx.empty());
        _tx.pop_front();
        return !_tx.empty();
    }

    void write_loop() {
        ba::async_write(_s, ba::buffer(_tx.front()),
            [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                if (!ec && dequeue()) write_loop();
            });
    }

    void read_loop() {
        ba::async_read_until(_s, _rx, "\n",
            [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                do_echo();
                if (!ec)
                    read_loop();
            });
    }

    friend struct server;
    ba::streambuf _rx;
    std::list<std::string> _tx;
    tcp::socket _s;

    // Declared last so it is destroyed (and unsubscribed) first
    boost::signals2::scoped_connection _subscription;
};

struct server {
    server(ba::io_context& ioc) : _ioc(ioc) {
        // reuse_address() defaults to false, and the option must be set before bind
        _acc.set_option(tcp::acceptor::reuse_address(true));
        _acc.bind({{}, 6767});
        _acc.listen();
        accept_loop();
    }

    void stop() {
        ba::post(_ioc, [this] {
            _acc.cancel();
            _acc.close();
        });
    }

    size_t broadcast(std::string const& msg) {
        _broadcast_event(msg);
        return _broadcast_event.num_slots();
    }

private:
    boost::signals2::signal<void(std::string const& msg)> _broadcast_event;

    size_t reg_connection(connection& c) {
        // The scoped_connection stored in c disconnects the slot when c is destroyed
        c._subscription = _broadcast_event.connect(
            [&c](std::string const& msg){ c.send(msg, true); }
        );

        return _broadcast_event.num_slots();
    }

    void accept_loop() {
        auto session = std::make_shared<connection>(_ioc);
        _acc.async_accept(session->_s, [this,session](error_code ec) {
            auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
            std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;

            if (!ec) {
                auto n = reg_connection(*session);

                session->start();
                accept_loop();

                broadcast("player #" + std::to_string(n) + " has entered the game\n");
            }
        });
    }

    ba::io_context& _ioc;
    tcp::acceptor _acc{_ioc, tcp::v4()};
};

int main(int argc, char** argv) {
    s_verbose = argc>1 && argv[1] == "-v"s;

    ba::io_context ioc;

    server s(ioc);

    std::thread th([&ioc] { ioc.run(); }); // todo exception handling

    std::this_thread::sleep_for(1s);

    auto n = s.broadcast("random global event broadcast\n");
    std::cout << "Global event broadcast reached " << n << " active subscribers\n";

    std::this_thread::sleep_for(2s);
    s.stop(); // active connections will continue

    th.join();
}

See the diff between Approach 1. and 2.: Compare View on github

A sample of the output when run against 3 concurrent clients with:

(for a in {1..3}; do netcat localhost 6767 < /etc/dictionaries-common/words > echoed.$a& sleep .1; done; time wait)

Segmentation fault on boost::asio::ip::tcp::acceptor

You need to post more code (Server::Server()) and post the files in separate code blocks for readability.

Try changing io to be a pointer, and explicitly initializing it in your constructor:

Irc::Server::Server() : io()
{
    start();
    startAccept();
}

