How to Execute Two Threads Asynchronously Using Boost

How can I execute two threads asynchronously using boost?

This is my minimal Boost threading example.

#include <boost/thread.hpp>
#include <iostream>

using namespace std;

void ThreadFunction()
{
    int counter = 0;

    for (;;)
    {
        cout << "thread iteration " << ++counter << " Press Enter to stop" << endl;

        try
        {
            // Sleep and check for interrupt.
            // To check for interrupt without sleeping,
            // use boost::this_thread::interruption_point(),
            // which also throws boost::thread_interrupted.
            boost::this_thread::sleep(boost::posix_time::milliseconds(500));
        }
        catch (boost::thread_interrupted&)
        {
            cout << "Thread is stopped" << endl;
            return;
        }
    }
}

int main()
{
    // Start the worker thread
    boost::thread t(&ThreadFunction);

    // Wait for Enter
    char ch;
    cin.get(ch);

    // Ask the thread to stop
    t.interrupt();

    // Join - wait until the thread actually exits
    t.join();
    cout << "main: thread ended" << endl;

    return 0;
}
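
To literally run two threads asynchronously, as the question asks, the same pattern extends directly. A minimal sketch reusing ThreadFunction from above (each thread has its own local counter):

int main()
{
    // Start two worker threads running the same function
    boost::thread t1(&ThreadFunction);
    boost::thread t2(&ThreadFunction);

    // Wait for Enter
    char ch;
    cin.get(ch);

    // Ask both threads to stop, then wait for them to exit
    t1.interrupt();
    t2.interrupt();
    t1.join();
    t2.join();

    cout << "main: both threads ended" << endl;
}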

Using boost async APIs with multiple threads

In the official chat example, chat_client::write() defers work to the io_service via io_service::post(), which will:

  • request that the io_service execute the given handler via a thread that is currently invoking the poll(), poll_one(), run(), or run_one() function on the io_service
  • not allow the given handler to be invoked within the calling function (e.g. chat_client::write())

As only one thread is running the io_service, and all socket read, write, and close operations are initiated only from handlers that have been posted to the io_service, the program satisfies the thread-safety requirements for the socket.

class chat_client
{
    void write(const chat_message& msg)
    {
        // The nullary function `handler` is created, but not invoked within
        // the calling function. `msg` is captured by value, allowing `handler`
        // to append a valid `msg` object to `write_msgs_`.
        auto handler = [this, msg]()
        {
            bool write_in_progress = !write_msgs_.empty();
            write_msgs_.push_back(msg);
            if (!write_in_progress)
            {
                do_write();
            }
        };

        // Request that `handler` be invoked within the `io_service`.
        io_service_.post(handler);
    }
};

Boost::asio and async in multi-threading

Wow. This is overcomplicated on several levels.

  • futures can have typed return values (that's actually the whole point of a future over synchronization primitives)

  • the future can signal readiness with a value; there is no need to duplicate the readiness into a bool and then copy the result somewhere


  • this has me confused:

      int get()
      {
          _fut.wait();
          return _result.load();
      }

    It waits on the future, then returns _result; so what was _ready invented for?

  • Do you realize that std::async is not part of Boost ASIO? In fact, it doesn't work well with it because, as you correctly noticed, it introduces an unspecified number of threads. In general my advice is not to use std::async (it's hard to use correctly), and certainly never when using ASIO.

  • When you see the same variable named var1, var2, var3, it's time to refactor your code (into functions, or classes if it includes data members):

      std::deque<io::steady_timer> timers;

      for (int i = 1; i <= 3; ++i) {
          auto& timer = timers.emplace_back(context, std::chrono::seconds(1 + i));
          timer.async_wait([i](error_code ec) {
              std::cout << "timer " << i
                        << ", thread id: " << std::this_thread::get_id()
                        << std::endl;
          });
      }
  • instead of a vector of threads, consider boost::thread_group or indeed boost::asio::thread_pool.

  • If you manually run the IO threads, remember to handle exceptions (Should the exception thrown by boost::asio::io_service::run() be caught?), so

      boost::thread_group threads;
      for (int n = 0; n < 2; ++n) {
          threads.create_thread([&] { context.run(); });
      }

      threads.join_all();

    Or indeed

      io::thread_pool context(2);
      context.join();
  • This is very inefficient:

      while (!_ready) {
          std::this_thread::yield();
      }

    Just set the future value to signify it's ready; see the sketch after this list.

  • using namespace std is generally not a good idea (Why is "using namespace std;" considered bad practice?)
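
For reference, a minimal standard-library sketch of that point (independent of ASIO; the names here are illustrative): setting the promise both publishes the value and signals readiness, with no separate flag and no yield loop.

#include <future>
#include <iostream>
#include <thread>

int main() {
    std::promise<int> promise;
    std::future<int>  result = promise.get_future();

    // The producer sets the value exactly once; this publishes the
    // result and signals readiness at the same time.
    std::thread producer([&promise] { promise.set_value(42); });

    // get() blocks until the value is ready, then returns it.
    std::cout << "result: " << result.get() << std::endl;

    producer.join();
}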

Demo

Here's my expanded but simplified take on the question code:

Live On Coliru

#include <boost/asio.hpp>
#include <deque>
#include <future>
#include <iostream>
#include <thread>
namespace io = boost::asio;
using namespace std::chrono_literals;
using boost::system::error_code;

// not very useful in practice, but for debug output in main
std::ostream& debug(error_code);
template <typename Fut> bool is_ready(Fut const& fut) {
    return fut.wait_for(0s) == std::future_status::ready;
}

int main() {
    std::promise<int> reply;
    std::shared_future got_value = reply.get_future();

    io::thread_pool context(2);
    std::deque<io::steady_timer> timers;

    for (int i = 1; i <= 10; ++i) {
        timers //
            .emplace_back(context, i * 1s)
            .async_wait([&got_value](error_code ec) {
                if (is_ready(got_value))
                    debug(ec) << " Reply:" << got_value.get() << std::endl;
                else
                    debug(ec) << " (reply not ready)" << std::endl;
            });
    }

    timers //
        .emplace_back(context, 4'500ms)
        .async_wait([&reply](error_code ec) {
            debug(ec) << " setting value" << std::endl;
            reply.set_value(1337);
        });

    context.join();
}

int friendly_thread_id() {
    return std::hash<std::thread::id>{}(std::this_thread::get_id()) % 256;
}

#include <iomanip>
std::ostream& debug(error_code ec) {
    auto now = std::chrono::system_clock::now;
    static auto program_start = now();
    return std::cout //
        << ((now() - program_start) / 1ms) << "ms\t"
        << "thread:" << std::hex << std::setfill('0') << std::showbase
        << std::setw(2) << friendly_thread_id() << std::dec << " ";
}


Prints

0ms     thread:0x5f  (reply not ready)
999ms   thread:0xf3  (reply not ready)
1999ms  thread:0x5f  (reply not ready)
2999ms  thread:0x5f  (reply not ready)
3499ms  thread:0xf3  setting value
3999ms  thread:0x5f  Reply:1337
4999ms  thread:0xf3  Reply:1337
5999ms  thread:0xf3  Reply:1337
6999ms  thread:0xf3  Reply:1337
7999ms  thread:0xf3  Reply:1337
8999ms  thread:0xf3  Reply:1337

Boost asio strand and io_service running on several threads

In short, a strand guarantees sequential invocation of its own handlers, but makes no guarantee of concurrent execution of handlers from different strands. Thus, the answers to the points are:

  1. Yes. Sequential invocation is guaranteed.
  2. Yes. Concurrent execution could happen, but there is no guarantee.
  3. Yes to sequential invocation, but no to the guarantee that concurrent execution will occur.

A strand maintains its own handler queue and guarantees that only one of its handlers is in the io_service at a time; handlers are effectively synchronized before being placed into the io_service. Thus, all handlers posted or dispatched through a strand will be executed sequentially.

Concurrent execution of handlers posted or dispatched through different strands can occur; it is just not guaranteed to occur. The documentation states:

The implementation makes no guarantee that handlers posted or dispatched through different strand objects will be invoked concurrently.

Therefore, if Thread1 is executing a handler posted through Strand_1, Boost.Asio will not use that information to guarantee that a handler posted through Strand_2 will be executed by Thread2; however, it is possible that Thread2 is selected to execute the handler from Strand_2 based on other implementation details, such as being the next available thread in the list of threads running the io_service.

For example, consider the case where 3 handlers A, B, and C are ready to run within the io_service:

  • A was posted through Strand_1.
  • B was not posted through a strand.
  • C was posted through Strand_2.

If Thread1 and Thread2 are running the io_service, then one possible execution order is:

Thread1          | Thread2
-----------------+-----------------
start A()        | start B()
`-- finish A()   |   |
start C()        | `-- finish B()
`-- finish C()   |

The illustrated execution order shows that handlers (A and C) posted through different strands (Strand_1 and Strand_2 respectively) are not guaranteed to be executed concurrently.
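
As a small illustration of these guarantees, here is a hedged sketch (the names are chosen for this example, not taken from the question) that posts handlers through two strands into a two-thread pool. Handlers within one strand never overlap each other; handlers from different strands may, but need not, run concurrently:

#include <boost/asio.hpp>
#include <iostream>
#include <string>

int main() {
    boost::asio::thread_pool pool(2); // two threads service the handlers

    // Each strand serializes its own handlers.
    auto strand1 = boost::asio::make_strand(pool.get_executor());
    auto strand2 = boost::asio::make_strand(pool.get_executor());

    for (int i = 0; i < 3; ++i) {
        // The A handlers never overlap one another, nor do the B handlers.
        // An A handler and a B handler *may* run at the same time on the
        // two pool threads, but that is not guaranteed.
        post(strand1, [i] { std::cout << "A" + std::to_string(i) + "\n"; });
        post(strand2, [i] { std::cout << "B" + std::to_string(i) + "\n"; });
    }

    pool.join();
}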

Boost::ASIO - how to dedicate 2 threads to handle receiving and sending messages

You should use a single io_service; however many threads invoke io_service::run(), any of them may invoke handlers for asynchronous operations associated with the io_service. If these handlers access shared data structures, you will need to use a strand to ensure exclusive access. You'll also need to ensure at most one write operation

This operation is implemented in terms of zero or more calls to the stream's async_write_some function, and is known as a composed operation. The program must ensure that the stream performs no other write operations (such as async_write, the stream's async_write_some function, or any other composed operations that perform writes) until this operation completes.

and read operation

This operation is implemented in terms of zero or more calls to the stream's async_read_some function, and is known as a composed operation. The program must ensure that the stream performs no other read operations (such as async_read, the stream's async_read_some function, or any other composed operations that perform reads) until this operation completes.

is outstanding for each socket.

Using one io_service for all async_write() operations and another io_service for all async_read() operations is not possible, because a single socket is serviced by the one io_service that was passed to its constructor.

In my experience, most multi-io_service designs are driven by performance and latency requirements. The HTTP Server 2 example explores this with an io_service per CPU.
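
In rough outline, that io_service-per-CPU pattern looks like the following sketch (my own paraphrase, not the example's actual code; the round-robin line in the comment is illustrative):

#include <boost/asio.hpp>
#include <algorithm>
#include <memory>
#include <thread>
#include <vector>

namespace io = boost::asio;

int main() {
    unsigned n = std::max(1u, std::thread::hardware_concurrency());

    // One io_context per CPU, each run by exactly one thread, so the
    // handlers of a given context never run concurrently with each other.
    std::vector<std::unique_ptr<io::io_context>> contexts;
    std::vector<io::executor_work_guard<io::io_context::executor_type>> guards;
    for (unsigned i = 0; i < n; ++i) {
        contexts.push_back(std::make_unique<io::io_context>());
        guards.push_back(io::make_work_guard(*contexts.back())); // keep run() alive
    }

    std::vector<std::thread> threads;
    for (auto& ctx : contexts)
        threads.emplace_back([&ctx] { ctx->run(); });

    // Each new socket would be assigned to a context round-robin, e.g.:
    //   tcp::socket socket(*contexts[next_index++ % contexts.size()]);

    guards.clear(); // allow run() to return once each context is idle
    for (auto& t : threads) t.join();
}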

Sending HTTP requests from multiple threads using boost::asio. How to handle responses serially

You are not actually reading asynchronously; therefore you're not actually synchronizing much with the strand. The only thing being synchronized is access to the stream_/socket.

Now, doing everything synchronously is good thinking. In that case I'd suggest you don't need any threads, and therefore no strand to begin with.

Once you do have the strand/thread(s), doing non-trivial synchronous operations in handlers risks blocking the service thread(s). Consider what happens when the webserver takes a second to respond: that's ages in computer terms.

If you are doing as many requests simultaneously as there are threads (typically a low number, e.g. 4), nothing else can make progress on the io service, negating the very purpose of ASIO: asynchronous I/O.


Let me quickly pave over some minor issues in your question code, making it self-contained: Live On Coliru

#include <boost/beast/http.hpp>
#include <boost/beast.hpp>
#include <boost/asio.hpp>
#include <iostream>
using boost::asio::ip::tcp;
namespace beast = boost::beast;
namespace http  = beast::http;

using Context = boost::asio::io_context;
using Strand  = boost::asio::strand<Context::executor_type>;

struct Demo {
    using Request = http::request<http::string_body>;

    Demo(Context& ctx, tcp::endpoint ep) //
        : strand_(ctx.get_executor())
    {
        stream_.connect(ep);
    }

    void Send(Request const& req)
    {
        post(strand_, [=, this]() {
            // prepare request ...
            http::write(stream_, req);
            //...
            http::response<boost::beast::http::dynamic_body> res;
            beast::flat_buffer buffer;
            beast::error_code  ec;
            http::read(stream_, buffer, res, ec);

            std::cout << res << "\n";
        });
    }

  private:
    Strand      strand_;
    tcp::socket stream_{strand_};
};

int main() {
    Context io;
    Demo    x(io, {{}, 80});

    Demo::Request req{http::verb::get, "/", 10};
    req.prepare_payload();
    x.Send(req);

    io.run();
}

Improving

I'd suggest an asynchronous interface that is safe to use. I.e. you cannot be sure a new request won't be started on the same socket before the previous one(s) have been completed, so you need a queue:

void Send(Request req) {
    post(strand_, [this, req = std::move(req)]() mutable {
        _outgoing.push_back(std::move(req));

        if (_outgoing.size() == 1) // no pending requests
            ServiceRequestQueue();
    });
}

Now, all the logic you had is moved into the request loop, but async:

void ServiceRequestQueue()
{
    http::async_write( //
        stream_, _outgoing.front(), [this](beast::error_code ec, size_t) {
            if (ec) {
                std::cerr << "Request cannot be sent: " << ec.message() << std::endl;
                return;
            }

            // receive response
            _incoming.clear();
            _incoming.body().clear();

            http::async_read( //
                stream_, buffer, _incoming,
                [this](beast::error_code ec, size_t) {
                    if (ec) {
                        std::cerr << "Response cannot be received: "
                                  << ec.message() << std::endl;
                        return;
                    }
                    // std::cout << _incoming.base() << "\n";
                    std::cout << stream_.remote_endpoint() << " "
                              << _incoming.result() << " "
                              << _incoming.body().size() << "\n";

                    // request done
                    _outgoing.pop_front();
                    // continue if more queued
                    if (not _outgoing.empty())
                        ServiceRequestQueue();
                });
        });
}

You might want to split some of the completion handlers into separate functions, or do something useful with the request.

Live On Coliru

int main() {
    Context io;

    Demo example_com{io, "93.184.216.34", 80};
    Demo coliru{io, "173.203.57.63", 80};
    Demo localhost{io, "127.0.0.1", 80};

    // queue many requests before service start
    auto queue10 = [](Demo& client, std::string hostname, int version) {
        Demo::Request req{http::verb::get, "/", version};
        req.set(http::field::host, hostname);
        req.prepare_payload();

        for (int i = 0; i < 10; ++i)
            client.Send(req);
    };

    queue10(example_com, "www.example.com", 11);
    queue10(coliru, "coliru.stacked-crooked.com", 11);
    queue10(localhost, "sehe.nl", 10);

    // start service
    io.run();
}

Prints, on my system:

127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
93.184.216.34:80 OK 1256
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
93.184.216.34:80 OK 1256
173.203.57.63:80 OK 8616
93.184.216.34:80 OK 1256
93.184.216.34:80 OK 1256
93.184.216.34:80 OK 1256
173.203.57.63:80 OK 8616
93.184.216.34:80 OK 1256
93.184.216.34:80 OK 1256
173.203.57.63:80 OK 8616
93.184.216.34:80 OK 1256
93.184.216.34:80 OK 1256
93.184.216.34:80 OK 1256
173.203.57.63:80 OK 8616
173.203.57.63:80 OK 8616
173.203.57.63:80 OK 8616
173.203.57.63:80 OK 8616
173.203.57.63:80 OK 8616
173.203.57.63:80 OK 8616
173.203.57.63:80 OK 8616

Note that if you create many requests simultaneously (e.g. even before running the io_context at all), you can observe that the separate HTTP clients work in an overlapping fashion.

Advanced

If you really wanted a function that initiates a request and allows you to consume the response in the completion handler, consider extending your interface like this:

template <typename Token>
void async_send(Request req, Token&& token) {
    using result_type = typename boost::asio::async_result<
        std::decay_t<Token>, void(beast::error_code, Response)>;
    using handler_type = typename result_type::completion_handler_type;
    handler_type handler(std::forward<Token>(token));
    result_type  result(handler);

    struct Op {
        Request      req;
        Response     res;
        handler_type handler;

        Op(Request&& r, handler_type&& h)
            : req(std::move(r))
            , handler(std::move(h))
        {
        }

        bool check(beast::error_code ec, bool force_completion = false) {
            if (ec || force_completion)
                std::move(handler)(ec, std::move(res));
            return !ec.failed();
        }
    };

    auto op = std::make_shared<Op>(std::move(req), std::move(handler));

    post(strand_, [this, op] {
        http::async_write( //
            stream_, op->req,
            [this, op](beast::error_code ec, size_t) mutable {
                if (op->check(ec))
                    http::async_read(stream_, buffer, op->res,
                        [op](beast::error_code ec, size_t) {
                            op->check(ec, true);
                        });
            });
    });

    return result.get();
}

Note this moves the responsibility to avoid overlapping requests per client back to the caller. So starting some request chains like

// queue several request chains before service start
AsyncRequestChain(10, example_com, "www.example.com");
AsyncRequestChain(10, coliru, "coliru.stacked-crooked.com");
AsyncRequestChain(10, localhost, "sehe.nl");

// start service
io.run();

With the chain itself being:

void AsyncRequestChain(unsigned n, Demo& client, std::string hostname)
{
    if (!n)
        return;

    Demo::Request req{http::verb::get, "/", 11};
    req.set(http::field::host, hostname);
    req.prepare_payload();

    client.async_send( //
        req, [=, &client](beast::error_code ec, Demo::Response&& res) {
            std::cout << hostname << ": " << ec.message();
            if (!ec)
                std::cout << " " << res.result() //
                          << " " << res.body().size();
            std::cout << std::endl;

            // continue with the next iteration
            AsyncRequestChain(n - 1, client, hostname);
        });
}

Prints, on my machine:

sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
www.example.com: Success OK 1256
www.example.com: Success OK 1256
coliru.stacked-crooked.com: Success OK 8616
www.example.com: Success OK 1256
www.example.com: Success OK 1256
www.example.com: Success OK 1256
coliru.stacked-crooked.com: Success OK 8616
www.example.com: Success OK 1256
www.example.com: Success OK 1256
coliru.stacked-crooked.com: Success OK 8616
www.example.com: Success OK 1256
www.example.com: Success OK 1256
www.example.com: Success OK 1256
coliru.stacked-crooked.com: Success OK 8616
coliru.stacked-crooked.com: Success OK 8616
coliru.stacked-crooked.com: Success OK 8616
coliru.stacked-crooked.com: Success OK 8616
coliru.stacked-crooked.com: Success OK 8616
coliru.stacked-crooked.com: Success OK 8616
coliru.stacked-crooked.com: Success OK 8616

See it Live On Coliru


