How to design proper release of a boost::asio socket or wrapper thereof
While others have answered similarly to the second half of this answer, the most complete answer I could find came from asking the same question on the Boost mailing list.
http://boost.2283326.n4.nabble.com/How-to-design-proper-release-of-a-boost-asio-socket-or-wrapper-thereof-td4693442.html
I will summarize here in order to assist those that arrive here from a search in the future.
There are two options:
1) Close the socket in order to cancel any outstanding I/O, then post a callback for the post-disconnection logic on the io_service, and let the server class be called back once the socket has been disconnected. It can then safely release the connection. As long as only one thread has called io_service::run, other asynchronous operations will already have been resolved when the callback is made. However, if multiple threads have called io_service::run, this is not safe.
2) As others have pointed out in their answers, using a shared_ptr to manage the connection's lifetime, with outstanding I/O operations keeping it alive, is viable. We can keep a collection of weak_ptrs to the connections in order to access them when needed. The latter is the tidbit that had been omitted from other posts on the topic and which confused me.
Reducing complexity of a design using boost::asio
The typical response to that would be to use co-routines.
Boost Asio has two flavours of them:
Stackless Coroutines
These are entirely cooperative and don't allow switching stacks. Instead they employ an ingenious hack with switch statements and a handful of macros (yield, reenter, fork). A downside to this design is that the coroutines are functors, and the functor needs to be copyable. This invites choices involving shared_ptrs just in order to make it convenient. Shared pointers come with their own performance overhead, which may or may not affect your application.
Stackful Coroutines
These are still cooperative, but they leverage Boost Context (through the Boost Coroutine library) to actually switch stacks. This removes quite a bit of the red tape mentioned before, but introduces other trade-offs:
- it might introduce a slight inefficiency; compared to "flat" asynchrony on a single thread, it does introduce context switching, which ironically makes it resemble multi-threading, albeit without the threads
- it introduces a dependency on the non-header-only libraries Boost Context and Boost Coroutine, which are not supported on all the target platforms that Boost itself supports
Stackful coroutines are typically initiated using boost::asio::spawn.
I like to think of Stackful Coroutines as a cooperative multi-tasking abstraction that can run inside a full threading eco-system supplied by the OS.
Boost Asio features samples of both styles of coroutines:
- A single-threaded HTTP server implemented using stackless coroutines
- A C++11 echo server using spawn
- The same in C++03 style
Synchronized and concurrent data structure patterns with Boost.Asio
I am looking for a systematic way to apply strand synchronization to:
- STL (or STL-like) containers (e.g., std::deque, std::unordered_map); and
You're looking for something that doesn't exist. The closest thing is Active Objects, and you hardly need strands for that unless you have asynchronous operations on them. That makes almost zero sense, because no operation on an STL container should have a time complexity high enough to warrant asynchrony. The computational complexity, on the other hand, is such that adding any kind of synchronization would be very suboptimal:
Instead of fine-grained locking [which you automatically opt for when doing STL data structures as Active Objects] you will always find better performance with coarse-grained locking.
Even earlier in the design: you will always gain more performance by reducing sharing than by "optimizing synchronization" (the two are a contradiction).
- wait-free containers such as boost::lockfree::spsc_queue or folly::ProducerConsumerQueue.
Why would you even synchronize access to wait-free containers? Wait-free implies no synchronization.
The Bullets
To adapt an arbitrary STL container for safe synchronized use, is it sufficient to perform all its operations through a strand instance?
Yes. Only that's not something that exists. Strands wrap async tasks (and their completion handlers, which are just tasks from the POV of the executor).
See the rant above.
To adapt a wait-free read-write container for synchronized, concurrent use, is it sufficient to wrap its operations through two distinct strands, one for read operations and one for write operations?
As mentioned, it's silly to synchronize access to lock-free constructs.
This question hints at a "yes", although in that use case the author describes using a strand to coordinate producers from several threads, while presumably only reading from one thread.
That's specifically related to the SPSC queue, i.e. where additional constraints are placed on threads performing read/write operations.
While indeed the solution here is to create logical threads of execution with exclusive access to either set of operations, notice that you are constraining the tasks, which is a fundamentally different angle from constraining the data.
If the answer to 1-2 above is yes, should the strand just manage operations on the data structure through calls to boost::asio::post?
So, the answer wasn't "yes". The idea of posting all operations through post would come down to implementing the Active Object pattern, as mentioned in my introduction. Yes, you can do that, and no, that's not going to be smart. (I'm pretty sure that if you do, you can by definition forget about using lock-free containers.)
[...]
Then should MyDeque::push_back(const T& t) just call boost::asio::post(_strand, [this, t] { _deque.push_back(t); })?
Yes, that's the Active Object pattern. However, consider how you would implement top(). Consider what you'd do if you had two MyDeque instances (a and b) and wanted to move items from one to the other:

if (!a.empty()) {
    auto value = a.top();     // synchronizes on the strand to get the return value
    a.pop();                  // operation on the strand of a
    b.push(std::move(value)); // operation on the strand of b
}

Since queue b is not on the strand of a, b.push() could actually commit before a.pop(), which may not be what you expect. Also, it is bleedingly obvious that all the fine-grained synchronization steps are going to be far less efficient than having one strand for all operations that work on a given set of data structures.

[...] But it seems that [...] the force of a fully concurrent vector or hash map may be a bit overkill
There's no "force" in fully concurrent vectors or hash maps. There's a cost to them (in terms of processor business) and a gain (in terms of lower latency). In the cases you mention, latency is rarely the issue (registering a session is an infrequent event, and dominated by actual IO speeds), so you'd be best of using the simplest thing (IMO that would be single-threaded server for those datastructures). Have workers for any non-trivial operations - they could run on a pool of threads. (E.g. if you decide to implement a chess-playing chat-bot)
You want strands to form logical threads of execution. You want to synchronize access to your datastructures, not your datastructures per se. Sometimes lockfree datastructures are a simple choice to avoid having to design things well, but don't expect them to magically perform well.
Some links:
- boost::asio and Active Object (Tanner Sansbury on ActiveObject with Asio) has a lot of thoughts that overlap with your questions
- How to design proper release of a boost::asio socket or wrapper thereof is my example of maintaining a list of connections
Boost ASIO: Send message to all connected clients
First off: You can broadcast UDP, but that's not to connected clients. That's just... UDP.
Secondly, that link shows how to have a condition-variable (event)-like interface in Asio. That's only a tiny part of your problem. You forgot about the big picture: you need to know about the set of open connections, one way or the other:
- e.g. keeping a container of session pointers (weak_ptr) to each connection
- each connection subscribing to a signal slot (e.g. Boost Signals).
Option 1. is great for performance; option 2. is better for flexibility (decoupling the event source from subscribers, making it possible to have heterogeneous subscribers, e.g. not from connections).
Because I think Option 1. is much simpler w.r.t. threading, better w.r.t. efficiency (you can e.g. serve all clients from one buffer without copying), and you probably don't need to doubly decouple the signal/slots, let me refer to an answer where I already showed as much for pure Asio (without Beast):
- How to design proper release of a boost::asio socket or wrapper thereof
It shows the concept of a "connection pool" - which is essentially a thread-safe container of weak_ptr<connection>
objects with some garbage collection logic.
Demonstration: Introducing Echo Server
After chatting about things I wanted to take the time to actually demonstrate the two approaches, so it's completely clear what I'm talking about.
First let's present a simple, run-of-the-mill asynchronous TCP server with
- multiple concurrent connections
- each connected session reads from the client line-by-line, and echoes the same back to the client
- stops accepting after 3 seconds, and exits after the last client disconnects
master branch on github
#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>
namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;
static bool s_verbose = false;
struct connection : std::enable_shared_from_this<connection> {
connection(ba::io_context& ioc) : _s(ioc) {}
void start() { read_loop(); }
void send(std::string msg, bool at_front = false) {
post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
if (enqueue(std::move(msg), at_front))
write_loop();
});
}
private:
void do_echo() {
std::string line;
if (getline(std::istream(&_rx), line)) {
send(std::move(line) + '\n');
}
}
bool enqueue(std::string msg, bool at_front)
{ // returns true if need to start write loop
at_front &= !_tx.empty(); // no difference
if (at_front)
_tx.insert(std::next(begin(_tx)), std::move(msg));
else
_tx.push_back(std::move(msg));
return (_tx.size() == 1);
}
bool dequeue()
{ // returns true if more messages pending after dequeue
assert(!_tx.empty());
_tx.pop_front();
return !_tx.empty();
}
void write_loop() {
ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec && dequeue()) write_loop();
});
}
void read_loop() {
ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
do_echo();
if (!ec)
read_loop();
});
}
friend struct server;
ba::streambuf _rx;
std::list<std::string> _tx;
tcp::socket _s;
};
struct server {
server(ba::io_context& ioc) : _ioc(ioc) {
_acc.set_option(tcp::acceptor::reuse_address());
_acc.bind({{}, 6767});
_acc.listen();
accept_loop();
}
void stop() {
_ioc.post([=] {
_acc.cancel();
_acc.close();
});
}
private:
void accept_loop() {
auto session = std::make_shared<connection>(_acc.get_io_context());
_acc.async_accept(session->_s, [this,session](error_code ec) {
auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;
session->start();
if (!ec)
accept_loop();
});
}
ba::io_context& _ioc;
tcp::acceptor _acc{_ioc, tcp::v4()};
};
int main(int argc, char** argv) {
s_verbose = argc>1 && argv[1] == "-v"s;
ba::io_context ioc;
server s(ioc);
std::thread th([&ioc] { ioc.run(); }); // todo exception handling
std::this_thread::sleep_for(3s);
s.stop(); // active connections will continue
th.join();
}
Approach 1. Adding Broadcast Messages
So, let's add "broadcast messages" that get sent to all active connections simultaneously. We add two:
- one at each new connection (saying "Player ## has entered the game")
- one that emulates a global "server event", like you described in the question. It gets triggered from within main:
std::this_thread::sleep_for(1s);
auto n = s.broadcast("random global event broadcast\n");
std::cout << "Global event broadcast reached " << n << " active connections\n";
Note how we do this by registering a weak pointer to each accepted connection and operating on each:
_acc.async_accept(session->_s, [this,session](error_code ec) {
auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;
if (!ec) {
auto n = reg_connection(session);
session->start();
accept_loop();
broadcast("player #" + std::to_string(n) + " has entered the game\n");
}
});
broadcast is also used directly from main and is simply:
size_t broadcast(std::string const& msg) {
return for_each_active([msg](connection& c) { c.send(msg, true); });
}
using-asio-post branch on github
#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>
namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;
static bool s_verbose = false;
struct connection : std::enable_shared_from_this<connection> {
connection(ba::io_context& ioc) : _s(ioc) {}
void start() { read_loop(); }
void send(std::string msg, bool at_front = false) {
post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
if (enqueue(std::move(msg), at_front))
write_loop();
});
}
private:
void do_echo() {
std::string line;
if (getline(std::istream(&_rx), line)) {
send(std::move(line) + '\n');
}
}
bool enqueue(std::string msg, bool at_front)
{ // returns true if need to start write loop
at_front &= !_tx.empty(); // no difference
if (at_front)
_tx.insert(std::next(begin(_tx)), std::move(msg));
else
_tx.push_back(std::move(msg));
return (_tx.size() == 1);
}
bool dequeue()
{ // returns true if more messages pending after dequeue
assert(!_tx.empty());
_tx.pop_front();
return !_tx.empty();
}
void write_loop() {
ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec && dequeue()) write_loop();
});
}
void read_loop() {
ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
do_echo();
if (!ec)
read_loop();
});
}
friend struct server;
ba::streambuf _rx;
std::list<std::string> _tx;
tcp::socket _s;
};
struct server {
server(ba::io_context& ioc) : _ioc(ioc) {
_acc.set_option(tcp::acceptor::reuse_address());
_acc.bind({{}, 6767});
_acc.listen();
accept_loop();
}
void stop() {
_ioc.post([=] {
_acc.cancel();
_acc.close();
});
}
size_t broadcast(std::string const& msg) {
return for_each_active([msg](connection& c) { c.send(msg, true); });
}
private:
using connptr = std::shared_ptr<connection>;
using weakptr = std::weak_ptr<connection>;
std::mutex _mx;
std::vector<weakptr> _registered;
size_t reg_connection(weakptr wp) {
std::lock_guard<std::mutex> lk(_mx);
_registered.push_back(wp);
return _registered.size();
}
template <typename F>
size_t for_each_active(F f) {
std::vector<connptr> active;
{
std::lock_guard<std::mutex> lk(_mx);
for (auto& w : _registered)
if (auto c = w.lock())
active.push_back(c);
}
for (auto& c : active) {
std::cout << "(running action for " << c->_s.remote_endpoint() << ")" << std::endl;
f(*c);
}
return active.size();
}
void accept_loop() {
auto session = std::make_shared<connection>(_acc.get_io_context());
_acc.async_accept(session->_s, [this,session](error_code ec) {
auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;
if (!ec) {
auto n = reg_connection(session);
session->start();
accept_loop();
broadcast("player #" + std::to_string(n) + " has entered the game\n");
}
});
}
ba::io_context& _ioc;
tcp::acceptor _acc{_ioc, tcp::v4()};
};
int main(int argc, char** argv) {
s_verbose = argc>1 && argv[1] == "-v"s;
ba::io_context ioc;
server s(ioc);
std::thread th([&ioc] { ioc.run(); }); // todo exception handling
std::this_thread::sleep_for(1s);
auto n = s.broadcast("random global event broadcast\n");
std::cout << "Global event broadcast reached " << n << " active connections\n";
std::this_thread::sleep_for(2s);
s.stop(); // active connections will continue
th.join();
}
Approach 2: The Same Broadcasts, But With Boost Signals2
The Signals approach is a fine example of Dependency Inversion.
Most salient notes:
- signal slots get invoked on the thread invoking the signal ("raising the event")
- the scoped_connection is there so subscriptions are automatically removed when the connection is destructed
- there's a subtle difference in the wording of the console message, from "reached # active connections" to "reached # active subscribers".
The difference is key to understanding the added flexibility: the signal owner/invoker does not know anything about the subscribers. That's the decoupling/dependency inversion we're talking about.
using-signals2 branch on github
#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>
#include <boost/signals2.hpp>
namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;
static bool s_verbose = false;
struct connection : std::enable_shared_from_this<connection> {
connection(ba::io_context& ioc) : _s(ioc) {}
void start() { read_loop(); }
void send(std::string msg, bool at_front = false) {
post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
if (enqueue(std::move(msg), at_front))
write_loop();
});
}
private:
void do_echo() {
std::string line;
if (getline(std::istream(&_rx), line)) {
send(std::move(line) + '\n');
}
}
bool enqueue(std::string msg, bool at_front)
{ // returns true if need to start write loop
at_front &= !_tx.empty(); // no difference
if (at_front)
_tx.insert(std::next(begin(_tx)), std::move(msg));
else
_tx.push_back(std::move(msg));
return (_tx.size() == 1);
}
bool dequeue()
{ // returns true if more messages pending after dequeue
assert(!_tx.empty());
_tx.pop_front();
return !_tx.empty();
}
void write_loop() {
ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec && dequeue()) write_loop();
});
}
void read_loop() {
ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
do_echo();
if (!ec)
read_loop();
});
}
friend struct server;
ba::streambuf _rx;
std::list<std::string> _tx;
tcp::socket _s;
boost::signals2::scoped_connection _subscription;
};
struct server {
server(ba::io_context& ioc) : _ioc(ioc) {
_acc.set_option(tcp::acceptor::reuse_address());
_acc.bind({{}, 6767});
_acc.listen();
accept_loop();
}
void stop() {
_ioc.post([=] {
_acc.cancel();
_acc.close();
});
}
size_t broadcast(std::string const& msg) {
_broadcast_event(msg);
return _broadcast_event.num_slots();
}
private:
boost::signals2::signal<void(std::string const& msg)> _broadcast_event;
size_t reg_connection(connection& c) {
c._subscription = _broadcast_event.connect(
[&c](std::string msg){ c.send(msg, true); }
);
return _broadcast_event.num_slots();
}
void accept_loop() {
auto session = std::make_shared<connection>(_acc.get_io_context());
_acc.async_accept(session->_s, [this,session](error_code ec) {
auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;
if (!ec) {
auto n = reg_connection(*session);
session->start();
accept_loop();
broadcast("player #" + std::to_string(n) + " has entered the game\n");
}
});
}
ba::io_context& _ioc;
tcp::acceptor _acc{_ioc, tcp::v4()};
};
int main(int argc, char** argv) {
s_verbose = argc>1 && argv[1] == "-v"s;
ba::io_context ioc;
server s(ioc);
std::thread th([&ioc] { ioc.run(); }); // todo exception handling
std::this_thread::sleep_for(1s);
auto n = s.broadcast("random global event broadcast\n");
std::cout << "Global event broadcast reached " << n << " active subscribers\n";
std::this_thread::sleep_for(2s);
s.stop(); // active connections will continue
th.join();
}
See the diff between Approach 1. and 2.: Compare View on github
A sample of the output when run against 3 concurrent clients with:
(for a in {1..3}; do netcat localhost 6767 < /etc/dictionaries-common/words > echoed.$a& sleep .1; done; time wait)
Segmentation fault on boost::asio::ip::tcp::acceptor
You need to post more code (Server::Server()) and post the files in separate code blocks for readability.
Try explicitly initializing io in your constructor's member-initializer list:
Irc::Server::Server() : io()
{
start();
startAccept();
}