Boost Asio - How to Write Console Server

Asio: how does the server actively send information to a client while listening to clients at the same time?

I hope I understood your question correctly and that you also want to understand how Asio handles async operations.

In brief, everything happens in the run function of the io_service object.
When you call async_write or async_read in some function, you register a callback (which will be called when the corresponding event happens) and then immediately return from the function. Once all callbacks are registered, control returns to run, which waits for events.

So I will try to explain your code in detail. The run method is a kind of infinite loop that checks whether an event has happened. First you register a callback with acceptor_.async_accept, return to run, and wait. When a client connects, the callback for the accept event is called: you register a callback with boost::asio::async_read in session->start (I guessed this one, since you did not provide the do_read_header source), register the accept callback in do_accept again, return to run, and wait for new clients or data. When data is received, you call write, register another callback with boost::asio::async_write, and so on. In the end, control still returns to run.

Since run is a blocking function, if you need to process input from another source (the server console, for example), you can do one of the following:

1) Use run_one instead of run. Write your own loop in which run_one is called and other actions are processed.

2) Call run in another thread (all operations associated with the io_service will then be processed in that thread), while the main thread processes other actions. A small example:

// here we process all network (or other) operations associated with io_service
void run( boost::asio::io_service& io_service ) {
    while( true ) {
        try {
            io_service.run();
            break; // run() exited normally
        }
        catch( std::exception& e ) {
            // Deal with the exception as appropriate.
        }
    }
}

int main( ) {
    ...
    boost::asio::io_service io_service;
    ...
    // start io_service.run( ) in a separate thread
    auto t = std::thread( &run, std::ref( io_service ) );
    ...
    while( true ) {
        std::string line;
        // read a line from the console
        std::getline( std::cin, line );
        // process the input
        if( line == "stop" ) {
            io_service.stop( );
            break;
        } else {
            send_to_everyone( line );
        }
    }
}

io_service is thread-safe, so you can safely start async operations like boost::asio::async_write from the main thread (in send_to_everyone, for example) to register further callbacks and read/write data from clients.

Boost Asio - Client to Server Strange output

Well, I did it.
When I read something, the data length is in the variable size_t length.
So, from it, I did:

if (!error) // if there is no error
{
    for (std::size_t i = 0; i < length; i++)
    {
        std::cout << m_data[i]; // print each received char
        if (i == length - 1)    // newline after the last char, just to make the output more presentable
            std::cout << "\n";
    }
    start(); // back to the beginning
}

Input : "Hi"
Ouput : "Hi" \n

Thanks for your help, Drax and sehe. See you.

The problem I have at the moment is that I can't store m_data in a std::string because it is inside a for loop. I tried with a vector, but no luck. I want this because it seems much easier to work with a string than with a char array.
So what I want to do is: get the for loop's output, m_data, into a string.

Simple server using Boost.Asio throws an exception

You're using enable_shared_from_this, but nothing keeps your Session alive because you only hold it in a unique_ptr<Session>.

This means your Session is destroyed while asynchronous operations are still running.

Fix it:

std::make_shared<Session>(std::move(socket))->start();

Next, hold a shared pointer in your completion handler:

void Session::start()
{
    auto self = shared_from_this();
    socket_.async_read_some(buffer(data_), [this, self](error_code errorCode, size_t /*length*/) {
        if (!errorCode) {
            std::cout << "received: " << data_ << std::endl;
        }
        start();
    });
}

Next, CUT the async loop if there is an error (or your session will loop infinitely):

socket_.async_read_some(buffer(data_), [this, self](error_code errorCode, size_t length) {
    if (!errorCode && length) {
        std::cout << "received: " << data_ << std::endl;
        start();
    }
});

Finally, resize the buffer so you can actually receive data (!):

data_.resize(32);
socket_.async_read_some(buffer(data_), [this, self](error_code errorCode, size_t length) {
    if (!errorCode) {
        data_.resize(length);
        std::cout << "received: '" << data_ << "'" << std::endl;
        start();
    }
});

There are still some issues left, but hey, the program won't crash immediately and you have some results.

Update

Added a live demo showing some more suggestions: Live On Coliru

#include <iostream>
#include <string>
#include <memory>

#include <boost/asio.hpp>

using namespace boost::asio;
using namespace boost::system;
using boost::asio::ip::tcp;

class Session : public std::enable_shared_from_this<Session> {
  public:
    Session(tcp::socket socket);

    void start();

  private:
    tcp::socket socket_;
    boost::asio::streambuf _sb;
};

Session::Session(tcp::socket socket) : socket_(std::move(socket))
{}

void Session::start()
{
    auto self = shared_from_this();
    async_read_until(socket_, _sb, '\n', [this, self](error_code errorCode, size_t /*length*/) {
        std::cout << "completion " << errorCode.message() << "\n";
        if (!errorCode) {
            std::string line;
            {
                std::istream is(&_sb);
                if (getline(is, line)) {
                    std::cout << "received: '" << line << "'" << std::endl;
                }
                start();
            }
        }
    });
}

class Server {
  public:
    Server(io_context& context);

  private:
    tcp::acceptor acceptor_;

    void accept();
};

Server::Server(io_context& context) : acceptor_(context, tcp::endpoint(tcp::v4(), 8888))
{
    accept();
}

void Server::accept()
{
    acceptor_.async_accept([this](error_code errorCode, tcp::socket socket) {
        if (!errorCode) {
            std::make_shared<Session>(std::move(socket))->start();
        }
        accept();
    });
}

int main(int argc, char**) {
    if (argc > 1) {
        io_context context;
        tcp::socket socket(context);
        tcp::resolver resolver(context);
        connect(socket, resolver.resolve("127.0.0.1", "8888"));
        std::string data;
        while (getline(std::cin, data)) {
            try {
                data += '\n';
                write(socket, buffer(data));
            } catch (const std::exception& exception) {
                std::cerr << exception.what() << std::endl;
            }
        }
    } else {
        boost::asio::io_context context;
        Server server(context);
        context.run();
    }
}

boost::asio async server design

There are far too many unknowns to identify the root cause of the delay from the posted code. Nevertheless, there are a few approaches and considerations that can be taken to help to identify the problem:

  • Enable handler tracking for Boost.Asio 1.47+. Simply define BOOST_ASIO_ENABLE_HANDLER_TRACKING and Boost.Asio will write debug output, including timestamps, to the standard error stream. These timestamps can be used to help filter out delays introduced by application code (parseHeader(), parsePacket(), etc.).
  • Verify that byte-ordering is being handled properly. For example, if the protocol defines the header's size field as two bytes in network-byte-order and the server is handling the field as a raw short, then upon receiving a message that has a body size of 10:

    • A big-endian machine will call async_read reading 10 bytes. The read operation should complete quickly as the socket already has the 10 byte body available for reading.
    • A little-endian machine will call async_read reading 2560 bytes. The read operation will likely remain outstanding, as far more bytes are trying to be read than is intended.
  • Use tracing tools such as strace, ltrace, etc.
  • Modify Boost.Asio, adding timestamps throughout the callstack. Boost.Asio is shipped as a header-file only library. Thus, users may modify it to provide as much verbosity as desired. While not the cleanest or easiest of approaches, adding a print statement with timestamps throughout the callstack may help provide visibility into timing.
  • Try duplicating the behavior in a short, simple, self-contained example. Start with the simplest of examples to determine whether the delay is systemic. Then iteratively expand upon the example so that it becomes closer to the real code with each iteration.

Here is a simple example from which I started:

#include <iostream>

#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>

class tcp_server
    : public boost::enable_shared_from_this< tcp_server >
{
private:

    enum
    {
        header_size = 4,
        data_size   = 10,
        buffer_size = 1024,
        max_stamp   = 50
    };

    typedef boost::asio::ip::tcp tcp;

public:

    typedef boost::array< boost::posix_time::ptime, max_stamp > time_stamps;

public:

    tcp_server( boost::asio::io_service& service,
                unsigned short port )
        : strand_( service ),
          acceptor_( service, tcp::endpoint( tcp::v4(), port ) ),
          socket_( service ),
          index_( 0 )
    {}

    /// @brief Returns collection of timestamps.
    time_stamps& stamps()
    {
        return stamps_;
    }

    /// @brief Start the server.
    void start()
    {
        acceptor_.async_accept(
            socket_,
            boost::bind( &tcp_server::handle_accept, this,
                         boost::asio::placeholders::error ) );
    }

private:

    /// @brief Accept connection.
    void handle_accept( const boost::system::error_code& error )
    {
        if ( error )
        {
            std::cout << error.message() << std::endl;
            return;
        }

        read_header();
    }

    /// @brief Read header.
    void read_header()
    {
        boost::asio::async_read(
            socket_,
            boost::asio::buffer( buffer_, header_size ),
            boost::bind( &tcp_server::handle_read_header, this,
                         boost::asio::placeholders::error,
                         boost::asio::placeholders::bytes_transferred ) );
    }

    /// @brief Handle reading header.
    void
    handle_read_header( const boost::system::error_code& error,
                        std::size_t bytes_transferred )
    {
        if ( error )
        {
            std::cout << error.message() << std::endl;
            return;
        }

        // If no more stamps can be recorded, then stop the async-chain so
        // that io_service::run can return.
        if ( !record_stamp() ) return;

        // Read data.
        boost::asio::async_read(
            socket_,
            boost::asio::buffer( buffer_, data_size ),
            boost::bind( &tcp_server::handle_read_data, this,
                         boost::asio::placeholders::error,
                         boost::asio::placeholders::bytes_transferred ) );
    }

    /// @brief Handle reading data.
    void handle_read_data( const boost::system::error_code& error,
                           std::size_t bytes_transferred )
    {
        if ( error )
        {
            std::cout << error.message() << std::endl;
            return;
        }

        // If no more stamps can be recorded, then stop the async-chain so
        // that io_service::run can return.
        if ( !record_stamp() ) return;

        // Start reading header again.
        read_header();
    }

    /// @brief Record time stamp.
    bool record_stamp()
    {
        stamps_[ index_++ ] = boost::posix_time::microsec_clock::local_time();

        return index_ < max_stamp;
    }

private:
    boost::asio::io_service::strand strand_;
    tcp::acceptor acceptor_;
    tcp::socket socket_;
    boost::array< char, buffer_size > buffer_;
    time_stamps stamps_;
    unsigned int index_;
};

int main()
{
    boost::asio::io_service service;

    // Create and start the server.
    boost::shared_ptr< tcp_server > server =
        boost::make_shared< tcp_server >( boost::ref( service ), 33333 );
    server->start();

    // Run. This will exit once enough time stamps have been sampled.
    service.run();

    // Iterate through the stamps.
    tcp_server::time_stamps& stamps = server->stamps();
    typedef tcp_server::time_stamps::iterator stamp_iterator;
    using boost::posix_time::time_duration;
    for ( stamp_iterator iterator = stamps.begin() + 1,
                         end      = stamps.end();
          iterator != end;
          ++iterator )
    {
        // Obtain the delta between the current stamp and the previous.
        time_duration delta = *iterator - *(iterator - 1);
        std::cout << "Delta: " << delta.total_milliseconds() << " ms"
                  << std::endl;
    }
    // Calculate the total delta.
    time_duration delta = *stamps.rbegin() - *stamps.begin();
    std::cout <<   "Total"
              << "\n  Start: " << *stamps.begin()
              << "\n  End:   " << *stamps.rbegin()
              << "\n  Delta: " << delta.total_milliseconds() << " ms"
              << std::endl;
}

A few notes about the implementation:

  • There is only one thread (main) and one asynchronous chain read_header->handle_read_header->handle_read_data. This should minimize the amount of time a ready-to-run handler spends waiting for an available thread.
  • To focus on boost::asio::async_read, noise is minimized by:

    • Using a pre-allocated buffer.
    • Not using shared_from_this() or strand::wrap.
    • Recording the timestamps, and performing processing post-collection.

I compiled on CentOS 5.4 using gcc 4.4.0 and Boost 1.50. To drive the data, I opted to send 1000 bytes using netcat:

$ ./a.out > output &
[1] 18623
$ echo "$(for i in {0..1000}; do echo -n "0"; done)" | nc 127.0.0.1 33333
[1]+ Done ./a.out >output
$ tail output
Delta: 0 ms
Delta: 0 ms
Delta: 0 ms
Delta: 0 ms
Delta: 0 ms
Delta: 0 ms
Total
Start: 2012-Sep-10 21:22:45.585780
End: 2012-Sep-10 21:22:45.586716
Delta: 0 ms

Observing no delay, I expanded upon the example by modifying the boost::asio::async_read calls, replacing this with shared_from_this() and wrapping the ReadHandlers with strand_.wrap(). I ran the updated example and still observed no delay. Unfortunately, that is as far as I could get based on the code posted in the question.

Consider expanding upon the example, adding in a piece from the real implementation with each iteration. For example:

  • Start with using the msg variable's type to control the buffer.
    • Next, send valid data, and introduce the parseHeader() and parsePacket() functions.
  • Finally, introduce the lib::GET_SERVER_TIME() print.

If the example code is as close as possible to the real code, and no delay is being observed with boost::asio::async_read, then the ReadHandlers may be ready-to-run in the real code, but they are waiting on synchronization (the strand) or a resource (a thread), resulting in a delay:

  • If the delay is the result of synchronization with the strand, then consider Robin's suggestion by reading a larger block of data to potentially reduce the amount of reads required per-message.
  • If the delay is the result of waiting for a thread, then consider having an additional thread call io_service::run().

