asio: How does a server actively send information to a client while listening to the client at the same time?
I hope I understood your question correctly and that you also want to understand how asio works with async operations.
In brief, everything happens in the run function of the io_service object.
When you call async_write or async_read in some function, you register a callback (which will be called when some event happens) and then immediately return to the calling function. Once all callbacks are registered, control returns to run, which waits for events.
So I will try to explain your code in detail. The run method is a kind of infinite loop that checks whether an event has happened. First you register a callback with acceptor_.async_accept, return to run and wait. When a client connects, the callback for the accept event is called and you register a callback with boost::asio::async_read in session->start (I guessed this one, since you did not provide the do_read_header source), register the accept callback in do_accept again, return to run and wait for new clients or data. When data is received, you call write and register another callback with boost::asio::async_write, and so on. In the end you still return to run.
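The register-then-return-to-run flow described above can be modelled without Asio at all. The following is only a toy sketch: ToyLoop, post, run and trace_chain are hypothetical names made up for illustration, and the loop is not how io_service is actually implemented (a real io_service also waits for operating system events). It just shows handlers registering further handlers, and run returning once nothing is left to do.

```cpp
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for io_service (hypothetical, NOT Asio's implementation).
class ToyLoop {
public:
    // Register a callback; it is invoked later, from inside run().
    void post(std::function<void()> handler) { handlers_.push(std::move(handler)); }

    // Invoke registered handlers until none are left.
    // A handler may register further handlers while it runs.
    void run() {
        while (!handlers_.empty()) {
            std::function<void()> handler = std::move(handlers_.front());
            handlers_.pop();
            handler();
        }
    }

private:
    std::queue<std::function<void()>> handlers_;
};

// Simulates "accept, then read `messages` times" and records the call order.
std::vector<std::string> trace_chain(int messages) {
    ToyLoop loop;
    std::vector<std::string> trace;

    // Each "read" handler re-registers itself, like an async read chain.
    std::function<void(int)> on_read = [&](int n) {
        trace.push_back("read " + std::to_string(n));
        if (n < messages) loop.post([&, n] { on_read(n + 1); });
    };

    // The "accept" handler starts the read chain.
    loop.post([&] {
        trace.push_back("accept");
        loop.post([&] { on_read(1); });
    });

    trace.push_back("registered");  // nothing has been called yet
    loop.run();                     // all handlers execute in here
    trace.push_back("run returned");
    return trace;
}
```

Calling trace_chain(2) records "registered" first, because registering a callback does not invoke it; every handler then runs from inside run.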
Since run is a blocking function, if you need to process input from another source (the server console, for example) you can do one of the following:
1) Use run_one instead of run. Write your own loop in which run_one is called and other actions are processed. Note that run_one still blocks until one handler has run; poll_one is the non-blocking variant.
2) Call run in another thread (all handlers associated with the io_service will then be processed in that thread), while processing other actions in the main thread. A small example of this:
// here we process all network (or other) operations associated with io_service
void run( boost::asio::io_service& io_service ) {
while( true ) {
try {
io_service.run();
break; // run() exited normally
}
catch( std::exception& e) {
// Deal with exception as appropriate.
}
}
}
int main( ) {
...
boost::asio::io_service io_service;
...
// start io_service.run( ) in separate thread
auto t = std::thread( &run, std::ref( io_service ) );
...
while( true ) {
std::string line;
// read line from console
std::getline( std::cin, line );
// process input
if( line == "stop" ) {
io_service.stop( );
break;
} else {
send_to_everyone( line );
}
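}
t.join( ); // wait for the network thread to exit; destroying a joinable std::thread calls std::terminate
}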
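io_service is thread safe, so you can safely start async operations such as boost::asio::async_write from the main thread (in send_to_everyone, for example) to register other callbacks and write/read data from clients. Keep in mind that individual socket objects are not safe for concurrent use, so if a session can be touched from both threads, hand the work over to the network thread with io_service.post.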
Boost Asio - Client to Server Strange output
Well, I did it.
When I read something, the data length was in the variable size_t length.
So, from it, I did :
if (!error) // If there is no error
{
for (std::size_t i = 0; i < length; i++) // Only the first `length` chars are valid
{
std::cout << m_data[i]; // Print each received char
}
std::cout << "\n"; // Newline after the message, to make the output more presentable
start(); // Back to the beginning.
}
Input : "Hi"
Output : "Hi" \n
Thanks for your help, Drax and sehe. See you.
The problem I have at the moment is that I can't store m_data in a std::string, because it is printed char by char inside a for loop. I tried with a vector, but no luck. I want this because it seems much easier to work with a string than with a char array.
So what I want to do is: get the for loop's output (m_data) into a string.
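One way to get there, sketched under the assumption that m_data is a plain char buffer and length is the byte count reported by the read handler: std::string has a constructor and an append overload that take a pointer plus a length, so no loop is needed. The helper names bytes_to_string and append_chunk are made up for this illustration.

```cpp
#include <cstddef>
#include <string>

// Copy exactly `length` received bytes into a std::string.
// Anything in the buffer past `length` is stale and is ignored.
std::string bytes_to_string(const char* data, std::size_t length) {
    return std::string(data, length);
}

// Accumulate several reads into one message.
void append_chunk(std::string& message, const char* data, std::size_t length) {
    message.append(data, length);
}
```

Inside the handler this would look like `std::string text = bytes_to_string(m_data, length);`. Passing the length explicitly matters because the buffer is not guaranteed to be null-terminated.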
Simple server using Boost.Asio throws an exception
You're using enable_shared_from_this, but nothing keeps your Session alive because you only use unique_ptr<Session>.
This means your Session goes away while operations are still running.
Fix it:
std::make_shared<Session>(std::move(socket))->start();
Next, hold a shared pointer in your completion handler:
void Session::start()
{
auto self = shared_from_this();
socket_.async_read_some(buffer(data_), [this, self](error_code errorCode, size_t /*length*/) {
if (!errorCode) {
std::cout << "received: " << data_ << std::endl;
}
start();
});
}
Next, CUT the async loop if there is an error (or your session will loop infinitely):
socket_.async_read_some(buffer(data_), [this, self](error_code errorCode, size_t length) {
if (!errorCode && length) {
std::cout << "received: " << data_ << std::endl;
start();
}
});
Finally, resize the buffer so you can actually receive data (!):
data_.resize(32);
socket_.async_read_some(buffer(data_), [this, self](error_code errorCode, size_t length) {
if (!errorCode) {
data_.resize(length);
std::cout << "received: '" << data_ << "'" << std::endl;
start();
}
});
There are still some issues left, but hey, the program won't crash immediately and you have some results.
Update
Added a live demo showing some more suggestions (Live On Coliru):
#include <iostream>
#include <string>
#include <memory>
#include <boost/asio.hpp>
using namespace boost::asio;
using namespace boost::system;
using boost::asio::ip::tcp;
class Session : public std::enable_shared_from_this<Session> {
public:
Session(tcp::socket socket);
void start();
private:
tcp::socket socket_;
boost::asio::streambuf _sb;
};
Session::Session(tcp::socket socket) : socket_(std::move(socket))
{}
void Session::start()
{
auto self = shared_from_this();
async_read_until(socket_, _sb, '\n', [this, self](error_code errorCode, size_t /*length*/) {
std::cout << "completion " << errorCode.message() << "\n";
if (!errorCode) {
std::string line;
{
std::istream is(&_sb);
if (getline(is, line)) {
std::cout << "received: '" << line << "'" << std::endl;
}
start();
}
}
});
}
class Server {
public:
Server(io_context& context);
private:
tcp::acceptor acceptor_;
void accept();
};
Server::Server(io_context& context) : acceptor_(context, tcp::endpoint(tcp::v4(), 8888))
{
accept();
}
void Server::accept()
{
acceptor_.async_accept([this](error_code errorCode, tcp::socket socket) {
if (!errorCode) {
std::make_shared<Session>(std::move(socket))->start();
}
accept();
});
}
int main(int argc, char**) {
if (argc>1) {
io_context context;
tcp::socket socket(context);
tcp::resolver resolver(context);
connect(socket, resolver.resolve("127.0.0.1", "8888"));
std::string data;
while (getline(std::cin, data)) {
try {
data += '\n';
write(socket, buffer(data));
} catch (const std::exception& exception) {
std::cerr << exception.what() << std::endl;
}
}
} else {
boost::asio::io_context context;
Server server(context);
context.run();
}
}
boost::asio async server design
There are far too many unknowns to identify the root cause of the delay from the posted code. Nevertheless, there are a few approaches and considerations that can be taken to help to identify the problem:
- Enable handler tracking for Boost.Asio 1.47+. Simply define BOOST_ASIO_ENABLE_HANDLER_TRACKING and Boost.Asio will write debug output, including timestamps, to the standard error stream. These timestamps can be used to help filter out delays introduced by application code (parseHeader(), parsePacket(), etc.).
- Verify that byte-ordering is being handled properly. For example, if the protocol defines the header's size field as two bytes in network-byte-order and the server is handling the field as a raw short, then upon receiving a message that has a body size of 10:
  - A big-endian machine will call async_read reading 10 bytes. The read operation should complete quickly as the socket already has the 10 byte body available for reading.
  - A little-endian machine will call async_read reading 2560 bytes. The read operation will likely remain outstanding, as far more bytes are trying to be read than is intended.
- Use tracing tools such as strace, ltrace, etc.
- Modify Boost.Asio, adding timestamps throughout the callstack. Boost.Asio is shipped as a header-file only library. Thus, users may modify it to provide as much verbosity as desired. While not the cleanest or easiest of approaches, adding a print statement with timestamps throughout the callstack may help provide visibility into timing.
- Try duplicating the behavior in a short, simple, self contained example. Start with the simplest of examples to determine if the delay is systematic. Then, iteratively expand upon the example so that it becomes closer to the real code with each iteration.
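To make the byte-ordering point from the list concrete, here is a small standard-library sketch. It assumes the two-byte, network-byte-order size field described above; the function names decode_be16 and reinterpret_raw16 are hypothetical and do not come from the question's code.

```cpp
#include <cstdint>
#include <cstring>

// Portable decode of a two-byte big-endian (network order) field.
// Gives the same result on every host, regardless of its endianness.
std::uint16_t decode_be16(const unsigned char* bytes) {
    return static_cast<std::uint16_t>((bytes[0] << 8) | bytes[1]);
}

// What "handling the field as a raw short" amounts to: the two bytes are
// copied as-is, so the result depends on the host's byte order.
std::uint16_t reinterpret_raw16(const unsigned char* bytes) {
    std::uint16_t value;
    std::memcpy(&value, bytes, sizeof value);
    return value;
}
```

For a header carrying the bytes 0x00 0x0A, decode_be16 yields 10 everywhere, while reinterpret_raw16 yields 10 on a big-endian host and 2560 on a little-endian one, which is the oversized read described above. On POSIX systems ntohs performs the same conversion as decode_be16 when applied to the raw value.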
Here is the simple, self-contained server example from which I started:
#include <iostream>
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
class tcp_server
: public boost::enable_shared_from_this< tcp_server >
{
private:
enum
{
header_size = 4,
data_size = 10,
buffer_size = 1024,
max_stamp = 50
};
typedef boost::asio::ip::tcp tcp;
public:
typedef boost::array< boost::posix_time::ptime, max_stamp > time_stamps;
public:
tcp_server( boost::asio::io_service& service,
unsigned short port )
: strand_( service ),
acceptor_( service, tcp::endpoint( tcp::v4(), port ) ),
socket_( service ),
index_( 0 )
{}
/// @brief Returns collection of timestamps.
time_stamps& stamps()
{
return stamps_;
}
/// @brief Start the server.
void start()
{
acceptor_.async_accept(
socket_,
boost::bind( &tcp_server::handle_accept, this,
boost::asio::placeholders::error ) );
}
private:
/// @brief Accept connection.
void handle_accept( const boost::system::error_code& error )
{
if ( error )
{
std::cout << error.message() << std::endl;
return;
}
read_header();
}
/// @brief Read header.
void read_header()
{
boost::asio::async_read(
socket_,
boost::asio::buffer( buffer_, header_size ),
boost::bind( &tcp_server::handle_read_header, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred ) );
}
/// @brief Handle reading header.
void
handle_read_header( const boost::system::error_code& error,
std::size_t bytes_transferred )
{
if ( error )
{
std::cout << error.message() << std::endl;
return;
}
// If no more stamps can be recorded, then stop the async-chain so
// that io_service::run can return.
if ( !record_stamp() ) return;
// Read data.
boost::asio::async_read(
socket_,
boost::asio::buffer( buffer_, data_size ),
boost::bind( &tcp_server::handle_read_data, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred ) );
}
/// @brief Handle reading data.
void handle_read_data( const boost::system::error_code& error,
std::size_t bytes_transferred )
{
if ( error )
{
std::cout << error.message() << std::endl;
return;
}
// If no more stamps can be recorded, then stop the async-chain so
// that io_service::run can return.
if ( !record_stamp() ) return;
// Start reading header again.
read_header();
}
/// @brief Record time stamp.
bool record_stamp()
{
stamps_[ index_++ ] = boost::posix_time::microsec_clock::local_time();
return index_ < max_stamp;
}
private:
boost::asio::io_service::strand strand_;
tcp::acceptor acceptor_;
tcp::socket socket_;
boost::array< char, buffer_size > buffer_;
time_stamps stamps_;
unsigned int index_;
};
int main()
{
boost::asio::io_service service;
// Create and start the server.
boost::shared_ptr< tcp_server > server =
boost::make_shared< tcp_server >( boost::ref(service ), 33333 );
server->start();
// Run. This will exit once enough time stamps have been sampled.
service.run();
// Iterate through the stamps.
tcp_server::time_stamps& stamps = server->stamps();
typedef tcp_server::time_stamps::iterator stamp_iterator;
using boost::posix_time::time_duration;
for ( stamp_iterator iterator = stamps.begin() + 1,
end = stamps.end();
iterator != end;
++iterator )
{
// Obtain the delta between the current stamp and the previous.
time_duration delta = *iterator - *(iterator - 1);
std::cout << "Delta: " << delta.total_milliseconds() << " ms"
<< std::endl;
}
// Calculate the total delta.
time_duration delta = *stamps.rbegin() - *stamps.begin();
std::cout << "Total"
<< "\n Start: " << *stamps.begin()
<< "\n End: " << *stamps.rbegin()
<< "\n Delta: " << delta.total_milliseconds() << " ms"
<< std::endl;
}
A few notes about the implementation:
- There is only one thread (main) and one asynchronous chain read_header->handle_read_header->handle_read_data. This should minimize the amount of time a ready-to-run handler spends waiting for an available thread.
- To focus on boost::asio::async_read, noise is minimized by:
  - Using a pre-allocated buffer.
  - Not using shared_from_this() or strand::wrap.
  - Recording the timestamps, and performing processing post-collection.
I compiled on CentOS 5.4 using gcc 4.4.0 and Boost 1.50. To drive the data, I opted to send roughly 1000 bytes using netcat:
$ ./a.out > output &
[1] 18623
$ echo "$(for i in {0..1000}; do echo -n "0"; done)" | nc 127.0.0.1 33333
[1]+ Done ./a.out >output
$ tail output
Delta: 0 ms
Delta: 0 ms
Delta: 0 ms
Delta: 0 ms
Delta: 0 ms
Delta: 0 ms
Total
Start: 2012-Sep-10 21:22:45.585780
End: 2012-Sep-10 21:22:45.586716
Delta: 0 ms
Observing no delay, I expanded upon the example by modifying the boost::asio::async_read calls, replacing this with shared_from_this() and wrapping the ReadHandlers with strand_.wrap(). I ran the updated example and still observed no delay. Unfortunately, that is as far as I could get based on the code posted in the question.
Consider expanding upon the example, adding in a piece from the real implementation with each iteration. For example:
- Start with using the msg variable's type to control the buffer.
- Next, send valid data, and introduce the parseHeader() and parsePacket() functions.
- Finally, introduce the lib::GET_SERVER_TIME() print.
If the example code is as close as possible to the real code, and no delay is being observed with boost::asio::async_read, then the ReadHandlers may be ready-to-run in the real code, but they are waiting on synchronization (the strand) or a resource (a thread), resulting in a delay:
- If the delay is the result of synchronization with the strand, then consider Robin's suggestion by reading a larger block of data to potentially reduce the amount of reads required per-message.
- If the delay is the result of waiting for a thread, then consider having an additional thread call io_service::run().