Async Wait on File Descriptor Using Boost Asio

Async wait on file descriptor using Boost Asio

This is precisely the problem null_buffers was designed for.

Sometimes a program must be integrated
with a third-party library that wants
to perform the I/O operations itself.
To facilitate this, Boost.Asio
includes a null_buffers type that can
be used with both read and write
operations. A null_buffers operation
doesn't return until the I/O object is
"ready" to perform the operation.

As an example, to perform a
non-blocking read something like the
following may be used:

ip::tcp::socket socket(my_io_service);
...
ip::tcp::socket::non_blocking nb(true);
socket.io_control(nb);
...
socket.async_read_some(null_buffers(), read_handler);
...
void read_handler(boost::system::error_code ec)
{
if (!ec)
{
std::vector<char> buf(socket.available());
socket.read_some(buffer(buf));
}
}

There's also an excellent example included in the documentation.

Schedule an asynchronous event that will complete when stdin has waiting data in boost::asio?

I think you should be able to use the posix stream descriptor to watch for input on file descriptor 0:

ba::posix::stream_descriptor d(io, 0);
input_loop = [&](error_code ec) {
if (!ec) {
program.on_input();
d.async_wait(ba::posix::descriptor::wait_type::wait_read, input_loop);
}
};

There, program::on_input() would call getch() with no timeout() until it returns ERR:

struct Program {
Program() {
initscr();
ESCDELAY = 0;
timeout(0);
cbreak();

noecho();
keypad(stdscr, TRUE); // receive special keys

clock = newwin(2, 40, 0, 0);
monitor = newwin(10, 40, 2, 0);

syncok(clock, true); // automatic updating
syncok(monitor, true);

scrollok(monitor, true); // scroll the input monitor window
}
~Program() {
delwin(monitor);
delwin(clock);
endwin();
}

void on_clock() {
wclear(clock);

char buf[32];
time_t t = time(NULL);
if (auto tmp = localtime(&t)) {
if (strftime(buf, sizeof(buf), "%T", tmp) == 0) {
strncpy(buf, "[error formatting time]", sizeof(buf));
}
} else {
strncpy(buf, "[error getting time]", sizeof(buf));
}

wprintw(clock, "Async: %s", buf);
wrefresh(clock);
}

void on_input() {
for (auto ch = getch(); ch != ERR; ch = getch()) {
wprintw(monitor, "received key %d ('%c')\n", ch, ch);
}
wrefresh(monitor);
}

WINDOW *monitor = nullptr;
WINDOW *clock = nullptr;
};

With the following main program you'd run it for 10 seconds (because Program doesn't yet know how to exit):

int main() {
Program program;

namespace ba = boost::asio;
using boost::system::error_code;
using namespace std::literals;

ba::io_service io;
std::function<void(error_code)> input_loop, clock_loop;

// Reading input when ready on stdin
ba::posix::stream_descriptor d(io, 0);
input_loop = [&](error_code ec) {
if (!ec) {
program.on_input();
d.async_wait(ba::posix::descriptor::wait_type::wait_read, input_loop);
}
};

// For fun, let's also update the time
ba::high_resolution_timer tim(io);
clock_loop = [&](error_code ec) {
if (!ec) {
program.on_clock();
tim.expires_from_now(100ms);
tim.async_wait(clock_loop);
}
};

input_loop(error_code{});
clock_loop(error_code{});
io.run_for(10s);
}

This works:

Sample Image

Full Listing

#include <boost/asio.hpp>
#include <boost/asio/posix/descriptor.hpp>
#include <iostream>
#include "ncurses.h"

#define CTRL_R 18
#define CTRL_C 3
#define TAB 9
#define NEWLINE 10
#define RETURN 13
#define ESCAPE 27
#define BACKSPACE 127
#define UP 72
#define LEFT 75
#define RIGHT 77
#define DOWN 80

struct Program {
Program() {
initscr();
ESCDELAY = 0;
timeout(0);
cbreak();

noecho();
keypad(stdscr, TRUE); // receive special keys

clock = newwin(2, 40, 0, 0);
monitor = newwin(10, 40, 2, 0);

syncok(clock, true); // automatic updating
syncok(monitor, true);

scrollok(monitor, true); // scroll the input monitor window
}
~Program() {
delwin(monitor);
delwin(clock);
endwin();
}

void on_clock() {
wclear(clock);

char buf[32];
time_t t = time(NULL);
if (auto tmp = localtime(&t)) {
if (strftime(buf, sizeof(buf), "%T", tmp) == 0) {
strncpy(buf, "[error formatting time]", sizeof(buf));
}
} else {
strncpy(buf, "[error getting time]", sizeof(buf));
}

wprintw(clock, "Async: %s", buf);
wrefresh(clock);
}

void on_input() {
for (auto ch = getch(); ch != ERR; ch = getch()) {
wprintw(monitor, "received key %d ('%c')\n", ch, ch);
}
wrefresh(monitor);
}

WINDOW *monitor = nullptr;
WINDOW *clock = nullptr;
};

int main() {
Program program;

namespace ba = boost::asio;
using boost::system::error_code;
using namespace std::literals;

ba::io_service io;
std::function<void(error_code)> input_loop, clock_loop;

// Reading input when ready on stdin
ba::posix::stream_descriptor d(io, 0);
input_loop = [&](error_code ec) {
if (!ec) {
program.on_input();
d.async_wait(ba::posix::descriptor::wait_type::wait_read, input_loop);
}
};

// For fun, let's also update the time
ba::high_resolution_timer tim(io);
clock_loop = [&](error_code ec) {
if (!ec) {
program.on_clock();
tim.expires_from_now(100ms);
tim.async_wait(clock_loop);
}
};

input_loop(error_code{});
clock_loop(error_code{});
io.run_for(10s);
}

Integrate boost::asio into file descriptor based eventloops (select/poll)

Based on the example in this answer I came up with this solution that uses a generic handler, which writes into a wake-up pipe and then posts the handler call into another io_service. The read end of the pipe can be used in a file descriptor based event loop and the callback run_handler() is called from there, which clears the pipe and runs pending handlers in the main thread.

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/optional.hpp>
#include <boost/thread.hpp>

/// @brief Type used to emulate asynchronous host resolution with a
/// dedicated thread pool.
class resolver {
public:
resolver(const std::size_t pool_size)
: work_(boost::ref(resolver_service_)) {
// Create wake-up pipe
pipe(pipe_);
fcntl(pipe_[0], F_SETFL, O_NONBLOCK);
// Create pool.
for (std::size_t i = 0; i < pool_size; ++i)
threads_.create_thread(boost::bind(&boost::asio::io_service::run,
&resolver_service_));
}

~resolver() {
work_ = boost::none;
threads_.join_all();
}

template <typename QueryOrEndpoint, typename Handler>
void async_resolve(QueryOrEndpoint query, Handler handler) {
resolver_service_.post(boost::bind(
&resolver::do_async_resolve<QueryOrEndpoint, Handler>, this,
query, handler));
}

// callback for eventloop in main thread
void run_handler() {
char c;
// clear wake-up pipe
while (read(pipe_[0], &c, 1) > 0);
// run handler posted from resolver threads
handler_service_.poll();
handler_service_.reset();
}

// get read end of wake up pipe for polling in eventloop
int fd() {
return pipe_[0];
}

private:
/// @brief Resolve address and invoke continuation handler.
template <typename QueryOrEndpoint, typename Handler>
void do_async_resolve(const QueryOrEndpoint& query, Handler handler) {
typedef typename QueryOrEndpoint::protocol_type protocol_type;
typedef typename protocol_type::resolver resolver_type;

// Resolve synchronously, as synchronous resolution will perform work
// in the calling thread. Thus, it will not use Boost.Asio's internal
// thread that is used for asynchronous resolution.
boost::system::error_code error;
resolver_type resolver(resolver_service_);
typename resolver_type::iterator result = resolver.resolve(query, error);

// post handler callback to service running in main thread
handler_service_.post(boost::bind(handler, error, result));
// wake up eventloop in main thread
write(pipe_[1], "*", 1);
}

private:
boost::asio::io_service resolver_service_;
boost::asio::io_service handler_service_;
boost::optional<boost::asio::io_service::work> work_;
boost::thread_group threads_;
int pipe_[2];
};

template <typename ProtocolType>
void handle_resolve(
const boost::system::error_code& error,
typename ProtocolType::resolver::iterator iterator) {
std::stringstream stream;
stream << "handle_resolve:\n"
" " << error.message() << "\n";
if (!error)
stream << " " << iterator->endpoint() << "\n";

std::cout << stream.str();
std::cout.flush();
}

int main() {
// Resolver will emulate asynchronous host resolution with a pool of 5
// threads.
resolver resolver(5);

namespace ip = boost::asio::ip;
resolver.async_resolve(
ip::udp::resolver::query("localhost", "12345"),
&handle_resolve<ip::udp>);
resolver.async_resolve(
ip::tcp::resolver::query("www.google.com", "80"),
&handle_resolve<ip::tcp>);
resolver.async_resolve(
ip::udp::resolver::query("www.stackoverflow.com", "80"),
&handle_resolve<ip::udp>);
resolver.async_resolve(
ip::icmp::resolver::query("some.other.address", "54321"),
&handle_resolve<ip::icmp>);

pollfd fds;
fds.fd = resolver.fd();
fds.events = POLLIN;

// simple eventloop
while (true) {
if (poll(&fds, 1, 2000)) // waiting for wakeup call
resolver.run_handler(); // call resolve handler
else
break;
}
}

Asynchronous tcp server using boost's async_write results in bad file descriptor

I slightly modified the code to be compatible with Boost 1.74.0.

Then I ran it with ASAN:

=================================================================
==22695==ERROR: AddressSanitizer: heap-use-after-free on address 0x61b000000080 at pc 0x5571c3b61379 bp 0x7ffddce81980 sp 0x7ffddce81970
READ of size 8 at 0x61b000000080 thread T0
#0 0x5571c3b61378 in boost::asio::detail::strand_executor_service::strand_impl::~strand_i...
#1 0x5571c3c02599 in std::_Sp_counted_ptr<boost::asio::detail::strand_executor_service::s...
#2 0x5571c3b874e9 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /usr...
#3 0x5571c3b6d429 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /...
#4 0x5571c3b5ff84 in std::__shared_ptr<boost::asio::detail::strand_executor_service::stra...
#5 0x5571c3b5ffeb in std::shared_ptr<boost::asio::detail::strand_executor_service::strand...
#6 0x5571c3b7d8d0 in boost::asio::strand<boost::asio::execution::any_executor<boost::asio...
#7 0x5571c3c0da6c in void std::destroy_at<boost::asio::strand<boost::asio::execution::any...
#8 0x5571c3c0b761 in void std::allocator_traits<std::allocator<boost::asio::strand<boost:...
#9 0x5571c3c01847 in std::_Sp_counted_ptr_inplace<boost::asio::strand<boost::asio::execut...
#10 0x5571c3b874e9 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /us...
#11 0x5571c3b6d429 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() ...
#12 0x5571c3b25690 in std::__shared_ptr<void, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr(...
#13 0x5571c3b256f7 in std::shared_ptr<void>::~shared_ptr() /usr/include/c++/10/bits/share...
#14 0x5571c3b25769 in boost::asio::execution::detail::any_executor_base::destroy_shared(b...
#15 0x5571c3b24ef2 in boost::asio::execution::detail::any_executor_base::~any_executor_ba...
#16 0x5571c3b6a0f7 in boost::asio::execution::any_executor<boost::asio::execution::contex...
#17 0x5571c3bbf447 in my_project::TcpConnectionHandler::~TcpConnectionHandler() /home/seh...
#18 0x5571c3bbf4e1 in void boost::checked_delete<my_project::TcpConnectionHandler>(my_pro...
#19 0x5571c3c020a9 in boost::detail::sp_counted_impl_p<my_project::TcpConnectionHandler>:...
#20 0x5571c3b5bf6b in boost::detail::sp_counted_base::release() /home/sehe/custom/boost_1...
#21 0x5571c3b5c537 in boost::detail::shared_count::~shared_count() /home/sehe/custom/boos...
#22 0x5571c3b6a324 in boost::shared_ptr<my_project::TcpConnectionHandler>::~shared_ptr() ...
#23 0x5571c3b1d48e in my_project::TcpServer::~TcpServer() /home/sehe/Projects/stackoverfl...
#24 0x5571c3b1e837 in main /home/sehe/Projects/stackoverflow/test.cpp:266
#25 0x7ff1f049fbf6 in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x21bf6)
#26 0x5571c3b18b99 in _start (/home/sehe/Projects/stackoverflow/sotest+0x182b99)

0x61b000000080 is located 0 bytes inside of 1640-byte region [0x61b000000080,0x61b0000006e8)
freed by thread T0 here:
#0 0x7ff1f1eb4407 in operator delete(void*, unsigned long) (/usr/lib/x86_64-linux-gnu/lib...
#1 0x5571c3c003f4 in boost::asio::detail::strand_executor_service::~strand_executor_servi...
#2 0x5571c3b35e4a in boost::asio::detail::service_registry::destroy(boost::asio::executio...
#3 0x5571c3b3578b in boost::asio::detail::service_registry::destroy_services() /home/sehe...
#4 0x5571c3b37a4f in boost::asio::execution_context::destroy() /home/sehe/custom/boost_1_...
#5 0x5571c3b3788a in boost::asio::execution_context::~execution_context() /home/sehe/cust...
#6 0x5571c3b54621 in boost::asio::io_context::~io_context() /home/sehe/custom/boost_1_74_...
#7 0x5571c3b1d43b in my_project::TcpServer::~TcpServer() /home/sehe/Projects/stackoverflo...
#8 0x5571c3b1e837 in main /home/sehe/Projects/stackoverflow/test.cpp:266
#9 0x7ff1f049fbf6 in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x21bf6)

previously allocated by thread T0 here:
#0 0x7ff1f1eb33a7 in operator new(unsigned long) (/usr/lib/x86_64-linux-gnu/libasan.so.6+...
#1 0x5571c3bc0faa in boost::asio::execution_context::service* boost::asio::detail::servic...
#2 0x5571c3b3623a in boost::asio::detail::service_registry::do_use_service(boost::asio::e...
#3 0x5571c3bb74f9 in boost::asio::detail::strand_executor_service& boost::asio::detail::s...
#4 0x5571c3bab8e7 in boost::asio::detail::strand_executor_service& boost::asio::use_servi...
#5 0x5571c3ba085f in std::shared_ptr<boost::asio::detail::strand_executor_service::strand...
#6 0x5571c3b92744 in boost::asio::strand<boost::asio::execution::any_executor<boost::asio...
#7 0x5571c3b7d844 in boost::asio::strand<boost::asio::execution::any_executor<boost::asio...
#8 0x5571c3b18e8a in my_project::TcpConnectionHandler::TcpConnectionHandler(std::__cxx11:...
#9 0x5571c3b1dbe4 in my_project::TcpServer::start_accept() /home/sehe/Projects/stackoverf...
#10 0x5571c3b1c775 in my_project::TcpServer::TcpServer(std::__cxx11::basic_string<char, s...
#11 0x5571c3b1e7b9 in main /home/sehe/Projects/stackoverflow/test.cpp:266
#12 0x7ff1f049fbf6 in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x21bf6)

SUMMARY: AddressSanitizer: heap-use-after-free /home/sehe/custom/boost_1_74_0/boost/asio/detail/impl/strand_executor_service.ipp:88 in boost::asio::detail::strand_executor_service::strand_impl::~strand_impl()
Shadow bytes around the buggy address:
0x0c367fff7fc0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
0x0c367fff7fd0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
0x0c367fff7fe0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
0x0c367fff7ff0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
0x0c367fff8000: fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa
=>0x0c367fff8010:[fd]fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
0x0c367fff8020: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
0x0c367fff8030: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
0x0c367fff8040: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
0x0c367fff8050: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
0x0c367fff8060: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
Shadow byte legend (one shadow byte represents 8 application bytes):
Addressable: 00
Partially addressable: 01 02 03 04 05 06 07
Heap left redzone: fa
Freed heap region: fd
Stack left redzone: f1
Stack mid redzone: f2
Stack right redzone: f3
Stack after return: f5
Stack use after scope: f8
Global redzone: f9
Global init order: f6
Poisoned by user: f7
Container overflow: fc
Array cookie: ac
Intra object redzone: bb
ASan internal: fe
Left alloca redzone: ca
Right alloca redzone: cb
Shadow gap: cc
==22695==ABORTING

As you can see, ~TcpServer destructor causes the connections to use the io_service after it was destroyed. Reordering members will fix this:

boost::shared_ptr<TcpConnectionHandler> connection_;
boost::asio::io_service io_service_;
boost::asio::ip::tcp::acceptor acceptor_;

Should be

boost::asio::io_service io_service_;
boost::shared_ptr<TcpConnectionHandler> connection_;
boost::asio::ip::tcp::acceptor acceptor_;

More

To actually listen, it seems that you would want to /wait/ for the service to exit, not stop() and interrupt() the thread?

Note also, there is no need to dynamically allocate the thread. Why should C++ programmers minimize use of 'new'?

TcpServer::~TcpServer() {
if (io_thread_.joinable()) {
//io_service_.stop();
//io_thread_.interrupt();
io_thread_.join();
}
}

Lifetime issue: shared_from_this?

boost::bind(&TcpConnectionHandler::handle_write, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)

This piece fails to capture the shared pointer, which leads to TcpConnectionHandler being destructed. (When the socket is destructed, it is closed). Actually, your problem is UB because it's use-after-free but you're "lucky" to not see a crash, which leads you to see the UB in the form of an invalid handle.

Fix them both times:

post(executor_,
boost::bind(&TcpConnectionHandler::writeImpl, shared_from_this(), message));

And

async_read_until(socket_, message_, "\r\n",
boost::bind(&TcpConnectionHandler::handle_read,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));

Live Demo

I tested it successfully with the following:

#define BOOST_BIND_NO_PLACEHOLDERS
#ifndef TCP_SERVER_
#define TCP_SERVER_

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <iostream>

namespace my_project {
class TcpConnectionHandler
: public boost::enable_shared_from_this<TcpConnectionHandler> {
public:
TcpConnectionHandler(
std::string log_prefix, boost::asio::any_io_executor executor,
boost::function<void(std::string&)> received_message_callback);

boost::asio::ip::tcp::socket& socket();

void start();
void write(const std::string& message);

private:
void writeImpl(const std::string& message);
void write();
void handle_read(const boost::system::error_code& error,
size_t bytes_transferred);
void handle_write(const boost::system::error_code& error,
size_t bytes_transferred);

boost::asio::any_io_executor executor_;
boost::asio::ip::tcp::socket socket_;
boost::asio::streambuf message_;
std::string log_prefix_;
boost::function<void(std::string&)> received_message_callback_;

std::deque<std::string> outbox_;
};

class TcpServer {
public:
TcpServer(std::string log_prefix, unsigned int port,
boost::function<void(std::string&)> received_message_callback);
~TcpServer();

void start();

void write(std::string content);

private:
void start_accept();

void handle_accept(boost::shared_ptr<TcpConnectionHandler> connection,
const boost::system::error_code& error);

boost::asio::io_service io_service_;
boost::shared_ptr<TcpConnectionHandler> connection_;
boost::asio::ip::tcp::acceptor acceptor_;
std::string log_prefix_;
boost::function<void(std::string&)> received_message_callback_;
boost::condition_variable connection_cond_;
boost::mutex connection_mutex_;
bool client_connected_;
boost::thread io_thread_; /**< Thread to run boost.asio io_service. */
};

} // namespace my_project

#endif // #ifndef TCP_SERVER_

//#include "tcp_server.h"
//#include "easylogging++.h"
//#include "utils.h"

namespace my_project {
// TcpConnectionHandler
TcpConnectionHandler::TcpConnectionHandler(
std::string log_prefix, boost::asio::any_io_executor executor,
boost::function<void(std::string&)> received_message_callback)
: executor_(make_strand(executor)),
socket_(executor_),
log_prefix_(log_prefix),
received_message_callback_(received_message_callback)
{ }

boost::asio::ip::tcp::socket& TcpConnectionHandler::socket() { return socket_; }

void TcpConnectionHandler::start() {
async_read_until(socket_, message_, "\r\n",
boost::bind(&TcpConnectionHandler::handle_read,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}

void TcpConnectionHandler::write(const std::string& message) {
post(executor_,
boost::bind(&TcpConnectionHandler::writeImpl, shared_from_this(), message));
}

void TcpConnectionHandler::writeImpl(const std::string& message) {
outbox_.push_back(message);
if (outbox_.size() > 1) {
// outstanding async_write
return;
}

write();
}

void TcpConnectionHandler::write() {
const std::string& message = outbox_[0];
boost::asio::async_write(
socket_, boost::asio::buffer(message.c_str(), message.size()),
boost::asio::bind_executor(
executor_,
boost::bind(&TcpConnectionHandler::handle_write, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)));
}

void TcpConnectionHandler::handle_read(const boost::system::error_code& error,
size_t /*bytes_transferred*/) {

std::cerr << __FUNCTION__ << ": " << error.message() << "\n";
// Check for client disconnection
if ((boost::asio::error::eof == error) ||
(boost::asio::error::connection_reset == error)) {
// LOG(ERROR) << log_prefix_ << " TCP/IP client disconnected!";
return;
}

// Convert stream to string
std::istream stream(&message_);
std::istreambuf_iterator<char> eos;
std::string message_str(std::istreambuf_iterator<char>(stream), eos);

// LOG(DEBUG) << log_prefix_ << " communication object received message: "
// << getPrintableMessage(message_str);

std::istringstream iss(message_str);

std::string msg;
std::getline(iss, msg, '\r'); // Consumes from the streambuf.

// Discard the rest of the message from buffer
message_.consume(message_.size());

if (!error) {
received_message_callback_(msg);
start();
} else {
// TODO: Handler here the error
}
}

void TcpConnectionHandler::handle_write(const boost::system::error_code& error,
size_t /*bytes_transferred*/) {
outbox_.pop_front();

if (error) {
std::cerr << "could not write: "
<< boost::system::system_error(error).what() << std::endl;
return;
}

if (!outbox_.empty()) {
// more messages to send
write();
}
}

// TcpServer

TcpServer::TcpServer(
std::string log_prefix, unsigned int port,
boost::function<void(std::string&)> received_message_callback)
: acceptor_(io_service_, boost::asio::ip::tcp::endpoint(
boost::asio::ip::tcp::v4(), port)),
client_connected_(false) {

log_prefix_ = log_prefix;
received_message_callback_ = received_message_callback;

start_accept();

// Run io_service in secondary thread
io_thread_ = boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_));
}

TcpServer::~TcpServer() {
if (io_thread_.joinable()) {
//io_service_.stop();
//io_thread_.interrupt();
io_thread_.join();
}
}

void TcpServer::start() {
// Wait until client is connected to our TCP server. (condition variable)
boost::unique_lock<boost::mutex> lock(connection_mutex_);

while (!client_connected_) {
// LOG(INFO) << "Waiting for " << log_prefix_ << " client to establish
// connection...";

connection_cond_.wait(lock);
}

// LOG(INFO) << log_prefix_ << " client successfully connected.";
}

void TcpServer::write(std::string content) { connection_->write(content); }

void TcpServer::start_accept() {
// Create a new connection handler
connection_.reset(new TcpConnectionHandler(
log_prefix_, acceptor_.get_executor(), received_message_callback_));

// Asynchronous accept operation and wait for a new connection.
acceptor_.async_accept(connection_->socket(),
boost::bind(&TcpServer::handle_accept, this,
connection_,
boost::asio::placeholders::error));

// LOG(DEBUG) << log_prefix_ << " communication object started asynchronous
// TCP/IP connection acceptance.";
}

void TcpServer::handle_accept(
boost::shared_ptr<TcpConnectionHandler> connection,
const boost::system::error_code& error) {
std::cerr << __FUNCTION__ << ": " << error.message() << "\n";
if (!error) {
// LOG(INFO) << log_prefix_ << " client connected!";
connection->start();
boost::mutex::scoped_lock lock(connection_mutex_);
client_connected_ = true;
connection_cond_.notify_one();
// LOG(INFO) << log_prefix_ << " client connection accepted";
}

start_accept();
}
} // namespace my_project

int main() {
my_project::TcpServer s("demo", 6868, [](std::string& s) {
std::cout << "Received msg: " << std::quoted(s) << "\n";
});
}

And as a client, e.g.

cat test.cpp | netcat -Cw 0 localhost 6868

Prints the whole lot like

Received msg: "//#define BOOST_BIND_GLOBAL_PLACEHOLDERS"
Received msg: "#include <boost/thread.hpp>"
Received msg: "std::string& message);"
Received msg: "td::string> outbox_;"
Received msg: ":shared_ptr<TcpConnectionHandler> connection_;"
Received msg: "#include \"utils.h\""
Received msg: "ConnectionHandler::start() {"
Received msg: "::writeImpl(const std::string& message) {"
Received msg: "s(),"
Received msg: " // LOG(ERROR) << log_prefix_ << \" TCP/IP client disconnected!\";"
Received msg: " // Consumes from the streambuf."
Received msg: "e: \""
Received msg: "nt_connected_(false) {"
Received msg: "d to our TCP server. (condition variable)"
Received msg: "ection handler"
Received msg: "nication object started asynchronous"
Received msg: "ond_.notify_one();"

As you can see you have to fix that you need to take bytes_received into account, but I'll leave that to you for now.

Post Scriptum

Oh. I noticed another thing. If you intend to only accept 1 connection, don't reset the connection_, because that means that write acts on an unconnected instance. Perhaps you wanted to keep a list of connected clients?

Here's a simple rework to replace the single connection_ member with a list<weak_ptr<TcpConnectionHandler> > connections_;. It even has a basic garbage collection built in.



Related Topics



Leave a reply



Submit