How can I execute two threads asynchronously using boost?
This is my minimal Boost threading example.
#include <boost/thread.hpp>
#include <iostream>
using namespace std;
void ThreadFunction()
{
int counter = 0;
for(;;)
{
cout << "thread iteration " << ++counter << " Press Enter to stop" << endl;
try
{
// Sleep and check for interrupt.
// To check for interrupt without sleep,
// use boost::this_thread::interruption_point()
// which also throws boost::thread_interrupted
boost::this_thread::sleep(boost::posix_time::milliseconds(500));
}
catch(boost::thread_interrupted&)
{
cout << "Thread is stopped" << endl;
return;
}
}
}
int main()
{
// Start thread
boost::thread t(&ThreadFunction);
// Wait for Enter
char ch;
cin.get(ch);
// Ask thread to stop
t.interrupt();
// Join - wait when thread actually exits
t.join();
cout << "main: thread ended" << endl;
return 0;
}
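The example above starts a single thread. As a minimal sketch of running two threads side by side, the following uses only the standard library: `std::thread` plus a shared atomic stop flag stand in for `boost::thread` and `interrupt()`. The names `worker` and `run_two_workers` are made up for illustration.

```cpp
#include <atomic>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical worker: counts iterations until the shared flag asks it to stop.
void worker(std::atomic<bool>& stop, std::atomic<int>& iterations) {
    do {
        ++iterations;
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    } while (!stop.load());
}

// Starts two workers, lets them run for a while, then stops and joins both.
// Returns true if both workers made progress.
bool run_two_workers() {
    std::atomic<bool> stop{false};
    std::atomic<int> a{0}, b{0};

    std::thread t1(worker, std::ref(stop), std::ref(a));
    std::thread t2(worker, std::ref(stop), std::ref(b));

    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    stop = true;   // ask both threads to stop
    t1.join();     // wait until each thread has actually exited
    t2.join();
    return a.load() > 0 && b.load() > 0;
}
```

Calling `run_two_workers()` from `main` plays the role of the "Press Enter" step above; the flag is a cooperative stop request, much like the interruption point in the Boost version.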
using boost async API's with multiple threads
In the official chat example, chat_client::write() defers work to the io_service via io_service::post(), which will:
- request that the io_service execute the given handler via a thread that is currently invoking the poll(), poll_one(), run(), or run_one() function on the io_service
- not allow the given handler to be invoked within the calling function (e.g. chat_client::write())
As only one thread is running the io_service, and all socket read, write, and close operations are only initiated from handlers that have been posted to the io_service, the program satisfies the thread-safety requirement for socket.
class chat_client
{
void write(const chat_message& msg)
{
// The nullary function `handler` is created, but not invoked within
// the calling function. `msg` is captured by value, allowing `handler`
// to append a valid `msg` object to `write_msgs_`.
auto handler = [this, msg]()
{
bool write_in_progress = !write_msgs_.empty();
write_msgs_.push_back(msg);
if (!write_in_progress)
{
do_write();
}
};
// Request that `handler` be invoked within the `io_service`.
io_service_.post(handler);
}
};
Boost:asio and async in multi-threading
Wow. This is overcomplicated on several levels.
- futures can have typed return values (that's actually the whole point of a future over synchronization primitives)
- the future can signal the readiness with a value, no need to duplicate the readiness into a bool and then copy the result somewhere
- this has me confused:
int get()
{
_fut.wait();
return _result.load();
}
It awaits the future, then returns the _result. You know what you invented _ready for?
- Do you realize that std::async is not part of Boost ASIO? In fact, it doesn't work well with it because, as you correctly notice, it introduces (unspecified numbers of) threads. In general my advice is not to use std::async (it's hard to use correctly) and certainly never when using ASIO.
- When you see the same variable names var1, var2, var3 it's time to refactor your code (into functions or classes if it includes data members):
std::deque<io::steady_timer> timers;
for (int i = 1; i <= 3; ++i) {
auto& timer = timers.emplace_back(context, std::chrono::seconds(1+i));
timer.async_wait([i](error_code ec) {
std::cout << "timer " << i
<< ", thread name: " << std::this_thread::get_id()
<< std::endl;
});
}
- instead of a vector of threads, consider boost::thread_group or indeed boost::asio::thread_pool.
- If you manually run the IO threads, remember to handle exceptions (Should the exception thrown by boost::asio::io_service::run() be caught?), so
boost::thread_group threads;
for (int n = 0; n < 2; ++n) {
threads.create_thread([&] { context.run(); });
}
threads.join_all();
Or indeed
io::thread_pool context(2);
context.join();
- This is very inefficient
while (!_ready) {
std::this_thread::yield();
}
Just set the future value to signify it's ready.
- using namespace std is generally not a good idea (Why is "using namespace std;" considered bad practice?)
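The first points in the list can be illustrated with a short standard-library sketch: the future carries the typed result and its readiness, so no separate `bool` flag or result variable is needed. The function name `compute_with_future` and the value 42 are placeholders.

```cpp
#include <future>
#include <thread>

// The future itself signals readiness and carries the typed result.
int compute_with_future() {
    std::promise<int> reply;
    std::future<int> result = reply.get_future();

    // The producer sets the value exactly once; that is the "ready" signal.
    std::thread producer([&reply] { reply.set_value(42); });

    int value = result.get();  // blocks until the value is set, then returns it
    producer.join();
    return value;
}
```

There is no busy-wait loop and no extra atomic here; `get()` does both the waiting and the retrieval.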
Demo
Here's my expanded but simplified take on the question code:
#include <boost/asio.hpp>
#include <deque>
#include <future>
#include <iostream>
#include <thread>
namespace io = boost::asio;
using namespace std::chrono_literals;
using boost::system::error_code;
// not very useful in practice, but for debug output in main
std::ostream& debug(error_code);
template <typename Fut> bool is_ready(Fut const& fut) {
return fut.wait_for(0s) == std::future_status::ready;
}
int main() {
std::promise<int> reply;
std::shared_future got_value = reply.get_future();
io::thread_pool context(2);
std::deque<io::steady_timer> timers;
for (int i = 1; i <= 10; ++i) {
timers //
.emplace_back(context, i * 1s)
.async_wait([&got_value](error_code ec) {
if (is_ready(got_value))
debug(ec) << " Reply:" << got_value.get() << std::endl;
else
debug(ec) << " (reply not ready)" << std::endl;
});
}
timers //
.emplace_back(context, 4'500ms)
.async_wait([&reply](error_code ec) {
debug(ec) << " setting value" << std::endl;
reply.set_value(1337);
});
context.join();
}
int friendly_thread_id() {
return std::hash<std::thread::id>{}(std::this_thread::get_id()) % 256;
}
#include <iomanip>
std::ostream& debug(error_code ec) {
auto now = std::chrono::system_clock::now;
static auto program_start = now();
return std::cout //
<< ((now() - program_start) / 1ms) << "ms\t"
<< "thread:" << std::hex << std::setfill('0') << std::showbase
<< std::setw(2) << friendly_thread_id() << std::dec << " ";
}
Prints
0ms thread:0x5f (reply not ready)
999ms thread:0xf3 (reply not ready)
1999ms thread:0x5f (reply not ready)
2999ms thread:0x5f (reply not ready)
3499ms thread:0xf3 setting value
3999ms thread:0x5f Reply:1337
4999ms thread:0xf3 Reply:1337
5999ms thread:0xf3 Reply:1337
6999ms thread:0xf3 Reply:1337
7999ms thread:0xf3 Reply:1337
8999ms thread:0xf3 Reply:1337
Boost asio strand and io_service running on several threads
In short, a strand guarantees sequential invocation of its own handlers, but makes no guarantee of concurrent execution of handlers from different strands. Thus, the answers to the points are:
- Yes. Sequential invocation is guaranteed.
- Yes. Concurrent execution could happen, but there is no guarantee.
- Yes to sequential invocation, but no to the guarantee that concurrent execution will occur.
A strand maintains its own handler queue, and guarantees that only one of its handlers is in the io_service, resulting in handlers being synchronized before being placed into the io_service. Thus, all handlers posted or dispatched through a strand will be executed sequentially.
Concurrent execution of handlers posted or dispatched through different strands can occur, it is just not guaranteed to occur. The documentation states:
The implementation makes no guarantee that handlers posted or dispatched through different strand objects will be invoked concurrently.
Therefore, if Thread1 is executing a handler posted through Strand_1, Boost.Asio will not use that information to guarantee that a handler posted through Strand_2 will be executed by Thread2; however, it is possible that Thread2 is selected to execute the handler from Strand_2 based on other implementation details, such as being the next available thread in the list of threads running the io_service.
For example, consider the case where 3 handlers A, B, and C are ready to run within the io_service:
- A was posted through Strand_1.
- B was not posted through a strand.
- C was posted through Strand_2.
If Thread1 and Thread2 are running the io_service, then one possible execution order is:
Thread1 | Thread2
----------------+----------------
start A() | start B()
`-- finish A() | |
start C() | `-- finish B()
`-- finish C() |
The illustrated execution order shows that handlers (A and C) posted through different strands (Strand_1 and Strand_2 respectively) are not guaranteed to be executed concurrently.
Boost::ASIO - how to dedicate 2 threads to handle receiving and sending messages
You should use a single io_service, however many threads you use to invoke io_service::run() can also invoke handlers for asynchronous operations owned by the io_service. If these handlers access shared data structures, you will need to use a strand to ensure exclusive access. You'll also need to ensure at most one write operation
This operation is implemented in terms of zero or more calls to the stream's async_write_some function, and is known as a composed operation. The program must ensure that the stream performs no other write operations (such as async_write, the stream's async_write_some function, or any other composed operations that perform writes) until this operation completes.
and read operation
This operation is implemented in terms of zero or more calls to the stream's async_read_some function, and is known as a composed operation. The program must ensure that the stream performs no other read operations (such as async_read, the stream's async_read_some function, or any other composed operations that perform reads) until this operation completes.
is outstanding for each socket.
Using an io_service for all async_write() operations and another io_service for all async_read() operations is not possible because a single socket is serviced by one io_service that is passed in as a parameter in the constructor.
In my experience most multi-io_service designs are driven by performance and latency requirements. The HTTP Server 2 example explores this with an io_service per CPU.
Sending http request from multiple threads using boost::asio. How to handle responses serially
You are not actually reading asynchronously, therefore you're not actually synchronizing much with the strand. The only thing that is being synchronized is the access to the stream_/socket.
Now, doing everything synchronously is good thinking. In that case I'd suggest you don't need any threads, and therefore, no strand to begin with.
Once you do have the strand and thread(s), doing non-trivial operations in handlers risks blocking the service thread(s). Consider when the webserver takes a second to respond. That's taking ages in computer terms.
If you are doing as many requests simultaneously as there are threads (which could typically be low, e.g. 4) then nothing else can progress on the io service, thus negating the very purpose of ASIO: asynchronous I/O.
Let me quickly pave over some minor issues in your question code, making it self-contained:
#include <boost/beast/http.hpp>
#include <boost/beast.hpp>
#include <boost/asio.hpp>
#include <iostream>
using boost::asio::ip::tcp;
namespace beast = boost::beast;
namespace http = beast::http;
using Context = boost::asio::io_context;
using Strand = boost::asio::strand<Context::executor_type>;
struct Demo {
using Request = http::request<http::string_body>;
Demo(Context& ctx, tcp::endpoint ep) //
: strand_(ctx.get_executor())
{
stream_.connect(ep);
}
void Send(Request const& req)
{
post(strand_, [=,this]() {
// prepare request ...
http::write(stream_, req);
//...
http::response<boost::beast::http::dynamic_body> res;
beast::flat_buffer buffer;
beast::error_code ec;
http::read(stream_, buffer, res, ec);
std::cout << res << "\n";
});
}
private:
Strand strand_;
tcp::socket stream_{strand_};
};
int main() {
Context io;
Demo x(io, {{}, 80});
Demo::Request req{http::verb::get, "/", 10};
req.prepare_payload();
x.Send(req);
io.run();
}
Improving
I'd suggest an asynchronous interface that is safe to use. I.e. you cannot be sure a new request won't be started on the same socket before the previous one(s) have been completed, so you need a queue:
void Send(Request req) {
post(strand_, [this, req = std::move(req)]() mutable {
_outgoing.push_back(std::move(req));
if (_outgoing.size() == 1) // no pending
ServiceRequestQueue();
});
}
Now, all the logic you had is moved into the request loop, but async:
void ServiceRequestQueue()
{
http::async_write( //
stream_, _outgoing.front(), [this](beast::error_code ec, size_t) {
if (ec) {
std::cerr << "Request cannot be sent: " << ec.message() << std::endl;
return;
}
// receive response
_incoming.clear();
_incoming.body().clear();
http::async_read( //
stream_, buffer, _incoming,
[this](beast::error_code ec, size_t) {
if (ec) {
std::cerr << "Response cannot be received: "
<< ec.message() << std::endl;
return;
}
// std::cout << _incoming.base() << "\n";
std::cout << stream_.remote_endpoint() << " "
<< _incoming.result() << " "
<< _incoming.body().size() << "\n";
// request done
_outgoing.pop_front();
// continue if more queued
if (not _outgoing.empty())
ServiceRequestQueue();
});
});
}
You might want to split some of the completion handlers into separate functions, or do something useful with the request.
int main() {
Context io;
Demo example_com { io, "93.184.216.34", 80 } ;
Demo coliru { io, "173.203.57.63", 80 } ;
Demo localhost { io, "127.0.0.1", 80 } ;
// queue many requests before service start
auto queue10 = [](Demo& client, std::string hostname, int version) {
Demo::Request req{http::verb::get, "/", 11};
req.set(http::field::host,hostname);
req.prepare_payload();
for (int i = 0; i < 10; ++i)
client.Send(req);
};
queue10(example_com, "www.example.com", 11);
queue10(coliru, "coliru.stacked-crooked.com", 11);
queue10(localhost, "sehe.nl", 10);
// start service
io.run();
}
Prints, on my system:
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
93.184.216.34:80 OK 1256
127.0.0.1:80 OK 2798
127.0.0.1:80 OK 2798
93.184.216.34:80 OK 1256
173.203.57.63:80 OK 8616
93.184.216.34:80 OK 1256
93.184.216.34:80 OK 1256
93.184.216.34:80 OK 1256
173.203.57.63:80 OK 8616
93.184.216.34:80 OK 1256
93.184.216.34:80 OK 1256
173.203.57.63:80 OK 8616
93.184.216.34:80 OK 1256
93.184.216.34:80 OK 1256
93.184.216.34:80 OK 1256
173.203.57.63:80 OK 8616
173.203.57.63:80 OK 8616
173.203.57.63:80 OK 8616
173.203.57.63:80 OK 8616
173.203.57.63:80 OK 8616
173.203.57.63:80 OK 8616
173.203.57.63:80 OK 8616
Note that if you create many requests simultaneously (e.g. even before running the io_context at all), you can observe that separate HTTP clients work in overlapping fashion.
Advanced
If you really wanted a function that initiates a request and allows you to consume the response in the completion handler, consider extending your interface like this:
template <typename Token>
auto async_send(Request req, Token&& token) {
using result_type = typename boost::asio::async_result<
std::decay_t<Token>, void(beast::error_code, Response)>;
using handler_type = typename result_type::completion_handler_type;
handler_type handler(std::forward<Token>(token));
result_type result(handler);
struct Op {
Request req;
Response res;
handler_type handler;
Op(Request&& r, handler_type&& h)
: req(std::move(r))
, handler(std::move(h))
{
}
bool check(beast::error_code ec, bool force_completion = false) {
if (ec || force_completion)
std::move(handler)(ec, std::move(res));
return !ec.failed();
}
};
auto op = std::make_shared<Op>(std::move(req), std::move(handler));
post(strand_, [this, op] {
http::async_write( //
stream_, op->req,
[this, op](beast::error_code ec, size_t) mutable {
if (op->check(ec))
http::async_read(stream_, buffer, op->res,
[op](beast::error_code ec, size_t) {
op->check(ec, true);
});
});
});
return result.get();
}
Note this moves the responsibility to avoid overlapping requests per client back to the caller. So starting some request chains like
// queue several request chains before service start
AsyncRequestChain(10, example_com, "www.example.com");
AsyncRequestChain(10, coliru, "coliru.stacked-crooked.com");
AsyncRequestChain(10, localhost, "sehe.nl");
// start service
io.run();
With the chain itself being:
void AsyncRequestChain(unsigned n, Demo& client, std::string hostname)
{
if (!n)
return;
Demo::Request req{http::verb::get, "/", 11};
req.set(http::field::host, hostname);
req.prepare_payload();
client.async_send( //
req, [=, &client](beast::error_code ec, Demo::Response&& res) {
std::cout << hostname << ": " << ec.message();
if (!ec)
std::cout << " " << res.result() //
<< " " << res.body().size();
std::cout << std::endl;
// continue with next iteration
AsyncRequestChain(n - 1, client, hostname);
});
}
Prints, on my machine:
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
sehe.nl: Success OK 2798
www.example.com: Success OK 1256
www.example.com: Success OK 1256
coliru.stacked-crooked.com: Success OK 8616
www.example.com: Success OK 1256
www.example.com: Success OK 1256
www.example.com: Success OK 1256
coliru.stacked-crooked.com: Success OK 8616
www.example.com: Success OK 1256
www.example.com: Success OK 1256
coliru.stacked-crooked.com: Success OK 8616
www.example.com: Success OK 1256
www.example.com: Success OK 1256
www.example.com: Success OK 1256
coliru.stacked-crooked.com: Success OK 8616
coliru.stacked-crooked.com: Success OK 8616
coliru.stacked-crooked.com: Success OK 8616
coliru.stacked-crooked.com: Success OK 8616
coliru.stacked-crooked.com: Success OK 8616
coliru.stacked-crooked.com: Success OK 8616
coliru.stacked-crooked.com: Success OK 8616
See it Live On Coliru