boost::asio + std::future - Access violation after closing socket
recvmsg
is receiving into a buffer (streambuf
) that was freed after throwing the exception in TCPClient::sendMessage
(line 105, end of scope).
You forgot to cancel the asynchronous operation (async_read_until
) started in line 97. Fix it:
else {
socket->cancel(); // ADDED
std::cout << "socket points to " << std::addressof(*socket) << std::endl;
throw std::runtime_error("timeout");
}
Or even, just
socket.reset(); // ADDED
Same goes for other timeout paths.
how to cancel a `boost::asio::read` operation while it's waiting
That's the nature of blocking IO.
Indeed socket.cancel()
(or even io_service::stop()
) will not work on synchronous operations.
The only way to interrupt this is to use socket-level timeouts (but Asio doesn't expose that) or to use asynchronous signals (e.g. pressing Ctrl-C in a terminal sends the child process a SIGINT).
I've previously created a poor-man's wrapper if you insist on running single operations with a timeout:
- boost::asio + std::future - Access violation after closing socket
How do I cleanly reconnect a boost::socket following a disconnect?
You need to create a new boost::asio::ip::tcp::socket
each time you reconnect. The easiest way to do this is probably to just allocate the socket on the heap using a boost::shared_ptr
(you could probably also get away with scoped_ptr
if your socket is entirely encapsulated within a class). E.g.:
bool MyClient::myconnect()
{
bool isConnected = false;
// Attempt connection
// socket is of type boost::shared_ptr<boost::asio::ip::tcp::socket>
socket.reset(new boost::asio::ip::tcp::socket(...));
socket->connect(server_endpoint, errorcode);
// ...
}
Then, when mydisconnect
is called, you could deallocate the socket:
void MyClient::mydisconnect(void)
{
// ...
// deallocate socket. will close any open descriptors
socket.reset();
}
The error you're seeing is probably a result of the OS cleaning up the file descriptor after you've called close
. When you call close
and then try to connect
on the same socket, you're probably trying to connect an invalid file descriptor. At this point you should see an error message starting with "Connection failed: ..." based on your logic, but you then call mydisconnect
which is probably then attempting to call shutdown
on an invalid file descriptor. Vicious cycle!
Waiting with timeout on boost::asio::async_connect fails (std::future::wait_for)
This appears to be a bug as answered here by Stephan Lavavej.
I wasn't able to find the original bug, but it's fixed in "the RTM version" (assuming VS2013).
This is affected by internal bug number DevDiv#255669 ":
wait_for()
/wait_until()
don't block". Fortunately, I've received a fix
for this from one of our Concurrency Runtime developers, Hong Hong. With my
current build of VC11, this works:With my current build of VC11, this works:
C:\Temp>type meow.cpp
#include <stdio.h>
#include <chrono>
#include <future>
#include <thread>
#include <windows.h>
using namespace std;
long long counter() {
LARGE_INTEGER li;
QueryPerformanceCounter(&li);
return li.QuadPart;
}
long long frequency() {
LARGE_INTEGER li;
QueryPerformanceFrequency(&li);
return li.QuadPart;
}
int main() {
printf("%02d.%02d.%05d.%02d\n", _MSC_VER / 100, _MSC_VER % 100, _MSC_FULL_VER % 100000, _MSC_BUILD);
future<int> f = async(launch::async, []() -> int {
this_thread::sleep_for(chrono::milliseconds(250));
for (int i = 0; i < 5; ++i) {
printf("Lambda: %d\n", i);
this_thread::sleep_for(chrono::seconds(2));
}
puts("Lambda: Returning.");
return 1729;
});
for (;;) {
const auto fs = f.wait_for(chrono::seconds(0));
if (fs == future_status::deferred) {
puts("Main thread: future_status::deferred (shouldn't happen, we used launch::async)");
} else if (fs == future_status::ready) {
puts("Main thread: future_status::ready");
break;
} else if (fs == future_status::timeout) {
puts("Main thread: future_status::timeout");
} else {
puts("Main thread: unknown future_status (UH OH)");
}
this_thread::sleep_for(chrono::milliseconds(500));
}
const long long start = counter();
const int n = f.get();
const long long finish = counter();
printf("Main thread: f.get() took %f microseconds to return %d.\n",
(finish - start) * 1000000.0 / frequency(), n);
}
C:\Temp>cl /EHsc /nologo /W4 /MTd meow.cpp
meow.cpp
C:\Temp>meow
17.00.50419.00
Main thread: future_status::timeout
Lambda: 0
Main thread: future_status::timeout
Main thread: future_status::timeout
Main thread: future_status::timeout
Main thread: future_status::timeout
Lambda: 1
Main thread: future_status::timeout
Main thread: future_status::timeout
Main thread: future_status::timeout
Main thread: future_status::timeout
Lambda: 2
Main thread: future_status::timeout
Main thread: future_status::timeout
Main thread: future_status::timeout
Main thread: future_status::timeout
Lambda: 3
Main thread: future_status::timeout
Main thread: future_status::timeout
Main thread: future_status::timeout
Main thread: future_status::timeout
Lambda: 4
Main thread: future_status::timeout
Main thread: future_status::timeout
Main thread: future_status::timeout
Main thread: future_status::timeout
Lambda: Returning.
Main thread: future_status::ready
Main thread: f.get() took 2.303971 microseconds to return 1729.
I inserted timing code to prove that when wait_for() returns ready, f.get() returns instantly without blocking.
Basically, the workaround is to loop while it reports deferred
Check for data with timing?
You just need to attempt to read.
The usual approach is to define deadlines for all asynchronous operations that could take "long" (or even indefinitely long).
This is quite natural in asynchronous executions:
Just add a deadline timer:
boost::asio::deadline_timer tim(svc);
tim.expires_from_now(boost::posix_time::seconds(2));
tim.async_wait([](error_code ec) {
if (!ec) // timer was not canceled, so it expired
{
socket_.cancel(); // cancel pending async operation
}
});
If you want to use it with synchronous calls, you can with judicious use of poll()
instead of run()
. See this answer: boost::asio + std::future - Access violation after closing socket which implements a helper await_operation
that runs a single operations synchronously but under a timeout.
Is there a problem with the socket I wrote for boost::socket, for the simplifying work with network?
That's a lot of code.
Always compile with warnings enabled. This would have told you that members are not constructed in the order you list their initializers. Importantly, the second one is UB:
explicit SocketImpl(socket_type sock)
: resolver_(sock.get_executor()), timeout_(sock.get_executor()), socket_(std::move(sock)) {}Because
socket_
is declared beforetimeout_
, it will also be initialized before, meaning thatsock.get_executor()
is actually use-after-move. Oops. Fix it:explicit SocketImpl(socket_type sock)
: resolver_(sock.get_executor()), socket_(std::move(sock)), timeout_(socket_.get_executor()) {}Now, even though the other constructor doesn't have such a problem, it's good practice to match declaration order there as well:
explicit SocketImpl(Executor executor)
: resolver_(executor)
, socket_(executor)
, timeout_(executor) {}
explicit SocketImpl(socket_type sock)
: resolver_(sock.get_executor())
, socket_(std::move(sock))
, timeout_(socket_.get_executor()) {}(Kudos for making constructors
explicit
)I'd implement any
Impl
class inline (the naming suggests that the entire class is "implementation detail" anyways).Destructors like this are busy-work:
template <typename socket_type, typename resolver_type, typename endpoint_iter_type>
SocketImpl<socket_type, resolver_type, endpoint_iter_type>::~SocketImpl() {
if (socket_.is_open()) {
socket_.close();
}
}The default destructor of
socket_
will already do that. All you do is get
in the way of the compiler to generate optimal, exception safe code. E.g.
in this caseclose()
might raise an exception. Did you want that?Consider taking arguments that hold resource by const-reference, or by
value if you intend tostd::move()
from them.virtual void do_resolve(std::string host, std::string port,
ConnectCallback const&,
ErrorCallback const&) = 0;These instantiations:
template <Type>
struct Socket
: public SocketImpl<boost::asio::ip::tcp::socket,
boost::asio::ip::tcp::resolver,
boost::asio::ip::tcp::resolver::iterator> {and
template <>
struct Socket<UDP>
: public SocketImpl<boost::asio::ip::udp::socket,
boost::asio::ip::udp::resolver,
boost::asio::ip::udp::resolver::iterator> {Seem laborious. Why not use the generic templates and protocols from Asio directly? You could even throw in a free performance optimization by allowing callers to override the type-erased executor type:
template <typename Protocol,
typename Executor = boost::asio::any_io_executor>
struct SocketImpl
: public std::enable_shared_from_this<SocketImpl<Protocol, Executor>> {
public:
using base_type = SocketImpl<Protocol, Executor>;
using socket_type = std::conditional_t<
std::is_same_v<Protocol, boost::asio::ip::udp>,
boost::asio::basic_datagram_socket<Protocol, Executor>,
boost::asio::basic_socket<Protocol, Executor>>;
using resolver_type =
boost::asio::ip::basic_resolver<Protocol, Executor>;
using endpoint_iter_type = typename resolver_type::iterator;Now your instantiations can just be:
template <Type> struct Socket : public SocketImpl<boost::asio::ip::tcp> {
// ...
template <> struct Socket<UDP> : public SocketImpl<boost::asio::ip::udp> {with the exact behaviour you had, or better:
using StrandEx = boost::asio::strand<boost::asio::io_context::executor_type>;
template <Type> struct Socket : public SocketImpl<boost::asio::ip::tcp, StrandEx> {
// ...
template <> struct Socket<UDP> : public SocketImpl<boost::asio::ip::udp, StrandEx> {with the executor optimized for the strand as you were restricting it to anyways!
Instead of repeating the type arguments:
explicit Socket(boost::asio::ip::tcp::socket sock) : SocketImpl(std::move(sock)) {
refer to exposed typedefs, so you have a single source of truth:
explicit Socket(base_type::socket_type sock) : SocketImpl(std::move(sock)) {
Pass executors by value. They're cheap to copy and you could even move from
them since you're "sinking" them into you membersIn fact, just inherit the constructors whole-sale instead of repeating. So even:
template <Type>
struct Socket : public SocketImpl<boost::asio::ip::tcp, StrandEx> {
explicit Socket(StrandEx executor) : SocketImpl(executor) {}
explicit Socket(base_type::socket_type sock)
: SocketImpl(std::move(sock)) {}Could just be:
template <Type>
struct Socket : public SocketImpl<boost::asio::ip::tcp, StrandEx> {
using base_type::base_type;and land you with the exact same set of constructors.
That constructor sets
is_connected
but it's lying about it. Because it
sets it totrue
when the socket is merely open. You don't want this, nor
do you need it.In your code, nobody is using that. What you might want in a deriving
client, is a state machine. It's up to them. No need to add a racy, lying
interface to your base class. Leave the responsibility where it belongs.Same with this:
#ifdef OS_WIN
SetThreadUILanguage(MAKELANGID(LANG_ENGLISH, SUBLANG_ENGLISH_US));
#endifThat's a violation of separation of concerns. You might want this
behaviour, but your callers/users might want something else. Worse, this
behaviour may break their code that had a different preference in place.Get()
does nothing but obscure that it returnsshared_from_this
. If it
is there to avoid explicitly qualifying withthis->
(because the base
class is a dependent type), just, again, use ausing
declaration:using std::enable_shared_from_this<SocketImpl>::shared_from_this;
There's a big problem with
PostCallback
beingstd::function
. It hides
associated executor types! See
boost::asio::bind_executor does not execute in strand
for a detailed description of this issueIn your case, there is absolutely no reason at all to type erase the
Post
argument, so don't:void Post(PostCallback callback) {
post(socket_.get_executor(), std::move(callback));
}Should be
template <typename PostCallback>
void Post(PostCallback&& callback) {
post(socket_.get_executor(), std::forward<PostCallback>(callback));
}I'd do the same for the other callback types.
using ConnectCallback = std::function<void()>;
using PromoteCallback = std::function<void()>;
using WriteCallback = std::function<void(size_t)>;
using ReadCallback = std::function<void(const uint8_t*, size_t)>;
using ErrorCallback = std::function<void(const std::string&)>;But I'll leave it as an exorcism for the reader for now.
Socket<UDP>::Promote
is a weird one. Firstly, I question the logic.void Socket<UDP>::Promote(const PromoteCallback &callback) {
auto self = shared_from_this();
Post([this, self, callback] {
endpoint_iter_++;
socket_.cancel();
callback();
});
}I do not feel comfortable incrementing an
endpoint_iter_
without
checking whether it's not already past-the-end.Besides, nothing prevents this from running before
async_resolve
completes. I think it's cleaner to cancel the pending operations before
incrementing that iterator.Finally,
callback()
isvoid(void)
so - you're merely trying to
synchronize on the completion of the task. I'd suggest a future for this:std::future<void> Promote() {
return Post(std::packaged_task<void()>([this, self = shared_from_this()] {
socket_.cancel(); // TODO wait for that to complete before incrementing
endpoint_iter_++;
}));
}A class template with a template argument that is never used is a clear
sign of the fact that it doesn't need to be a template.Socket<TCP>
andSocket<UDP>
are not related.Separate the conjoined twins makes their life easier:
struct TCPSocket : SocketImpl<asio::ip::tcp, StrandEx> { /*...*/ };
struct UDPSocket : SocketImpl<asio::ip::udp, StrandEx> { /*...*/ };If for some arcane reason you really want to have the template definition:
template <Type> struct Socket;
template <> struct Socket<TCP> : TCPSocket { using TCPSocket::TCPSocket; };
template <> struct Socket<UDP> : UDPSocket { using UDPSocket::UDPSocket; };I hope that the triviality of it drives home the point that the types don't need to be related.
deadline
is missing code, you're calling it with aHandler
callback,
but it doesn't take any argument. I'll just make up the missing bits:template <typename Handler>
void Await(boost::posix_time::time_duration ms, Handler f) {
Post([this, self = shared_from_this(), ms, f] {
timeout_.expires_from_now(ms);
timeout_.template async_wait(
[self, f = std::move(f)](error_code ec) {
if (!ec) {
asio::dispatch(std::move(f));
}
});
});
}stop_await()
makes it even more enigmatic: the fact thattimeout_
is
being canceled without regard for who postedAwait
, when and for how long
does suggest that you wanted it to perform as a deadline, so indeed the user callback wasn't applicable really.However, I cannot explain why the
timeout_
was being restarted
automatically (although it wouldn't be if it were canceled, because of theif (!ec)
check in the lambda. I admit I really cannot figure this out, so
you'll have to decide what you wanted it to do yourself.The
Read
/ReadUntil
interfaces are quite limited. I cannot see how I'd
read a simple HTTP response with it for example. For my example, I'll just
read the response headers, instead.You should always prefer using
std::span
orstd::string_view
over
pairs ofcharT const*, size_t
. It's just much less error prone, and much
more expressive.using Data = std::basic_string_view<uint8_t>; // or span and similar
// ... eg:
using ReadCallback = std::function<void(Data)>;Wait, what is this?
asio::ip::udp::endpoint endpoint = *endpoint_iter_;
socket_.async_receive_from(
asio::buffer(buff_.prepare(size)), endpoint,Did you mean to overwrite endpoints from the resolver results? That makes no
sense.Note,
async_receive_from
uses the endpoint reference argument to indicate
the source of an incoming message. You're passing a reference to a local
variable here, which causes Undefine
Behaviour because the
async operation will be completing after the local variable disappeared.Instead, use a member variable.
asio::ip::udp::endpoint sender_;
void Read(size_t size, const ReadCallback &read_callback, const ErrorCallback &error_callback) override {
Post([=, this, self = shared_from_this()] {
socket_.async_receive_from(
asio::buffer(buff_.prepare(size)), sender_,A
streambuf
seems overkill for most operations, but certainly for the
datagram protocol. Also, you declare it in the base-class, which never uses
it anywhere. Consider moving it to the TCP/UDP derived classes.The whole do_connect/do_resolve thing is something that I think needs to be
inSocketImpl
. It's largely identical for TCP/UDP and if that's not in
the base class, and Read[Until]/Send are already per-protocol, I don't
really see why you'd have a base-class at all.I'd switch on a
IS_DATAGRAM
property likestatic constexpr bool is_datagram = std::is_same_v<Protocol, asio::ip::udp>;
And have one implementation:
void do_resolve(std::string const& host, std::string const& port,
ConnectCallback connect_callback,
ErrorCallback error_callback) {
resolver_.async_resolve(
host, port,
[=, this, self = shared_from_this()](
error_code ec, endpoint_iter_type endpoints) {
stop_await();
if (!ec) {
endpoint_iter_ = std::move(endpoints);
if constexpr (is_datagram) {
socket_.open(endpoint_iter_->endpoint().protocol());
connect_callback();
} else {
do_connect(endpoint_iter_, connect_callback,
error_callback);
}
} else {
error_callback("Unable to resolve host: " +
ec.message());
}
});
}If you're wondering how
do_connect
could compileFor the virtual methods to make sense, there should be a shared interface, which you currently don't have. So either create a base class interface like:
struct ISocket {
virtual ~ISocket() = default;
virtual void Send(Data msg, const WriteCallback &write_callback, const ErrorCallback &error_callback) = 0;
virtual void Read(size_t size, const ReadCallback &read_callback, const ErrorCallback &error_callback) = 0;
};
template <typename Protocol, typename Executor = asio::any_io_executor>
struct SocketImpl
: public std::enable_shared_from_this<SocketImpl<Protocol, Executor>>
, ISocket { // ....Note that this makes it more important to have the virtual destructor. (Although using
make_shared<ConcreteType>
can save you because the shared pointers contain the right deleter)Looks to me like the
Await
should also have been virtual. But you made it atemplate<>
member function (actually the right thing to do IMO).Alternatively, instead of going for virtuals, embrace that you didn't need
the shared interface based on dynamic polymorphism.If you ever need to make SocketImp behaviour dependent on the derived
class, you can and make it CRTP (Curiously Recurring Template Pattern)
instead.This is what I've done below.
Adapted Listing And Demo
Here's a listing with demo. The namespace network
went from 313 to 229 lines.
#include <boost/asio.hpp>
#include <memory>
#include <string>
using Data = std::basic_string_view<uint8_t>; // or span and similar
namespace network {
namespace asio = boost::asio;
using boost::system::error_code;
template <typename Protocol, typename Executor = asio::any_io_executor>
struct SocketImpl
: public std::enable_shared_from_this<SocketImpl<Protocol, Executor>> {
public:
using base_type = SocketImpl<Protocol, Executor>;
static constexpr bool is_datagram = std::is_same_v<Protocol, asio::ip::udp>;
using socket_type = std::conditional_t<is_datagram,
asio::basic_datagram_socket<Protocol, Executor>,
asio::basic_stream_socket<Protocol, Executor>>;
using resolver_type = asio::ip::basic_resolver<Protocol, Executor>;
using endpoint_iter_type = typename resolver_type::iterator;
using std::enable_shared_from_this<SocketImpl>::shared_from_this;
using ConnectCallback = std::function<void()>;
using PromoteCallback = std::function<void()>;
using WriteCallback = std::function<void(size_t)>;
using ReadCallback = std::function<void(Data)>;
using ErrorCallback = std::function<void(const std::string&)>;
explicit SocketImpl(Executor executor)
: resolver_(executor)
, socket_(executor)
, timeout_(executor) {}
explicit SocketImpl(socket_type sock)
: resolver_(sock.get_executor())
, socket_(std::move(sock))
, timeout_(socket_.get_executor()) {}
template <typename Token> decltype(auto) Post(Token&& callback) {
return asio::post(socket_.get_executor(), std::forward<Token>(callback));
}
void Connect(std::string Host, std::string Port,
const ConnectCallback& connect_callback,
const ErrorCallback& error_callback) {
Post([=, self = shared_from_this()] {
self->do_resolve(Host, Port, connect_callback, error_callback);
});
}
template <typename Handler>
void Await(boost::posix_time::time_duration ms, Handler f) {
Post([this, self = shared_from_this(), ms, f] {
timeout_.expires_from_now(ms);
timeout_.template async_wait(
[self, f = std::move(f)](error_code ec) {
if (!ec) {
asio::dispatch(std::move(f));
}
});
});
}
void Disconnect() {
Post([this, self = shared_from_this()] {
timeout_.cancel();
resolver_.cancel();
if (socket_.is_open()) {
socket_.cancel();
}
});
}
protected:
void stop_await() { timeout_.cancel(); }
void do_resolve(std::string const& host, std::string const& port,
ConnectCallback connect_callback,
ErrorCallback error_callback) {
resolver_.async_resolve(
host, port,
[=, this, self = shared_from_this()](
error_code ec, endpoint_iter_type endpoints) {
stop_await();
if (!ec) {
endpoint_iter_ = std::move(endpoints);
if constexpr (is_datagram) {
socket_.open(endpoint_iter_->endpoint().protocol());
connect_callback();
} else {
do_connect(endpoint_iter_, connect_callback,
error_callback);
}
} else {
error_callback("Unable to resolve host: " + ec.message());
}
});
}
void do_connect(endpoint_iter_type endpoints,
ConnectCallback connect_callback,
ErrorCallback error_callback) {
async_connect( //
socket_, std::move(endpoints),
[=, this, self = shared_from_this()](error_code ec, endpoint_iter_type) {
stop_await();
if (!ec) {
connect_callback();
} else {
error_callback("Unable to connect host: " + ec.message());
}
});
}
resolver_type resolver_;
endpoint_iter_type endpoint_iter_;
socket_type socket_;
asio::deadline_timer timeout_;
};
using StrandEx = asio::strand<asio::io_context::executor_type>;
struct TCPSocket : SocketImpl<asio::ip::tcp, StrandEx> {
using base_type::base_type;
void Send(Data msg, WriteCallback write_callback, ErrorCallback error_callback) {
Post([=, this, self = shared_from_this()] {
async_write(socket_, asio::buffer(msg),
[self, write_callback,
error_callback](error_code ec, size_t xfr) {
if (!ec) {
write_callback(xfr);
} else {
error_callback(ec.message());
}
});
});
}
void Read(size_t size, ReadCallback read_callback, ErrorCallback error_callback) {
Post([=, this, self = shared_from_this()] {
async_read(
socket_, asio::buffer(buff_.prepare(size)),
[this, self, read_callback, error_callback](error_code ec,
size_t length) {
stop_await();
if (!ec) {
auto data =
asio::buffer_cast<const uint8_t*>(buff_.data());
read_callback({data, length});
} else {
error_callback(ec.message());
}
buff_.consume(length);
});
});
}
void ReadUntil(std::string until_str, const ReadCallback &read_callback, const ErrorCallback &error_callback) {
Post([=, this, self = shared_from_this()] {
async_read_until(
socket_, buff_, until_str,
[=, this](error_code ec, size_t xfr) {
stop_await();
if (!ec) {
auto data =
asio::buffer_cast<const uint8_t*>(buff_.data());
read_callback({data, xfr});
} else {
error_callback(ec.message());
}
buff_.consume(xfr);
});
});
}
protected:
asio::streambuf buff_;
};
Related Topics
How to Pass Std::Unique_Ptr into a Function
Improve Matching of Feature Points with Opencv
References Needed for Implementing an Interpreter in C/C++
What Does Iota of Std::Iota Stand For
Adding Multiple Executables in Cmake
Should Every Class Have a Virtual Destructor
What Is the Recommended Way to Align Memory in C++11
Differencebetween Const_Iterator and Iterator
When to Use Const Char * and When to Use Const Char []
Does C++11 Unique_Ptr and Shared_Ptr Able to Convert to Each Other's Type
String::Size_Type Instead of Int
Usage of Std::Forward VS Std::Move
Understanding the Difference Between F() and F(Void) in C and C++ Once and for All
Significance of a .Inl File in C++
Namespaces for Enum Types - Best Practices
How to Generate a Newline in a Cpp MACro