Cancel Async_Read Due to Timeout

The main difference between io_service::run_one() and io_service::poll_one() is that run_one() will block until a handler is ready to run, whereas poll_one() will not wait for any outstanding handlers to become ready.

Assuming the only outstanding handlers on _io_service are handle_timeout() and handle_read(), run_one() does not require a loop because it will only return once either handle_timeout() or handle_read() has run. poll_one(), on the other hand, requires a loop: it returns immediately when neither handle_timeout() nor handle_read() is ready to run, so the caller must keep polling until one of them completes.
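
As a minimal, self-contained sketch of that difference (not taken from the question's code; the 100 ms timer is only for illustration), a timer handler that is not yet ready is skipped by poll_one() but waited for by run_one():

#include <boost/asio.hpp>
#include <iostream>

int main()
{
  boost::asio::io_service io_service;
  boost::asio::deadline_timer timer(io_service,
      boost::posix_time::milliseconds(100));
  timer.async_wait([](boost::system::error_code) {
    std::cout << "timer fired" << std::endl;
  });

  // Assuming we get here before the timer expires, poll_one() runs
  // nothing and returns immediately.
  std::cout << "poll_one ran " << io_service.poll_one() << " handler(s)"
            << std::endl; // expected: 0

  // run_one() blocks until the timer expires, then runs its handler.
  std::cout << "run_one ran " << io_service.run_one() << " handler(s)"
            << std::endl; // expected: 1
}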

The main issue with the original code, as well as fix proposal #1, is that there are still outstanding handlers in the io_service when async_read_helper() returns. Upon the next call to async_read_helper(), the first handler to be invoked will be a handler from the previous call. The io_service::reset() method only allows the io_service to resume running from a stopped state; it does not remove handlers already queued into the io_service. To account for this behavior, use a loop to consume all of the handlers from the io_service. Once all handlers have been consumed, exit the loop and reset the io_service:

// Consume all handlers.
while (_io_service->run_one())
{
  if (_message_received)
  {
    // Message received, so cancel the timer. This will force the completion
    // of handle_timeout(), with boost::asio::error::operation_aborted as the
    // error.
    timer.cancel();
  }
  else if (_timeout_triggered)
  {
    // Timeout occurred, so cancel the socket. This will force the completion
    // of handle_read(), with boost::asio::error::operation_aborted as the
    // error.
    _socket->cancel();
  }
}

// Reset service, guaranteeing it is in a good state for subsequent runs.
_io_service->reset();

From the caller's perspective, this form of timeout is synchronous, as run_one() blocks. However, work is still being performed within the I/O service. An alternative is to use Boost.Asio's support for C++ futures to wait on a future with a timeout. The code can be easier to read, but it requires at least one other thread to be processing the I/O service, because the thread waiting on the timeout is no longer doing so:

// Use an asynchronous operation so that it can be cancelled on timeout.
std::future<std::size_t> read_result = boost::asio::async_read(
    socket, buffer, boost::asio::use_future);

// If timeout occurs, then cancel the operation.
if (read_result.wait_for(std::chrono::seconds(1)) ==
    std::future_status::timeout)
{
  socket.cancel();
}
// Otherwise, the operation completed (with success or error).
else
{
  // If the operation failed, then read_result.get() will throw a
  // boost::system::system_error.
  auto bytes_transferred = read_result.get();
  // process buffer
}
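
For completeness, here is a minimal sketch of that threading requirement (the names worker and io_service are illustrative, not from the snippet above): some other thread must call io_service.run() so that completion handlers make progress while this thread blocks in wait_for():

#include <boost/asio.hpp>
#include <thread>

int main()
{
  boost::asio::io_service io_service;

  // Keep run() from returning while this thread is still waiting on
  // the future.
  boost::asio::io_service::work work(io_service);

  // The thread waiting on the timeout no longer processes the
  // I/O service, so a worker thread has to.
  std::thread worker([&io_service] { io_service.run(); });

  // ... create the socket on io_service and perform the async_read /
  // wait_for / cancel sequence shown above on this thread ...

  io_service.stop();
  worker.join();
}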

boost::asio::read or boost::asio::async_read with timeout

OK, something like this should suit your purpose:

Rationale:

You appear to want to use blocking operations. Since that is the case, there is a good chance that you're not running a thread to pump the io loop.

So we kick off two simultaneous async tasks on the socket's io loop and then spawn a thread to:

a) reset (actually restart) the loop in case it's already been exhausted

b) run the loop to exhaustion (we could be cleverer here and only run it until the handler has indicated that some condition has been met, but that's a lesson for another day)

#include <boost/asio.hpp>
#include <cassert>
#include <future>
#include <memory>
#include <type_traits>
#include <vector>

template<class Stream, class ConstBufferSequence, class Handler>
auto async_read_with_timeout(Stream& stream, ConstBufferSequence&& sequence,
                             std::size_t millis, Handler&& handler)
{
    using handler_type = std::decay_t<Handler>;
    using buffer_sequence_type = std::decay_t<ConstBufferSequence>;
    using stream_type = Stream;

    struct state_machine : std::enable_shared_from_this<state_machine>
    {
        state_machine(stream_type& stream, buffer_sequence_type sequence, handler_type handler)
            : stream_(stream)
            , sequence_(std::move(sequence))
            , handler_(std::move(handler))
        {}

        void start(std::size_t millis)
        {
            timer_.expires_from_now(boost::posix_time::milliseconds(millis));
            timer_.async_wait(strand_.wrap([self = this->shared_from_this()](auto&& ec) {
                self->handle_timeout(ec);
            }));
            boost::asio::async_read(stream_, sequence_,
                strand_.wrap([self = this->shared_from_this()](auto&& ec, auto size) {
                    self->handle_read(ec, size);
                }));
        }

        void handle_timeout(boost::system::error_code const& ec)
        {
            if (not ec and not completed_)
            {
                boost::system::error_code sink;
                stream_.cancel(sink);
            }
        }

        void handle_read(boost::system::error_code const& ec, std::size_t size)
        {
            assert(not completed_);
            boost::system::error_code sink;
            timer_.cancel(sink);
            completed_ = true;
            handler_(ec, size);
        }

        stream_type& stream_;
        buffer_sequence_type sequence_;
        handler_type handler_;
        boost::asio::io_service::strand strand_ { stream_.get_io_service() };
        boost::asio::deadline_timer timer_ { stream_.get_io_service() };
        bool completed_ = false;
    };

    auto psm = std::make_shared<state_machine>(stream,
        std::forward<ConstBufferSequence>(sequence),
        std::forward<Handler>(handler));
    psm->start(millis);
}

std::size_t ReadData(boost::asio::ip::tcp::socket& socket,
                     std::vector<unsigned char>& buffer,
                     unsigned int size_to_read,
                     boost::system::error_code& ec) {
    buffer.resize(size_to_read);

    ec.clear();
    std::size_t bytes_read = 0;
    auto& executor = socket.get_io_service();
    async_read_with_timeout(socket, boost::asio::buffer(buffer),
                            2000, // 2 seconds for example
                            [&](auto&& err, auto size) {
                                ec = err;
                                bytes_read = size;
                            });

    // todo: use a more scalable executor than spawning threads
    auto future = std::async(std::launch::async, [&] {
        if (executor.stopped()) {
            executor.reset();
        }
        executor.run();
    });
    future.wait();

    return bytes_read;
}
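
A hypothetical call site, assuming the two definitions above are in scope (the endpoint 127.0.0.1:8989 and the 64-byte read are made up for illustration):

boost::asio::io_service io_service;
boost::asio::ip::tcp::socket socket(io_service);
socket.connect({boost::asio::ip::address::from_string("127.0.0.1"), 8989});

std::vector<unsigned char> buffer;
boost::system::error_code ec;
std::size_t n = ReadData(socket, buffer, 64, ec);

if (ec == boost::asio::error::operation_aborted)
    std::cerr << "read timed out" << std::endl; // the 2 s timer cancelled the read
else if (!ec)
    std::cout << "read " << n << " bytes" << std::endl;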

how to create boost::async_read and async_write with timeout

You can add a deadline timer that cancels the IO operation. You can observe the cancellation because the completion will be called with error::operation_aborted.

deadline_.expires_from_now(1s);
deadline_.async_wait([self, this](error_code ec) {
    if (!ec) socket_.cancel();
});

I spent about 45 minutes making the rest of your code self-contained:

  • in this example I'll assume that we
    • want to wait for max 5s for a new header to arrive (so after a new session was started or until the next request arrives on the same session)
    • after which the full body must be received within 1s

Note also that we avoid closing the socket - that's done in the session's destructor. It's better to shut down gracefully.

Live Demo

#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iomanip>
#include <iostream>

using namespace std::chrono_literals;
namespace bip = boost::interprocess;
using boost::asio::ip::tcp;
using boost::system::error_code;
using Queue = boost::interprocess::message_queue;

static constexpr auto MAX_MESG_LEN = 100;
static constexpr auto MAX_MESGS = 10;

struct Message {
    using Len = boost::endian::big_uint32_t;

    struct header_t {
        Len len;
    };
    static const auto header_length = sizeof(header_t);
    std::array<char, MAX_MESG_LEN + header_length> buf;

    char const* data() const { return buf.data(); }
    char* data() { return buf.data(); }
    char const* body() const { return data() + header_length; }
    char* body() { return data() + header_length; }

    static_assert(std::is_standard_layout_v<header_t> and
                  std::is_trivial_v<header_t>);

    Len body_length() const { return std::min(h().len, max_body_length()); }
    Len max_body_length() const { return buf.max_size() - header_length; }
    bool decode_header() { return h().len <= max_body_length(); }

    bool set_body(std::string_view value) {
        assert(value.length() <= max_body_length());
        h().len = value.length();
        std::copy_n(value.begin(), body_length(), body());

        return (value.length() == body_length()); // not truncated
    }

  private:
    header_t& h() { return *reinterpret_cast<header_t*>(data()); }
    header_t const& h() const { return *reinterpret_cast<header_t const*>(data()); }
};

struct Session : std::enable_shared_from_this<Session> {
    Session(tcp::socket&& s) : socket_(std::move(s)) {}

    void start() {
        post(strand_, [this, self = shared_from_this()] { do_read_header(); });
    }

  private:
    using Strand = boost::asio::strand<tcp::socket::executor_type>;
    using Timer = boost::asio::steady_timer;

    tcp::socket socket_{strand_};
    Strand strand_{make_strand(socket_.get_executor())};
    Message res;
    Queue request_queue_{bip::open_or_create, "SendQueue", MAX_MESGS, MAX_MESG_LEN};
    Timer recv_deadline_{strand_};

    void do_read_header() {
        auto self(shared_from_this());
        std::cout << "do_read_header: " << res.header_length << std::endl;
        recv_deadline_.expires_from_now(5s);
        recv_deadline_.async_wait([self, this](error_code ec) {
            if (!ec) {
                std::cerr << "header timeout" << std::endl;
                socket_.cancel();
            }
        });

        boost::asio::async_read(
            socket_, boost::asio::buffer(res.data(), res.header_length),
            [this, self](error_code ec, size_t /*length*/) {
                std::cerr << "header: " << ec.message() << std::endl;
                recv_deadline_.cancel();
                if (!ec && res.decode_header()) {
                    do_read_body();
                } else {
                    socket_.shutdown(tcp::socket::shutdown_both);
                }
            });
    }

    void do_read_body() {
        auto self(shared_from_this());
        std::cout << "do_read_body: " << res.body_length() << std::endl;

        recv_deadline_.expires_from_now(1s);
        recv_deadline_.async_wait([self, this](error_code ec) {
            if (!ec) {
                std::cerr << "body timeout" << std::endl;
                socket_.cancel();
            }
        });

        boost::asio::async_read(
            socket_,
            boost::asio::buffer(res.body(), res.body_length()),
            boost::asio::transfer_exactly(res.body_length()),
            [this, self](error_code ec, std::size_t length) {
                std::cerr << "body: " << ec.message() << std::endl;
                recv_deadline_.cancel();
                if (!ec) {
                    try {
                        // Not safe to print unless NUL-terminated, see e.g.
                        // https://stackoverflow.com/questions/66278813/boost-deadline-timer-causes-stack-buffer-overflow/66279497#66279497
                        if (length)
                            request_queue_.send(res.body(), res.body_length(), 0);
                    } catch (const std::exception& ex) {
                        std::cout << ex.what() << std::endl;
                    }
                    do_read_header();
                } else {
                    socket_.shutdown(tcp::socket::shutdown_both);
                }
            });
    }
};

class Server {
  public:
    Server(boost::asio::io_service& io_service, short port)
        : acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
          socket_(io_service) {
        do_accept();
    }

  private:
    void do_accept() {
        acceptor_.async_accept(socket_, [this](error_code ec) {
            std::cerr << "async_accept: " << ec.message() << std::endl;
            if (!ec) {
                std::cerr << "session: " << socket_.remote_endpoint() << std::endl;
                std::make_shared<Session>(std::move(socket_))->start();
            }

            do_accept();
        });
    }

    tcp::acceptor acceptor_;
    tcp::socket socket_;
};

int main(int argc, char**) {
    Queue queue{bip::open_or_create, "SendQueue", MAX_MESGS, MAX_MESG_LEN}; // ensure it exists

    if (argc == 1) {
        boost::asio::io_context ioc;
        Server s(ioc, 8989);

        ioc.run_for(10s);
    } else {
        while (true) {
            using Buf = std::array<char, MAX_MESG_LEN>;
            Buf buf;
            unsigned prio;
            size_t n;
            queue.receive(buf.data(), buf.size(), n, prio);

            std::cout << "Received: " << std::quoted(std::string_view(buf.data(), n)) << std::endl;
        }
    }
}

Testable with

./sotest

In another terminal:

./sotest consumer

And somewhere else e.g. some requests that don't timeout:

for msg in '0000 0000' '0000 0001 31' '0000 000c 6865 6c6c 6f20 776f 726c 640a'
do
    xxd -r -p <<< "$msg" |
        netcat localhost 8989 -w 1
done

Or, multi-request on single session, then session times out (-w 6 exceeds 5s):

msg='0000 0000 0000 0001 31 0000 000c 6865 6c6c 6f20 776f 726c 640a'; xxd -r -p <<< "$msg"| netcat localhost 8989 -w 6

Generic way to timeout async operations in boost::asio

boost::asio::basic_deadline_timer::expires_at exists for exactly that, and the official timeout examples demonstrate it: a collection of examples showing how to cancel long-running asynchronous operations after a period of time.
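
For reference, a minimal fragment using the absolute-deadline flavour (assuming an io_service and a socket are in scope; expires_from_now is the equivalent relative version):

boost::asio::deadline_timer timer(io_service);

// Absolute deadline: 30 seconds from now.
timer.expires_at(boost::posix_time::microsec_clock::universal_time() +
                 boost::posix_time::seconds(30));

timer.async_wait([&socket](boost::system::error_code ec) {
    if (!ec)             // the deadline was reached, the timer was not cancelled
        socket.cancel(); // outstanding operations complete with operation_aborted
});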

Implementing a timeout using boost::asio::async_read without calling run on the io_service

The problem in the above implementation is that the async_read that times out is never cancelled. Here is how to do that:

while (1) {
    io.reset();
    io.poll_one(ec);

    if (read_result) {
        timer.cancel(); // cancel the timeout operation as it has not completed yet
        return;
    } else if (timer_result) {
        stream.cancel(); // cancel the read operation as it has not completed yet
        throw std::runtime_error("timeout");
    }
}

Asynchronous reading from boost::asio socket with timeout

Your write function has big problems. You're initiating an async_write on a temporary (local) request (you even access request from the completion handler). This leads to undefined behaviour.
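
One common way to fix it (a sketch of one option; the redo below instead makes request a class member) is to give ownership of the buffer to the completion handler, for example through a shared_ptr:

void write() {
    // The string outlives the operation because the handler owns it.
    auto request = std::make_shared<std::string>(
        "GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n");

    async_write(socket, boost::asio::buffer(*request),
                [request](error_code ec, size_t transferred) {
                    // 'request' stays alive until this handler has run.
                    std::cout << "written size is " << transferred << " ("
                              << ec.message() << ")" << std::endl;
                });
}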

Your timer callback looks fine, except that it could be more specific about the error code. Most likely the "spurious wake-up" refers to completion with error::operation_aborted when the timer is cancelled. This also happens when you reset the expiry (with expires_from_now).

I would probably write it more like

timer.expires_from_now(4000ms);
timer.async_wait([this](error_code ec) {
    if (ec == boost::asio::error::operation_aborted)
        return; // the timer was cancelled because the read completed

    std::cout << "Socket closed by timer" << std::endl;
    socket.cancel();
});

Note the cancel(), which should suffice: it aborts the outstanding read with error::operation_aborted but keeps the socket itself open and usable.

Here's a redo using some simplifications and good practices:

Live On Coliru

//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING 1
#include <boost/asio.hpp>
#include <iostream>
#include <thread>
using boost::asio::ip::tcp;
using boost::system::error_code;

using std::this_thread::sleep_for;
using namespace std::chrono_literals;

class Connection {
  public:
    template <typename Executor>
    Connection(Executor ex) : timer(ex)
                            , socket(ex) {
        buf.resize(10000);
    }

    void connect(const std::string& ip, uint16_t port) {
        std::cout << "Connecting" << std::endl;
        auto addr = boost::asio::ip::address::from_string(ip);
        socket.connect({addr, port});
        std::cout << "Connected to " << socket.remote_endpoint() << std::endl;
    }

    void write() {
        std::cout << "Trying to write..." << std::endl;

        async_write( //
            socket, boost::asio::buffer(request),
            // boost::asio::transfer_all(),
            [this](error_code ec, size_t transferred) {
                std::cout << "request size is " << request.size() << "\n"
                          << "written size is " << transferred << " ("
                          << ec.message() << ")" << std::endl;
            });
    }

    void read() {
        timer.expires_from_now(4000ms);
        timer.async_wait([this](error_code ec) {
            if (ec == boost::asio::error::operation_aborted)
                return; // timer cancelled because the read completed

            socket.cancel();
        });

        async_read( //
            socket, boost::asio::buffer(buf), boost::asio::transfer_at_least(1),
            [this](error_code ec, size_t transferred) {
                if (ec == boost::asio::error::operation_aborted) {
                    std::cout << "Socket closed by timer" << std::endl;
                    return;
                }

                std::cout << "read callback (" << ec.message() << ", "
                          << transferred << " bytes)" << std::endl;
                sleep_for(1s);
                //std::cout << buf.substr(0, transferred);
                if (!ec)
                    read();
            });
    }

  private:
    std::string request = "GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n";
    std::string buf;
    boost::asio::steady_timer timer;
    tcp::socket socket;
};

int main() {
    boost::asio::io_context ctx;

    // strand not required unless multi-threading:
    Connection connection(make_strand(ctx.get_executor()));
    sleep_for(2s);

    connection.connect("93.184.216.34", 80);
    connection.write();
    connection.read();

    //auto work = make_work_guard(ctx); // not required
    std::cout << "Waiting for read..." << std::endl;
    ctx.run();
}

Prints

Connecting
Connected to 93.184.216.34:80
Trying to write...
Waiting for read...
request size is 41
written size is 41 (Success)
read callback (Success, 1591 bytes)
Socket closed by timer

Now, I wouldn't bother with transfer_at_least and friends. If you want, use async_read_until with a completion condition or boundary like "\r\n\r\n". I would probably also just receive into a dynamic buffer:

async_read_until( //
    socket, boost::asio::dynamic_buffer(buf), "\r\n\r\n",
    [this](error_code ec, size_t transferred) {
        if (ec == boost::asio::error::operation_aborted) {
            std::cout << "Socket closed by timer" << std::endl;
            return;
        }

        std::cout << "read callback (" << ec.message() << ", "
                  << transferred << " bytes)\n"
                  << "Headers only: "
                  << std::string_view(buf).substr(0, transferred)
                  << "Remaining in buffer (start of body): "
                  << buf.size() - transferred << std::endl;
    });

Now you can expect to see:

Connecting
Connected to 93.184.216.34:80
Trying to write...
Waiting for read...
request size is 41
written size is 41 (Success)
read callback (Success, 335 bytes)
Headers only: HTTP/1.1 200 OK
Age: 550281
Cache-Control: max-age=604800
Content-Type: text/html; charset=UTF-8
Date: Sat, 13 Aug 2022 11:52:28 GMT
Etag: "3147526947+ident"
Expires: Sat, 20 Aug 2022 11:52:28 GMT
Last-Modified: Thu, 17 Oct 2019 07:18:26 GMT
Server: ECS (chb/0286)
Vary: Accept-Encoding
X-Cache: HIT
Content-Length: 1256
Remaining in buffer (start of body): 177

Bonus: Use a Library!

In reality I'd be using Boost Beast: Live On Coliru

#include <boost/beast.hpp>
#include <iostream>
using boost::asio::ip::tcp;
using namespace std::chrono_literals;

namespace beast = boost::beast;
namespace http = beast::http;

int main() {
    boost::asio::io_context ctx;
    beast::tcp_stream conn(ctx.get_executor());
    conn.connect({boost::asio::ip::address::from_string("93.184.216.34"), 80});
    conn.expires_after(1000ms);

    auto req = http::request<http::empty_body>(http::verb::get, "/", 11);
    req.set(http::field::host, "www.example.com");
    write(conn, req);

    conn.expires_after(4000ms);

    http::response<http::string_body> res;
    beast::flat_buffer buf;
    read(conn, buf, res);

    std::cout << "Headers: " << res.base() << "\n";
    std::cout << "Body received: " << res.body().length() << "\n";
}

Prints

Headers: HTTP/1.1 200 OK
Age: 551081
Cache-Control: max-age=604800
Content-Type: text/html; charset=UTF-8
Date: Sat, 13 Aug 2022 12:05:48 GMT
Etag: "3147526947+ident"
Expires: Sat, 20 Aug 2022 12:05:48 GMT
Last-Modified: Thu, 17 Oct 2019 07:18:26 GMT
Server: ECS (chb/0286)
Vary: Accept-Encoding
X-Cache: HIT
Content-Length: 1256

Body received: 1256

