How to Set a Timeout on Blocking Sockets in Boost Asio

How to set a timeout on blocking sockets in boost asio?

Under Linux/BSD, the timeout on socket I/O operations is supported directly by the operating system and can be enabled via setsockopt(). I don't know whether boost::asio provides a method for setting it or exposes the underlying socket descriptor so you can set it directly -- the latter approach is not really portable.

For the sake of completeness, here's the description from the man page:

SO_RCVTIMEO and SO_SNDTIMEO

          Specify the receiving or sending timeouts until reporting an
          error. The argument is a struct timeval. If an input or output
          function blocks for this period of time, and data has been sent
          or received, the return value of that function will be the
          amount of data transferred; if no data has been transferred and
          the timeout has been reached, then -1 is returned with errno set
          to EAGAIN or EWOULDBLOCK just as if the socket was specified to
          be non-blocking. If the timeout is set to zero (the default),
          then the operation will never time out. Timeouts only have
          effect for system calls that perform socket I/O (e.g., read(2),
          recvmsg(2), send(2), sendmsg(2)); timeouts have no effect for
          select(2), poll(2), epoll_wait(2), etc.

boost asio timeout

First of all, I believe you should ALWAYS use the async methods, since they are better and your design will only benefit from a reactor-pattern approach.
In the unfortunate case that you're in a hurry and just prototyping, the sync methods can be useful. In that case I agree with you: without any timeout support, they cannot be used in the real world.

What I did was very simple:

void HttpClientImpl::configureSocketTimeouts(boost::asio::ip::tcp::socket& socket)
{
#if defined OS_WINDOWS
    // On Windows the timeout is an integer number of milliseconds.
    int32_t timeout = 15000;
    setsockopt(socket.native_handle(), SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout, sizeof(timeout));
    setsockopt(socket.native_handle(), SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeout, sizeof(timeout));
#else
    // On POSIX systems the timeout is a struct timeval.
    struct timeval tv;
    tv.tv_sec  = 15;
    tv.tv_usec = 0;
    setsockopt(socket.native_handle(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
    setsockopt(socket.native_handle(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
#endif
}

The code above works on Windows, Linux, and macOS, switching on the OS_WINDOWS macro. Note that native_handle() replaces the native() member, which was deprecated and removed in later Boost versions.

Set timeout for boost socket.connect

Have you taken a look at the following example? It contains sample code for an async_connect with a timeout.

The connect with timeout method could be implemented using the following code:

void connect(const std::string& host, const std::string& service,
             boost::posix_time::time_duration timeout) {
  // Resolve the host name and service to a list of endpoints.
  tcp::resolver::query query(host, service);
  tcp::resolver::iterator iter = tcp::resolver(io_service_).resolve(query);

  // Set a deadline for the asynchronous operation. As a host name may
  // resolve to multiple endpoints, this function uses the composed operation
  // async_connect. The deadline applies to the entire operation, rather than
  // individual connection attempts.
  deadline_.expires_from_now(timeout);

  // Set up the variable that receives the result of the asynchronous
  // operation. The error code is set to would_block to signal that the
  // operation is incomplete. Asio guarantees that its asynchronous
  // operations will never fail with would_block, so any other value in
  // ec indicates completion.
  boost::system::error_code ec = boost::asio::error::would_block;

  // Start the asynchronous operation itself. The boost::lambda function
  // object is used as a callback and will update the ec variable when the
  // operation completes. The blocking_udp_client.cpp example shows how you
  // can use boost::bind rather than boost::lambda.
  boost::asio::async_connect(socket_, iter, var(ec) = _1);

  // Block until the asynchronous operation has completed.
  do io_service_.run_one(); while (ec == boost::asio::error::would_block);

  // Determine whether a connection was successfully established. The
  // deadline actor may have had a chance to run and close our socket, even
  // though the connect operation notionally succeeded. Therefore we must
  // check whether the socket is still open before deciding if we succeeded
  // or failed.
  if (ec || !socket_.is_open())
    throw boost::system::system_error(
        ec ? ec : boost::asio::error::operation_aborted);
}
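The "deadline actor" the comments refer to is a companion handler that watches deadline_ and closes the socket when it expires. A sketch of that handler, modeled on Asio's blocking_tcp_client example (the enclosing class name client and the members deadline_ and socket_ are assumed from the snippet above):

```cpp
// Runs whenever the deadline timer fires or is rescheduled. If the
// deadline has passed, close the socket so that the pending
// async_connect completes immediately with an error, breaking the
// would_block loop in connect().
void check_deadline() {
    if (deadline_.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
        // Closing cancels the outstanding asynchronous operation.
        boost::system::error_code ignored_ec;
        socket_.close(ignored_ec);

        // No deadline is required until a new operation sets one.
        deadline_.expires_at(boost::posix_time::pos_infin);
    }
    // Reschedule ourselves so the actor keeps running.
    deadline_.async_wait(boost::lambda::bind(&client::check_deadline, this));
}
```

This is why connect() checks socket_.is_open() after the loop: the actor may have closed the socket even though the connect completion handler notionally ran without error.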

Boost Asio Timeout Approach

I'm interpreting this as basically asking how to combine

  • the time slice example I gave earlier,
  • with the multi-listener example

This is also reflected in your comment there:

I have one bottleneck. According to the last code you shared with me: [...] I tried to add the condition: record 100 ms of data, then resume the sockets and process the collected data. When that is done, collect data from the sockets for another 100 ms, then process for 900 ms, and so on... The problem is that each listener now has its own current time. I am thinking how to have everything in one place, so that when 100 ms have elapsed, all 'listeners' are notified to resume using the stop() function you provided.

It would seem much easier to use the same time-slice calculation I used in the first (single-listener) example.

The whole point of the way I calculated time slices was to allow synchronization to a clock without time drift. The beauty of it is that it translates 1:1 to multiple listeners.

Here's the combination with 1 timer per listener but synchronized time slices, created in exactly the same way I created the multi-listener sample from the original answer code:

(just copied all the things related to read_loop into a class, and done)

Live On Coliru

#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <array>
#include <atomic>
#include <chrono>
#include <functional>
#include <iostream>
#include <list>
#include <set>
#include <thread>

namespace net = boost::asio;
using net::ip::udp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::placeholders;

using Clock = std::chrono::steady_clock;
using Timer = net::steady_timer;
constexpr auto period      = 1s;
constexpr auto armed_slice = 100ms;

struct listener {
    udp::socket s;

    listener(Clock::time_point start, net::any_io_executor ex, uint16_t port)
        : s{ex, {{}, port}}
        , start_{start} {}

    void start() {
        read_loop(event::init, error_code{}, -1); // prime the async pump
    }

    void stop() {
        post(s.get_executor(), [this] {
            stopped_ = true;
            s.cancel();
            timer.cancel();
        });
    }

    void report() const {
        std::cout << s.local_endpoint() << ": A total of " << received_packets
                  << " were received from " << unique_senders.size()
                  << " unique senders\n";
    }

  private:
    std::atomic_bool stopped_{false};
    enum class event { init, resume, receive };

    Clock::time_point const start_;
    Timer                   timer{s.get_executor()};
    std::array<char, 100>   receive_buffer;
    udp::endpoint           sender;

    std::set<udp::endpoint> unique_senders;
    size_t                  received_packets = 0;

    void read_loop(event ev, error_code ec, [[maybe_unused]] size_t bytes) {
        if (stopped_)
            return;
        auto const now    = Clock::now();
        auto const relnow = now - start_;
        switch (ev) {
        case event::receive:
            // std::cout << s.local_endpoint() << "receive (" << ec.message()
            //<< ")\n";
            if (ec)
                return;

            if ((relnow % period) > armed_slice) {
                // ignore this receive

                // wait for next slice
                auto next_slice = start_ + period * (relnow / period + 1);
                std::cout << s.local_endpoint() << " Waiting "
                          << (next_slice - now) / 1ms << "ms ("
                          << received_packets << " received)\n";
                timer.expires_at(next_slice);
                return timer.async_wait(std::bind(&listener::read_loop, this,
                                                  event::resume, _1, 0));
            } else {
                received_packets += 1;
                unique_senders.insert(sender);
                /*
                 *std::cout << s.local_endpoint() << " Received:" << bytes
                 * << " sender:" << sender
                 * << " recorded:" << received_packets << "\n";
                 *std::cout << std::string_view(receive_buffer.data(), bytes)
                 * << "\n";
                 */
            }
            break;
        case event::resume:
            // std::cout << "resume (" << ec.message() << ")\n";
            if (ec)
                return;
            break;
        case event::init:
            // std::cout << s.local_endpoint() << " init " << (now - start_) / 1ms << "ms\n";
            break;
        };
        s.async_receive_from(
            net::buffer(receive_buffer), sender,
            std::bind_front(&listener::read_loop, this, event::receive));
    }
};

int main() {
    net::thread_pool io(1); // single threaded

    std::list<listener> listeners;

    auto each = [&](auto mf) { for (auto& l : listeners) (l.*mf)(); };

    auto const start = Clock::now();

    for (uint16_t port : {1234, 1235, 1236})
        listeners.emplace_back(start, io.get_executor(), port);

    each(&listener::start);

    // after 5s stop
    std::this_thread::sleep_for(5s);

    each(&listener::stop);

    io.join();

    each(&listener::report);
}

Live Demo:

(screenshot of the three listeners' live output)

EDIT In case the output goes too fast to interpret:

0.0.0.0:1234 Waiting 899ms (1587 received)
0.0.0.0:1236 Waiting 899ms (1966 received)
0.0.0.0:1235 Waiting 899ms (1933 received)
0.0.0.0:1235 Waiting 899ms (4054 received)
0.0.0.0:1234 Waiting 899ms (3454 received)
0.0.0.0:1236 Waiting 899ms (4245 received)
0.0.0.0:1236 Waiting 899ms (6581 received)
0.0.0.0:1235 Waiting 899ms (6257 received)
0.0.0.0:1234 Waiting 899ms (5499 received)
0.0.0.0:1235 Waiting 899ms (8535 received)
0.0.0.0:1234 Waiting 899ms (7494 received)
0.0.0.0:1236 Waiting 899ms (8811 received)
0.0.0.0:1236 Waiting 899ms (11048 received)
0.0.0.0:1234 Waiting 899ms (9397 received)
0.0.0.0:1235 Waiting 899ms (10626 received)
0.0.0.0:1234: A total of 9402 were received from 7932 unique senders
0.0.0.0:1235: A total of 10630 were received from 8877 unique senders
0.0.0.0:1236: A total of 11053 were received from 9133 unique senders

If you are sure you are remaining single-threaded, you might consider sharing one actual timer instance, at the cost of significantly increased complexity.

C++ Boost ASIO: how to read/write with a timeout?

This has been brought up on the asio mailing list, and there's a ticket requesting the feature as well. To summarize: it is suggested to use asynchronous methods if you need timeouts and cancellability.


If you cannot convert to asynchronous methods, you might try the SO_RCVTIMEO and SO_SNDTIMEO socket options. They can be set with setsockopt; the descriptor can be obtained with the boost::asio::ip::tcp::socket::native method (native_handle in current Boost versions). The man 7 socket man page says:

SO_RCVTIMEO and SO_SNDTIMEO

          Specify the receiving or sending timeouts until reporting an
          error. The argument is a struct timeval. If an input or output
          function blocks for this period of time, and data has been sent
          or received, the return value of that function will be the
          amount of data transferred; if no data has been transferred and
          the timeout has been reached, then -1 is returned with errno set
          to EAGAIN or EWOULDBLOCK just as if the socket was specified to
          be non-blocking. If the timeout is set to zero (the default),
          then the operation will never time out. Timeouts only have
          effect for system calls that perform socket I/O (e.g., read(2),
          recvmsg(2), send(2), sendmsg(2)); timeouts have no effect for
          select(2), poll(2), epoll_wait(2), etc.

How to make a timeout at receiving in boost::asio udp::socket?

PS. As for "it doesn't work when debugging": debugging (specifically, breakpoints) obviously changes timing. Also keep in mind that network operations have varying latency and that UDP is not a guaranteed-delivery protocol: messages may simply not arrive.


Asio stands for "asynchronous I/O". As you might suspect, this means asynchronous I/O is a built-in feature; it is the entire purpose of the library. See overview/core/async.html: Concurrency Without Threads.

There is no need to complicate things with std::async. In your case I'd suggest using async_receive_from with use_future, as it is closest to the model you opted for:

Live On Coliru

#include <boost/asio.hpp>
#include <future>
#include <iomanip>
#include <iostream>
#include <string_view>
#include <vector>

namespace net = boost::asio;
using net::ip::udp;

using namespace std::chrono_literals;
constexpr auto kPackageMaxSize = 65520;
using data_t = std::vector<char>;

int main() {
    net::thread_pool ioc;

    udp::socket socket_(ioc, udp::v4());
    socket_.bind({{}, 8989});

    udp::endpoint ep;
    data_t buffer(kPackageMaxSize);
    auto fut =
        socket_.async_receive_from(net::buffer(buffer), ep, net::use_future);

    switch (fut.wait_for(4ms)) {
    case std::future_status::ready: {
        buffer.resize(fut.get()); // never blocks here
        std::cout << "Received " << buffer.size() << " bytes: "
                  << std::quoted(
                         std::string_view(buffer.data(), buffer.size()))
                  << "\n";
        break;
    }
    case std::future_status::timeout:
    case std::future_status::deferred: {
        std::cout << "Timeout\n";
        socket_.cancel(); // stop the IO operation
        // fut.get() would throw system_error(net::error::operation_aborted)
        break;
    }
    }

    ioc.join();
}

The Coliru output:

Received 12 bytes: "Hello World
"

Locally demonstrating both the timeout path and the successful path:

(screenshot)


