boost::asio + std::future - Access violation after closing socket

recvmsg is receiving into a buffer (streambuf) that was freed after throwing the exception in TCPClient::sendMessage (line 105, end of scope).

You forgot to cancel the asynchronous operation (async_read_until) started in line 97. Fix it:

else {
    socket->cancel(); // ADDED
    std::cout << "socket points to " << std::addressof(*socket) << std::endl;
    throw std::runtime_error("timeout");
}

Or even, just

    socket.reset(); // ADDED

Same goes for other timeout paths.

how to cancel a `boost::asio::read` operation while it's waiting

That's the nature of blocking IO.

Indeed socket.cancel() (or even io_service::stop()) will not work on synchronous operations.

The only way to interrupt this is to use socket-level timeouts (but Asio doesn't expose that) or to use asynchronous signals (e.g. pressing Ctrl-C in a terminal sends the child process a SIGINT).

I've previously created a poor-man's wrapper if you insist on running single operations with a timeout:

  • boost::asio + std::future - Access violation after closing socket
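
To make the "socket-level timeout" option mentioned above concrete, here is a minimal sketch, assuming a POSIX platform. It works on a raw descriptor and sets SO_RCVTIMEO directly; the helper name recv_times_out and the socketpair standing in for a real connection are illustrative assumptions. Whether the option has the desired effect when applied through an Asio socket's native_handle() is not something this sketch shows, since Asio's own retry logic for synchronous reads may interfere.

```cpp
#include <cerrno>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>

// Hypothetical helper: returns true if a blocking recv() gave up after
// timeout_ms because no data arrived. A socketpair stands in for a real
// connection so the sketch needs no network.
bool recv_times_out(int timeout_ms) {
    int fds[2];
    if (::socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0)
        return false;

    timeval tv{};
    tv.tv_sec  = timeout_ms / 1000;
    tv.tv_usec = (timeout_ms % 1000) * 1000;
    // Socket-level receive timeout: the kernel wakes up the blocked recv().
    bool ok = ::setsockopt(fds[0], SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof tv) == 0;

    char byte;
    ssize_t n = ok ? ::recv(fds[0], &byte, 1, 0) : 0; // nobody writes to fds[1]
    bool timed_out = ok && n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK);

    ::close(fds[0]);
    ::close(fds[1]);
    return timed_out;
}
```

Calling recv_times_out(50) blocks for roughly 50 ms and then reports the timeout instead of hanging forever.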

How do I cleanly reconnect a boost::socket following a disconnect?

You need to create a new boost::asio::ip::tcp::socket each time you reconnect. The easiest way to do this is probably to just allocate the socket on the heap using a boost::shared_ptr (you could probably also get away with scoped_ptr if your socket is entirely encapsulated within a class). E.g.:

bool MyClient::myconnect()
{
    bool isConnected = false;

    // Attempt connection
    // socket is of type boost::shared_ptr<boost::asio::ip::tcp::socket>
    socket.reset(new boost::asio::ip::tcp::socket(...));
    socket->connect(server_endpoint, errorcode);
    // ...
}

Then, when mydisconnect is called, you could deallocate the socket:

void MyClient::mydisconnect(void)
{
    // ...
    // deallocate socket. will close any open descriptors
    socket.reset();
}

The error you're seeing is probably a result of the OS cleaning up the file descriptor after you've called close. When you call close and then try to connect on the same socket, you're probably trying to connect an invalid file descriptor. At this point you should see an error message starting with "Connection failed: ..." based on your logic, but you then call mydisconnect which is probably then attempting to call shutdown on an invalid file descriptor. Vicious cycle!

Waiting with timeout on boost::asio::async_connect fails (std::future::wait_for)

This appears to be a bug as answered here by Stephan Lavavej.

I wasn't able to find the original bug, but it's fixed in "the RTM version" (assuming VS2013).

This is affected by internal bug number DevDiv#255669, "wait_for()/wait_until() don't block". Fortunately, I've received a fix for this from one of our Concurrency Runtime developers, Hong Hong. With my current build of VC11, this works:

C:\Temp>type meow.cpp
#include <stdio.h>
#include <chrono>
#include <future>
#include <thread>
#include <windows.h>
using namespace std;

long long counter() {
LARGE_INTEGER li;
QueryPerformanceCounter(&li);
return li.QuadPart;
}

long long frequency() {
LARGE_INTEGER li;
QueryPerformanceFrequency(&li);
return li.QuadPart;
}

int main() {
printf("%02d.%02d.%05d.%02d\n", _MSC_VER / 100, _MSC_VER % 100, _MSC_FULL_VER % 100000, _MSC_BUILD);

future<int> f = async(launch::async, []() -> int {
this_thread::sleep_for(chrono::milliseconds(250));

for (int i = 0; i < 5; ++i) {
printf("Lambda: %d\n", i);
this_thread::sleep_for(chrono::seconds(2));
}

puts("Lambda: Returning.");
return 1729;
});

for (;;) {
const auto fs = f.wait_for(chrono::seconds(0));

if (fs == future_status::deferred) {
puts("Main thread: future_status::deferred (shouldn't happen, we used launch::async)");
} else if (fs == future_status::ready) {
puts("Main thread: future_status::ready");
break;
} else if (fs == future_status::timeout) {
puts("Main thread: future_status::timeout");
} else {
puts("Main thread: unknown future_status (UH OH)");
}

this_thread::sleep_for(chrono::milliseconds(500));
}

const long long start = counter();

const int n = f.get();

const long long finish = counter();

printf("Main thread: f.get() took %f microseconds to return %d.\n",
(finish - start) * 1000000.0 / frequency(), n);
}

C:\Temp>cl /EHsc /nologo /W4 /MTd meow.cpp
meow.cpp

C:\Temp>meow
17.00.50419.00
Main thread: future_status::timeout
Lambda: 0
Main thread: future_status::timeout
Main thread: future_status::timeout
Main thread: future_status::timeout
Main thread: future_status::timeout
Lambda: 1
Main thread: future_status::timeout
Main thread: future_status::timeout
Main thread: future_status::timeout
Main thread: future_status::timeout
Lambda: 2
Main thread: future_status::timeout
Main thread: future_status::timeout
Main thread: future_status::timeout
Main thread: future_status::timeout
Lambda: 3
Main thread: future_status::timeout
Main thread: future_status::timeout
Main thread: future_status::timeout
Main thread: future_status::timeout
Lambda: 4
Main thread: future_status::timeout
Main thread: future_status::timeout
Main thread: future_status::timeout
Main thread: future_status::timeout
Lambda: Returning.
Main thread: future_status::ready
Main thread: f.get() took 2.303971 microseconds to return 1729.

I inserted timing code to prove that when wait_for() returns ready, f.get() returns instantly without blocking.

Basically, the workaround is to keep looping while wait_for() reports deferred.
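
A minimal sketch of that polling workaround, using only the standard library. The helper name poll_until_ready is an assumption made for illustration, not something from the quoted answer. It treats every non-ready status (including a spurious deferred) as "try again" until a deadline passes. Note that a future that really was created with launch::deferred never becomes ready through polling, so this helper would report a timeout for it.

```cpp
#include <chrono>
#include <future>
#include <thread>

// Hypothetical helper: polls the future until it is ready or the deadline
// passes. Any non-ready status simply means "try again".
template <typename T>
std::future_status poll_until_ready(std::future<T>& f,
                                    std::chrono::milliseconds timeout) {
    auto const deadline = std::chrono::steady_clock::now() + timeout;
    for (;;) {
        if (f.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready)
            return std::future_status::ready;
        if (std::chrono::steady_clock::now() >= deadline)
            return std::future_status::timeout;
        // Back off briefly so the loop does not spin at full speed.
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
}
```

Once the helper returns ready, f.get() returns without blocking, which is the property the timing code in the transcript was checking.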

Check for data with timing?

You just need to attempt to read.

The usual approach is to define deadlines for all asynchronous operations that could take "long" (or even indefinitely long).

This is quite natural in asynchronous executions:

Just add a deadline timer:

boost::asio::deadline_timer tim(svc);
tim.expires_from_now(boost::posix_time::seconds(2));
tim.async_wait([this](error_code ec) { // captures this, assuming socket_ is a member
    if (!ec) // timer was not canceled, so it expired
    {
        socket_.cancel(); // cancel pending async operation
    }
});

If you want to use it with synchronous calls, you can with judicious use of poll() instead of run(). See this answer: boost::asio + std::future - Access violation after closing socket, which implements a helper await_operation that runs a single operation synchronously but under a timeout.

Is there a problem with the socket I wrote for boost::socket, for the simplifying work with network?

That's a lot of code.

  1. Always compile with warnings enabled. This would have told you that members are not constructed in the order you list their initializers. Importantly, the second one is UB:

    explicit SocketImpl(socket_type sock)
    : resolver_(sock.get_executor()), timeout_(sock.get_executor()), socket_(std::move(sock)) {}

    Because socket_ is declared before timeout_, it will also be initialized before, meaning that sock.get_executor() is actually use-after-move. Oops. Fix it:

    explicit SocketImpl(socket_type sock)
    : resolver_(sock.get_executor()), socket_(std::move(sock)), timeout_(socket_.get_executor()) {}

    Now, even though the other constructor doesn't have such a problem, it's good practice to match declaration order there as well:

    explicit SocketImpl(Executor executor)
    : resolver_(executor)
    , socket_(executor)
    , timeout_(executor) {}

    explicit SocketImpl(socket_type sock)
    : resolver_(sock.get_executor())
    , socket_(std::move(sock))
    , timeout_(socket_.get_executor()) {}

    (Kudos for making constructors explicit)

  2. I'd implement any Impl class inline (the naming suggests that the entire class is "implementation detail" anyways).

  3. Destructors like this are busy-work:

    template <typename socket_type, typename resolver_type, typename endpoint_iter_type>
    SocketImpl<socket_type, resolver_type, endpoint_iter_type>::~SocketImpl() {
    if (socket_.is_open()) {
    socket_.close();
    }
    }

    The default destructor of socket_ will already do that. All you do is get
    in the way of the compiler to generate optimal, exception safe code. E.g.
    in this case close() might raise an exception. Did you want that?

  4. Consider taking arguments that hold resources by const-reference, or by
    value if you intend to std::move() from them.

    virtual void do_resolve(std::string host, std::string port,
                            ConnectCallback const&,
                            ErrorCallback const&) = 0;

  5. These instantiations:

    template <Type>
    struct Socket
    : public SocketImpl<boost::asio::ip::tcp::socket,
    boost::asio::ip::tcp::resolver,
    boost::asio::ip::tcp::resolver::iterator> {

    and

    template <>
    struct Socket<UDP>
    : public SocketImpl<boost::asio::ip::udp::socket,
    boost::asio::ip::udp::resolver,
    boost::asio::ip::udp::resolver::iterator> {

    Seem laborious. Why not use the generic templates and protocols from Asio directly? You could even throw in a free performance optimization by allowing callers to override the type-erased executor type:

    template <typename Protocol,
    typename Executor = boost::asio::any_io_executor>
    struct SocketImpl
    : public std::enable_shared_from_this<SocketImpl<Protocol, Executor>> {
    public:
    using base_type = SocketImpl<Protocol, Executor>;
    using socket_type = std::conditional_t<
    std::is_same_v<Protocol, boost::asio::ip::udp>,
    boost::asio::basic_datagram_socket<Protocol, Executor>,
    boost::asio::basic_stream_socket<Protocol, Executor>>;
    using resolver_type =
    boost::asio::ip::basic_resolver<Protocol, Executor>;
    using endpoint_iter_type = typename resolver_type::iterator;

    Now your instantiations can just be:

    template <Type> struct Socket : public SocketImpl<boost::asio::ip::tcp> {
    // ...
    template <> struct Socket<UDP> : public SocketImpl<boost::asio::ip::udp> {

    with the exact behaviour you had, or better:

    using StrandEx = boost::asio::strand<boost::asio::io_context::executor_type>;

    template <Type> struct Socket : public SocketImpl<boost::asio::ip::tcp, StrandEx> {
    // ...
    template <> struct Socket<UDP> : public SocketImpl<boost::asio::ip::udp, StrandEx> {

    with the executor optimized for the strand as you were restricting it to anyways!

  6. Instead of repeating the type arguments:

    explicit Socket(boost::asio::ip::tcp::socket sock) : SocketImpl(std::move(sock)) {

    refer to exposed typedefs, so you have a single source of truth:

    explicit Socket(base_type::socket_type sock) : SocketImpl(std::move(sock)) {

  7. Pass executors by value. They're cheap to copy, and you could even move from
    them since you're "sinking" them into your members.

  8. In fact, just inherit the constructors whole-sale instead of repeating. So even:

    template <Type>
    struct Socket : public SocketImpl<boost::asio::ip::tcp, StrandEx> {
    explicit Socket(StrandEx executor) : SocketImpl(executor) {}
    explicit Socket(base_type::socket_type sock)
    : SocketImpl(std::move(sock)) {}

    Could just be:

    template <Type>
    struct Socket : public SocketImpl<boost::asio::ip::tcp, StrandEx> {
    using base_type::base_type;

    and land you with the exact same set of constructors.

  9. That constructor sets is_connected, but it's lying about it: it sets it
    to true when the socket is merely open. You don't want this, nor
    do you need it.

    In your code, nobody is using that. What you might want in a deriving
    client, is a state machine. It's up to them. No need to add a racy, lying
    interface to your base class. Leave the responsibility where it belongs.

  10. Same with this:

     #ifdef OS_WIN
    SetThreadUILanguage(MAKELANGID(LANG_ENGLISH, SUBLANG_ENGLISH_US));
    #endif

    That's a violation of separation of concerns. You might want this
    behaviour, but your callers/users might want something else. Worse, this
    behaviour may break their code that had a different preference in place.

  11. Get() does nothing but obscure that it returns shared_from_this. If it
    is there to avoid explicitly qualifying with this-> (because the base
    class is a dependent type), just, again, use a using declaration:

    using std::enable_shared_from_this<SocketImpl>::shared_from_this;

  12. There's a big problem with PostCallback being std::function. It hides
    associated executor types! See
    boost::asio::bind_executor does not execute in strand
    for a detailed description of this issue.

    In your case, there is absolutely no reason at all to type erase the Post argument, so don't:

    void Post(PostCallback callback) {
    post(socket_.get_executor(), std::move(callback));
    }

    Should be

    template <typename PostCallback>
    void Post(PostCallback&& callback) {
    post(socket_.get_executor(), std::forward<PostCallback>(callback));
    }

    I'd do the same for the other callback types.

    using ConnectCallback = std::function<void()>;
    using PromoteCallback = std::function<void()>;
    using WriteCallback = std::function<void(size_t)>;
    using ReadCallback = std::function<void(const uint8_t*, size_t)>;
    using ErrorCallback = std::function<void(const std::string&)>;

    But I'll leave it as an exorcism for the reader for now.

  13. Socket<UDP>::Promote is a weird one. Firstly, I question the logic.

        void Socket<UDP>::Promote(const PromoteCallback &callback) {
    auto self = shared_from_this();
    Post([this, self, callback] {
    endpoint_iter_++;
    socket_.cancel();
    callback();
    });
    }

    I do not feel comfortable incrementing an endpoint_iter_ without
    checking whether it's not already past-the-end.

    Besides, nothing prevents this from running before async_resolve
    completes. I think it's cleaner to cancel the pending operations before
    incrementing that iterator.

    Finally, callback() is void(void) so - you're merely trying to
    synchronize on the completion of the task. I'd suggest a future for this:

       std::future<void> Promote() {
    return Post(std::packaged_task<void()>([this, self = shared_from_this()] {
    socket_.cancel(); // TODO wait for that to complete before incrementing
    endpoint_iter_++;
    }));
    }

  14. A class template with a template argument that is never used is a clear
    sign that it doesn't need to be a template. Socket<TCP> and
    Socket<UDP> are not related.

    Separating the conjoined twins makes their lives easier:

    struct TCPSocket : SocketImpl<asio::ip::tcp, StrandEx> { /*...*/ };
    struct UDPSocket : SocketImpl<asio::ip::udp, StrandEx> { /*...*/ };

    If for some arcane reason you really want to have the template definition:

    template <Type> struct Socket;
    template <> struct Socket<TCP> : TCPSocket { using TCPSocket::TCPSocket; };
    template <> struct Socket<UDP> : UDPSocket { using UDPSocket::UDPSocket; };

    I hope that the triviality of it drives home the point that the types don't need to be related.

  15. deadline is missing code, you're calling it with a Handler callback,
    but it doesn't take any argument. I'll just make up the missing bits:

    template <typename Handler>
    void Await(boost::posix_time::time_duration ms, Handler f) {
    Post([this, self = shared_from_this(), ms, f] {
    timeout_.expires_from_now(ms);
    timeout_.template async_wait(
    [self, f = std::move(f)](error_code ec) {
    if (!ec) {
    asio::dispatch(std::move(f));
    }
    });
    });
    }

  16. stop_await() makes it even more enigmatic: the fact that timeout_ is
    being canceled without regard for who posted Await, when, and for how long
    does suggest that you wanted it to perform as a deadline, so the user
    callback wasn't really applicable.

    However, I cannot explain why the timeout_ was being restarted
    automatically (although it wouldn't be if it were canceled, because of the
    if (!ec) check in the lambda). I admit I really cannot figure this out, so
    you'll have to decide what you wanted it to do yourself.

  17. The Read/ReadUntil interfaces are quite limited. I cannot see how I'd
    read a simple HTTP response with it for example. For my example, I'll just
    read the response headers, instead.

  18. You should always prefer using std::span or std::string_view over
    pairs of charT const*, size_t. It's just much less error prone, and much
    more expressive.

    using Data = std::basic_string_view<uint8_t>; // or span and similar
    // ... eg:
    using ReadCallback = std::function<void(Data)>;

  19. Wait, what is this?

    asio::ip::udp::endpoint endpoint = *endpoint_iter_;
    socket_.async_receive_from(
    asio::buffer(buff_.prepare(size)), endpoint,

    Did you mean to overwrite endpoints from the resolver results? That makes no
    sense.

    Note, async_receive_from uses the endpoint reference argument to indicate
    the source of an incoming message. You're passing a reference to a local
    variable here, which causes Undefined Behaviour because the async
    operation will complete after the local variable has disappeared.

    Instead, use a member variable.

    asio::ip::udp::endpoint sender_;

    void Read(size_t size, const ReadCallback &read_callback, const ErrorCallback &error_callback) override {
    Post([=, this, self = shared_from_this()] {
    socket_.async_receive_from(
    asio::buffer(buff_.prepare(size)), sender_,

  20. A streambuf seems overkill for most operations, but certainly for the
    datagram protocol. Also, you declare it in the base-class, which never uses
    it anywhere. Consider moving it to the TCP/UDP derived classes.

  21. The whole do_connect/do_resolve thing is something that I think needs to be
    in SocketImpl. It's largely identical for TCP/UDP and if that's not in
    the base class, and Read[Until]/Send are already per-protocol, I don't
    really see why you'd have a base-class at all.

    I'd switch on an is_datagram property like

    static constexpr bool is_datagram = std::is_same_v<Protocol, asio::ip::udp>;

    And have one implementation:

    void do_resolve(std::string const& host, std::string const& port,
    ConnectCallback connect_callback,
    ErrorCallback error_callback) {
    resolver_.async_resolve(
    host, port,
    [=, this, self = shared_from_this()](
    error_code ec, endpoint_iter_type endpoints) {
    stop_await();
    if (!ec) {
    endpoint_iter_ = std::move(endpoints);
    if constexpr (is_datagram) {
    socket_.open(endpoint_iter_->endpoint().protocol());
    connect_callback();
    } else {
    do_connect(endpoint_iter_, connect_callback,
    error_callback);
    }
    } else {
    error_callback("Unable to resolve host: " +
    ec.message());
    }
    });
    }

    If you're wondering how do_connect could compile

  22. For the virtual methods to make sense, there should be a shared interface, which you currently don't have. So either create a base class interface like:

    struct ISocket {
    virtual ~ISocket() = default;
    virtual void Send(Data msg, const WriteCallback &write_callback, const ErrorCallback &error_callback) = 0;
    virtual void Read(size_t size, const ReadCallback &read_callback, const ErrorCallback &error_callback) = 0;
    };

    template <typename Protocol, typename Executor = asio::any_io_executor>
    struct SocketImpl
    : public std::enable_shared_from_this<SocketImpl<Protocol, Executor>>
    , ISocket { // ....

    Note that this makes it more important to have the virtual destructor. (Although using make_shared<ConcreteType> can save you because the shared pointers contain the right deleter)

  23. Looks to me like Await should also have been virtual, but you made it a member function template (actually the right thing to do IMO).

  24. Alternatively, instead of going for virtuals, embrace that you didn't need
    the shared interface based on dynamic polymorphism.

    If you ever need to make SocketImpl behaviour dependent on the derived
    class, you can make it CRTP (Curiously Recurring Template Pattern)
    instead.

    This is what I've done below.
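
For readers unfamiliar with the CRTP mentioned in the last item, here is a minimal standalone sketch, separate from the listing that follows. The names SocketBase, TcpLike, UdpLike and protocol_name are made up for illustration and do not appear in the reviewed code. The base class template receives the derived type as a template argument and calls into it through a static_cast, so the "virtual-like" dispatch is resolved at compile time.

```cpp
#include <string>

// Base class template parameterized on the derived type (CRTP).
template <typename Derived>
struct SocketBase {
    // Non-virtual: the call to protocol_name() is resolved at compile time.
    std::string describe() const {
        return "socket using " +
               static_cast<Derived const&>(*this).protocol_name();
    }
};

// Each derived class passes itself as the template argument.
struct TcpLike : SocketBase<TcpLike> {
    std::string protocol_name() const { return "tcp"; }
};

struct UdpLike : SocketBase<UdpLike> {
    std::string protocol_name() const { return "udp"; }
};
```

TcpLike{}.describe() and UdpLike{}.describe() each pick up the derived class's protocol_name() without any virtual function or shared runtime interface.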

Adapted Listing And Demo

Here's a listing with demo. The namespace network went from 313 to 229 lines.

#include <boost/asio.hpp>
#include <functional>
#include <memory>
#include <string>
#include <string_view>
#include <type_traits>

using Data = std::basic_string_view<uint8_t>; // or span and similar

namespace network {
namespace asio = boost::asio;
using boost::system::error_code;

template <typename Protocol, typename Executor = asio::any_io_executor>
struct SocketImpl
: public std::enable_shared_from_this<SocketImpl<Protocol, Executor>> {
public:
using base_type = SocketImpl<Protocol, Executor>;

static constexpr bool is_datagram = std::is_same_v<Protocol, asio::ip::udp>;
using socket_type = std::conditional_t<is_datagram,
asio::basic_datagram_socket<Protocol, Executor>,
asio::basic_stream_socket<Protocol, Executor>>;
using resolver_type = asio::ip::basic_resolver<Protocol, Executor>;
using endpoint_iter_type = typename resolver_type::iterator;

using std::enable_shared_from_this<SocketImpl>::shared_from_this;

using ConnectCallback = std::function<void()>;
using PromoteCallback = std::function<void()>;
using WriteCallback = std::function<void(size_t)>;
using ReadCallback = std::function<void(Data)>;
using ErrorCallback = std::function<void(const std::string&)>;

explicit SocketImpl(Executor executor)
: resolver_(executor)
, socket_(executor)
, timeout_(executor) {}

explicit SocketImpl(socket_type sock)
: resolver_(sock.get_executor())
, socket_(std::move(sock))
, timeout_(socket_.get_executor()) {}

template <typename Token> decltype(auto) Post(Token&& callback) {
return asio::post(socket_.get_executor(), std::forward<Token>(callback));
}

void Connect(std::string Host, std::string Port,
const ConnectCallback& connect_callback,
const ErrorCallback& error_callback) {
Post([=, self = shared_from_this()] {
self->do_resolve(Host, Port, connect_callback, error_callback);
});
}

template <typename Handler>
void Await(boost::posix_time::time_duration ms, Handler f) {
Post([this, self = shared_from_this(), ms, f] {
timeout_.expires_from_now(ms);
timeout_.template async_wait(
[self, f = std::move(f)](error_code ec) {
if (!ec) {
asio::dispatch(std::move(f));
}
});
});
}

void Disconnect() {
Post([this, self = shared_from_this()] {
timeout_.cancel();
resolver_.cancel();
if (socket_.is_open()) {
socket_.cancel();
}
});
}

protected:
void stop_await() { timeout_.cancel(); }

void do_resolve(std::string const& host, std::string const& port,
ConnectCallback connect_callback,
ErrorCallback error_callback) {
resolver_.async_resolve(
host, port,
[=, this, self = shared_from_this()](
error_code ec, endpoint_iter_type endpoints) {
stop_await();
if (!ec) {
endpoint_iter_ = std::move(endpoints);
if constexpr (is_datagram) {
socket_.open(endpoint_iter_->endpoint().protocol());
connect_callback();
} else {
do_connect(endpoint_iter_, connect_callback,
error_callback);
}
} else {
error_callback("Unable to resolve host: " + ec.message());
}
});
}

void do_connect(endpoint_iter_type endpoints,
ConnectCallback connect_callback,
ErrorCallback error_callback) {
async_connect( //
socket_, std::move(endpoints),
[=, this, self = shared_from_this()](error_code ec, endpoint_iter_type) {
stop_await();
if (!ec) {
connect_callback();
} else {
error_callback("Unable to connect host: " + ec.message());
}
});
}

resolver_type resolver_;
endpoint_iter_type endpoint_iter_;
socket_type socket_;
asio::deadline_timer timeout_;
};

using StrandEx = asio::strand<asio::io_context::executor_type>;

struct TCPSocket : SocketImpl<asio::ip::tcp, StrandEx> {
using base_type::base_type;

void Send(Data msg, WriteCallback write_callback, ErrorCallback error_callback) {
Post([=, this, self = shared_from_this()] {
async_write(socket_, asio::buffer(msg),
[self, write_callback,
error_callback](error_code ec, size_t xfr) {
if (!ec) {
write_callback(xfr);
} else {
error_callback(ec.message());
}
});
});
}

void Read(size_t size, ReadCallback read_callback, ErrorCallback error_callback) {
Post([=, this, self = shared_from_this()] {
async_read(
socket_, asio::buffer(buff_.prepare(size)),
[this, self, read_callback, error_callback](error_code ec,
size_t length) {
stop_await();
buff_.commit(length); // move the received bytes into the readable sequence
if (!ec) {
auto data =
asio::buffer_cast<const uint8_t*>(buff_.data());
read_callback({data, length});
} else {
error_callback(ec.message());
}
buff_.consume(length);
});
});
}

void ReadUntil(std::string until_str, const ReadCallback &read_callback, const ErrorCallback &error_callback) {
Post([=, this, self = shared_from_this()] {
async_read_until(
socket_, buff_, until_str,
[this, self, read_callback, error_callback](error_code ec, size_t xfr) { // self keeps the object alive
stop_await();
if (!ec) {
auto data =
asio::buffer_cast<const uint8_t*>(buff_.data());
read_callback({data, xfr});
} else {
error_callback(ec.message());
}
buff_.consume(xfr);
});
});
}

protected:

asio::streambuf buff_;
};

