Boost Asio async_write: How to Avoid Interleaving async_write Calls

boost asio async_write: how to avoid interleaving async_write calls?

Is there a simple way to avoid this problem ?

Yes, maintain an outgoing queue for each client. In the async_write completion handler, inspect the queue size; if it is non-zero, start another async_write operation. Here is a sample:

#include <boost/asio.hpp>
#include <boost/bind.hpp>

#include <deque>
#include <iostream>
#include <string>

class Connection
{
public:
    Connection(
        boost::asio::io_service& io_service
    ) :
        _io_service( io_service ),
        _strand( _io_service ),
        _socket( _io_service ),
        _outbox()
    {

    }

    void write(
        const std::string& message
    )
    {
        _strand.post(
            boost::bind(
                &Connection::writeImpl,
                this,
                message
            )
        );
    }

private:
    void writeImpl(
        const std::string& message
    )
    {
        _outbox.push_back( message );
        if ( _outbox.size() > 1 ) {
            // outstanding async_write
            return;
        }

        this->write();
    }

    void write()
    {
        const std::string& message = _outbox[0];
        boost::asio::async_write(
            _socket,
            boost::asio::buffer( message.c_str(), message.size() ),
            _strand.wrap(
                boost::bind(
                    &Connection::writeHandler,
                    this,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred
                )
            )
        );
    }

    void writeHandler(
        const boost::system::error_code& error,
        const size_t bytesTransferred
    )
    {
        _outbox.pop_front();

        if ( error ) {
            std::cerr << "could not write: " << boost::system::system_error(error).what() << std::endl;
            return;
        }

        if ( !_outbox.empty() ) {
            // more messages to send
            this->write();
        }
    }


private:
    typedef std::deque<std::string> Outbox;

private:
    boost::asio::io_service& _io_service;
    boost::asio::io_service::strand _strand;
    boost::asio::ip::tcp::socket _socket;
    Outbox _outbox;
};

int
main()
{
    boost::asio::io_service io_service;
    Connection foo( io_service );
}

Some key points:

  • the boost::asio::io_service::strand protects access to Connection::_outbox, so no separate mutex is needed
  • because Connection::write() is public and may be called from any thread, it posts a handler through the strand instead of touching _outbox directly

It wasn't obvious to me whether you were using similar practices in the example in your question, since all of its methods are public.

boost::asio::async_write - ensure only one outstanding call

Here is a complete, compilable, and tested example that I got working through research and trial and error, after reading the answer and subsequent edits from RustyX.

Connection.h

#pragma once

#include <boost/asio.hpp>

#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

//--------------------------------------------------------------------
class ConnectionManager;

//--------------------------------------------------------------------
class Connection : public std::enable_shared_from_this<Connection>
{
public:

    typedef std::shared_ptr<Connection> SharedPtr;

    // Ensure all instances are created as shared_ptr in order to fulfill requirements for shared_from_this
    static Connection::SharedPtr Create(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket & socket);

    // Builds a human-readable description of an error code for logging
    static std::string ErrorCodeToString(const boost::system::error_code & errorCode);

    Connection(const Connection &) = delete;
    Connection(Connection &&) = delete;
    Connection & operator = (const Connection &) = delete;
    Connection & operator = (Connection &&) = delete;
    ~Connection();

    // We have to defer the start until we are fully constructed because we call shared_from_this()
    void Start();
    void Stop();

    void Send(const std::vector<char> & data);

private:

    static size_t m_nextClientId;

    size_t m_clientId;
    ConnectionManager * m_owner;
    boost::asio::ip::tcp::socket m_socket;
    std::atomic<bool> m_stopped;
    boost::asio::streambuf m_receiveBuffer;
    mutable std::mutex m_sendMutex;
    std::vector<char> m_sendBuffers[2]; // Double buffer
    int m_activeSendBufferIndex;
    bool m_sending;

    std::vector<char> m_allReadData; // Strictly for test purposes

    Connection(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket socket);

    void DoReceive();
    void DoSend();
};

//--------------------------------------------------------------------

Connection.cpp

#include "Connection.h"
#include "ConnectionManager.h"

#include <boost/bind.hpp>

#include <algorithm>
#include <cstdio>   // printf
#include <cstdlib>
#include <sstream>  // std::ostringstream

//--------------------------------------------------------------------
size_t Connection::m_nextClientId(0);

//--------------------------------------------------------------------
Connection::SharedPtr Connection::Create(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket & socket)
{
    return Connection::SharedPtr(new Connection(connectionManager, std::move(socket)));
}

//--------------------------------------------------------------------------------------------------
std::string Connection::ErrorCodeToString(const boost::system::error_code & errorCode)
{
    std::ostringstream debugMsg;
    debugMsg << " Error Category: " << errorCode.category().name() << ". "
             << " Error Message: " << errorCode.message() << ". ";

    // IMPORTANT - These comparisons only work if you dynamically link boost libraries
    // Because boost chose to implement boost::system::error_category::operator == by comparing addresses
    // The addresses are different in one library and the other when statically linking.
    //
    // We use the make_error_code function to make the correct category as well as error code value.
    // Error code value is not unique and can be duplicated in more than one category.
    if (errorCode == boost::asio::error::make_error_code(boost::asio::error::connection_refused))
    {
        debugMsg << " (Connection Refused)";
    }
    else if (errorCode == boost::asio::error::make_error_code(boost::asio::error::eof))
    {
        debugMsg << " (Remote host has disconnected)";
    }
    else
    {
        debugMsg << " (boost::system::error_code has not been mapped to a meaningful message)";
    }

    return debugMsg.str();
}

//--------------------------------------------------------------------
Connection::Connection(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket socket)
    :
      m_clientId              (m_nextClientId++)
    , m_owner                 (connectionManager)
    , m_socket                (std::move(socket))
    , m_stopped               (false)
    , m_receiveBuffer         ()
    , m_sendMutex             ()
    , m_sendBuffers           ()
    , m_activeSendBufferIndex (0)
    , m_sending               (false)
    , m_allReadData           ()
{
    // %zu is the printf conversion for size_t
    printf("Client connection with id %zu has been created.", m_clientId);
}

//--------------------------------------------------------------------
Connection::~Connection()
{
    // Boost uses RAII, so we don't have anything to do. Let their destructors take care of business
    printf("Client connection with id %zu has been destroyed.", m_clientId);
}

//--------------------------------------------------------------------
void Connection::Start()
{
    DoReceive();
}

//--------------------------------------------------------------------
void Connection::Stop()
{
    // The entire connection class is only kept alive because it is a shared pointer and always has a ref count
    // as a consequence of the outstanding async receive call that gets posted every time we receive.
    // Once we stop posting another receive in the receive handler and once our owner releases any references to
    // us, we will get destroyed.
    m_stopped = true;
    m_owner->OnConnectionClosed(shared_from_this());
}

//--------------------------------------------------------------------
void Connection::Send(const std::vector<char> & data)
{
    std::lock_guard<std::mutex> lock(m_sendMutex);

    // Append to the inactive buffer
    std::vector<char> & inactiveBuffer = m_sendBuffers[m_activeSendBufferIndex ^ 1];
    inactiveBuffer.insert(inactiveBuffer.end(), data.begin(), data.end());

    // Start a send unless one is already in progress
    DoSend();
}

//--------------------------------------------------------------------
void Connection::DoSend()
{
    // Check if there is an async send in progress
    // An empty active buffer indicates there is no outstanding send
    if (m_sendBuffers[m_activeSendBufferIndex].empty())
    {
        m_activeSendBufferIndex ^= 1;

        std::vector<char> & activeBuffer = m_sendBuffers[m_activeSendBufferIndex];
        auto self(shared_from_this());

        boost::asio::async_write(m_socket, boost::asio::buffer(activeBuffer),
            [self](const boost::system::error_code & errorCode, size_t bytesTransferred)
            {
                std::lock_guard<std::mutex> lock(self->m_sendMutex);

                self->m_sendBuffers[self->m_activeSendBufferIndex].clear();

                if (errorCode)
                {
                    printf("An error occurred while attempting to send data to client id %zu. %s", self->m_clientId, ErrorCodeToString(errorCode).c_str());

                    // An error occurred
                    // We do not stop or close on sends, but instead let the receive error out and then close
                    return;
                }

                // Check if there is more to send that has been queued up on the inactive buffer,
                // while we were sending what was on the active buffer
                if (!self->m_sendBuffers[self->m_activeSendBufferIndex ^ 1].empty())
                {
                    self->DoSend();
                }
            });
    }
}

//--------------------------------------------------------------------
void Connection::DoReceive()
{
    auto self(shared_from_this());

    boost::asio::async_read_until(m_socket, m_receiveBuffer, '#',
        [self](const boost::system::error_code & errorCode, size_t bytesRead)
        {
            if (errorCode)
            {
                // Check if the other side hung up
                if (errorCode == boost::asio::error::make_error_code(boost::asio::error::eof))
                {
                    // This is not really an error. The client is free to hang up whenever they like
                    printf("Client %zu has disconnected.", self->m_clientId);
                }
                else
                {
                    printf("An error occurred while attempting to receive data from client id %zu. Error Code: %s", self->m_clientId, ErrorCodeToString(errorCode).c_str());
                }

                // Notify our masters that we are ready to be destroyed
                self->m_owner->OnConnectionClosed(self);

                // An error occurred
                return;
            }

            // Grab the read data
            std::istream stream(&self->m_receiveBuffer);
            std::string data;
            std::getline(stream, data, '#');
            data += "#";

            printf("Received data from client %zu: %s", self->m_clientId, data.c_str());

            // Issue the next receive
            if (!self->m_stopped)
            {
                self->DoReceive();
            }
        });
}

//--------------------------------------------------------------------

ConnectionManager.h

#pragma once

#include "Connection.h"

// Boost Includes
#include <boost/asio.hpp>

// Standard Includes
#include <thread>
#include <vector>

//--------------------------------------------------------------------
class ConnectionManager
{
public:

    ConnectionManager(unsigned port, size_t numThreads);
    ConnectionManager(const ConnectionManager &) = delete;
    ConnectionManager(ConnectionManager &&) = delete;
    ConnectionManager & operator = (const ConnectionManager &) = delete;
    ConnectionManager & operator = (ConnectionManager &&) = delete;
    ~ConnectionManager();

    void Start();
    void Stop();

    void OnConnectionClosed(Connection::SharedPtr connection);

protected:

    boost::asio::io_service m_io_service;
    boost::asio::ip::tcp::acceptor m_acceptor;
    boost::asio::ip::tcp::socket m_listenSocket;
    std::vector<std::thread> m_threads;

    mutable std::mutex m_connectionsMutex;
    std::vector<Connection::SharedPtr> m_connections;

    boost::asio::deadline_timer m_timer;

    void IoServiceThreadProc();

    void DoAccept();
    void DoTimer();
};

//--------------------------------------------------------------------

ConnectionManager.cpp

#include "ConnectionManager.h"

#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

#include <algorithm>  // std::find
#include <system_error>
#include <cstdio>

//------------------------------------------------------------------------------
ConnectionManager::ConnectionManager(unsigned port, size_t numThreads)
    :
      m_io_service  ()
    , m_acceptor    (m_io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port))
    , m_listenSocket(m_io_service)
    , m_threads     (numThreads)
    , m_timer       (m_io_service)
{
}

//------------------------------------------------------------------------------
ConnectionManager::~ConnectionManager()
{
    Stop();
}

//------------------------------------------------------------------------------
void ConnectionManager::Start()
{
    if (m_io_service.stopped())
    {
        m_io_service.reset();
    }

    DoAccept();

    for (auto & thread : m_threads)
    {
        if (!thread.joinable())
        {
            // Move-assign the new thread; swap() cannot bind to a temporary in standard C++
            thread = std::thread(&ConnectionManager::IoServiceThreadProc, this);
        }
    }

    DoTimer();
}

//------------------------------------------------------------------------------
void ConnectionManager::Stop()
{
    {
        std::lock_guard<std::mutex> lock(m_connectionsMutex);
        m_connections.clear();
    }

    // TODO - Will the stopping of the io_service be enough to kill all the connections and ultimately have them get destroyed?
    // Because remember they have outstanding ref count to their shared_ptr in the async handlers
    m_io_service.stop();

    for (auto & thread : m_threads)
    {
        if (thread.joinable())
        {
            thread.join();
        }
    }
}

//------------------------------------------------------------------------------
void ConnectionManager::IoServiceThreadProc()
{
    try
    {
        // Log that we are starting the io_service thread
        {
            printf("io_service socket thread starting.");
        }

        // Run the asynchronous callbacks from the socket on this thread
        // Until the io_service is stopped from another thread
        m_io_service.run();
    }
    catch (std::system_error & e)
    {
        printf("System error caught in io_service socket thread. Error Code: %d", e.code().value());
    }
    catch (std::exception & e)
    {
        printf("Standard exception caught in io_service socket thread. Exception: %s", e.what());
    }
    catch (...)
    {
        printf("Unhandled exception caught in io_service socket thread.");
    }

    {
        printf("io_service socket thread exiting.");
    }
}

//------------------------------------------------------------------------------
void ConnectionManager::DoAccept()
{
    m_acceptor.async_accept(m_listenSocket,
        [this](const boost::system::error_code errorCode)
        {
            if (errorCode)
            {
                printf("An error occurred while attempting to accept connections. Error Code: %s", Connection::ErrorCodeToString(errorCode).c_str());
                return;
            }

            // Create the connection from the connected socket
            std::lock_guard<std::mutex> lock(m_connectionsMutex);
            Connection::SharedPtr connection = Connection::Create(this, m_listenSocket);
            m_connections.push_back(connection);
            connection->Start();

            DoAccept();
        });
}

//------------------------------------------------------------------------------
void ConnectionManager::OnConnectionClosed(Connection::SharedPtr connection)
{
    std::lock_guard<std::mutex> lock(m_connectionsMutex);

    auto itConnection = std::find(m_connections.begin(), m_connections.end(), connection);
    if (itConnection != m_connections.end())
    {
        m_connections.erase(itConnection);
    }
}

//------------------------------------------------------------------------------
void ConnectionManager::DoTimer()
{
    if (!m_io_service.stopped())
    {
        // Send a message to every connection each time the 30-second timer fires
        m_timer.expires_from_now(boost::posix_time::seconds(30));
        m_timer.async_wait(
            [this](const boost::system::error_code & errorCode)
            {
                std::lock_guard<std::mutex> lock(m_connectionsMutex);
                for (auto connection : m_connections)
                {
                    connection->Send(std::vector<char>{'b', 'e', 'e', 'p', '#'});
                }

                DoTimer();
            });
    }
}

main.cpp

#include "ConnectionManager.h"

#include <chrono>   // std::chrono::minutes
#include <cstring>
#include <iostream>
#include <string>
#include <thread>   // std::this_thread::sleep_for

int main()
{
    // Start up the server
    ConnectionManager connectionManager(5000, 2);
    connectionManager.Start();

    // Pretend we are doing other things or just waiting for shutdown
    std::this_thread::sleep_for(std::chrono::minutes(5));

    // Stop the server
    connectionManager.Stop();

    return 0;
}

What to do when asio::async_write doesn't call the handler?

Your code locks a mutex where async_write() is called and unlocks it in the handler. If these operations happen in different threads, this violates the precondition of unlock() (the calling thread must own the mutex), which may be causing your deadlock. In general, locking and unlocking in separate places usually indicates a suboptimal design.

A better way of handling asynchronous writes would be to use a queue. The logic would look something like:

Write function:

  • Lock mutex.
  • Push buffer onto queue.
  • If queue contains exactly one element, call async_write on the front element.
  • Unlock mutex (ideally via a scoped_lock falling out of scope).

Write handler:

  • Lock mutex.
  • Pop buffer from queue (and free it).
  • If the queue is not empty, call async_write on the front element.
  • Unlock mutex (ideally via a scoped_lock falling out of scope).

With this approach locking is localized and there is no opportunity for deadlock. @TannerSansbury's pointer in the comments to using a strand for synchronization instead of a mutex will work similarly - it implicitly provides exclusion for any function run in the strand.
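The mutex-and-queue logic described above can be sketched roughly as follows. This is only an illustration under stated assumptions: SerializedWriter, StartWrite and FakeSocket are hypothetical names that do not come from the original code, and the StartWrite hook stands in for a call to boost::asio::async_write so the sketch can run without a network.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical hook: starts one asynchronous write of `data` and calls `done`
// later, from the completion handler. With Boost.Asio it would wrap
// boost::asio::async_write on the connection's socket.
using StartWrite =
    std::function<void(const std::string& data, std::function<void()> done)>;

class SerializedWriter {
public:
    explicit SerializedWriter(StartWrite start_write)
        : start_write_(std::move(start_write)) {}

    // "Write function": lock, push, start a write only if none is outstanding.
    void write(std::string message) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push_back(std::move(message));
        if (queue_.size() == 1) {
            start_front();
        }
    }  // mutex released here as the lock goes out of scope

private:
    // "Write handler": lock, pop the buffer just sent, start the next if any.
    void on_write_done() {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.pop_front();
        if (!queue_.empty()) {
            start_front();
        }
    }

    // Caller must hold mutex_. Assumes start_write_ never calls `done` before
    // returning (Asio does not run a completion handler inside the initiating
    // function); otherwise on_write_done() would deadlock on mutex_.
    void start_front() {
        start_write_(queue_.front(), [this] { on_write_done(); });
    }

    std::mutex mutex_;
    std::deque<std::string> queue_;  // front element is the write in flight
    StartWrite start_write_;
};

// Test double standing in for a socket: records writes, completes on demand.
struct FakeSocket {
    std::vector<std::string> sent;
    std::deque<std::function<void()>> pending;

    void async_write(const std::string& data, std::function<void()> done) {
        sent.push_back(data);
        pending.push_back(std::move(done));
    }

    // Simulates the io_service invoking one completion handler.
    bool complete_one() {
        if (pending.empty()) return false;
        std::function<void()> done = std::move(pending.front());
        pending.pop_front();
        done();
        return true;
    }
};
```

Because the front of the queue stays in place until its handler runs, the buffer passed to the write remains valid for the whole operation, and at most one write is ever outstanding.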

I've never had a case of a write handler not being called that wasn't eventually determined to be my own bug, but you can use a boost::asio::deadline_timer to watch for this by setting the timer when you call async_write() and canceling it in the write handler.

Any issues calling Boost C++ ASIO function close() concurrently with an async_write() on the same socket?

If you use a strand¹ there's no real problem, though I'll submit that it's probably cleaner (much cleaner) to invoke shutdown() before/instead of close() in many scenarios.

Asio does require you to synchronize access to the socket/stream objects (everything other than the documented thread-safe objects such as io_context and strand, really). Strands fulfill the role traditionally held by critical sections.

¹ implicit or explicit, see Why do I need strand per connection when using boost::asio?

Boost asio async_write callback doesn't get called

If you really want to poll the io_service manually, call poll() only after the io_service has been given some work, and call reset() between iterations.

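Besides, do not call asio::async_write in a loop: each call is a composed operation made of several async_write_some calls, so overlapping calls on the same socket can interleave and the data won't arrive in the correct order. Instead, either prepare a single sequence of buffers and send it at once, or chain async_write - completion handler - async_write, as shown in the examples.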
