How to Call Accept() for One Socket from Several Threads Simultaneously

Can I call accept() for one socket from several threads simultaneously?

Yes, you can call accept() on the same listening socket from multiple threads and multiple processes, though there may be less point to it than you think. The kernel will only allow one call to succeed per incoming connection. When this is done with processes it is known as pre-forking, and it saves the expense of a fork() for every new connection. But when you are dealing with threads you can more easily have an existing thread pool that waits on a queue of new connections: one thread does the accept and writes to the queue, and the worker threads read from the queue and do their work. It's cleaner, it's a well-understood pattern, and you lose almost nothing.

Calling accept() from multiple threads

As mentioned in the StackOverflow answer you linked, a single thread calling accept() is probably the way to go. You mention concerns about locking, but these days you will find lock-free queue implementations available in Boost.Lockfree, Intel TBB, and elsewhere. You could use one of those if you like, but you might just use a condition variable to let your worker threads sleep, and wake one of them when a new connection is established.

Using multiple threads with accept() on a nonblocking listener in each process

You need to use EPOLLET or EPOLLONESHOT so that exactly one thread is woken by the EPOLLIN event when a new connection comes in. The handling thread then needs to call accept() in a loop until it returns EAGAIN (for EPOLLET), or to re-arm the descriptor manually with epoll_ctl (for EPOLLONESHOT), so that further connections can be handled.

In general, when using multiple threads and epoll, you want to use EPOLLET or EPOLLONESHOT. Otherwise, when an event happens, multiple threads will be woken to handle it, and they may interfere with each other. At best, they'll just waste time figuring out that some other thread is already handling the event before going back to waiting. At worst, they'll deadlock or corrupt data.

Multiple InputReaders for a single Socket

It doesn't make sense to listen on a socket from two (or more) threads simultaneously. What you need to do is have a single "entry point" for incoming data, and have that entry point understand the context of each data block (message) and dispatch it to whatever logical piece of code uses it. This is a general design pattern, not a Java issue.

How to safely write to a socket from multiple threads?

You only have one thread running the IO service. Everything is on an implicit strand (see Why do I need strand per connection when using boost::asio?), so there is no need to worry UNTIL you start using a new thread.

The simplest fix, then, would seem to be making sure that sending the replies happens on the IO service as well:

void process_message(std::string const& message) {
    std::string response = handler.processMessage(message);
    post(socket_.get_executor(),
         std::bind(&session::do_write, shared_from_this(), response));
}

Now, if you wanted to be able to run the IO service on multiple threads, you would just make sure that the socket uses a strand executor.

HOWEVER

This doesn't guarantee that you won't see overlapping async_write operations, because incoming messages might be processed faster than the responses can be sent. Therefore the customary solution is

Queueing

In my examples I typically call this FIFO queue "outbox_", and I prefer a deque for reasons of iterator/reference stability (see Iterator invalidation rules for C++ containers):

void do_write(std::string message) {
    outbox_.push_back(std::move(message)); // assumed on (implicit) strand
    if (outbox_.size() == 1) {
        write_loop();
    }
}

void write_loop() {
    if (outbox_.empty())
        return;

    asio::async_write( //
        socket_, asio::buffer(outbox_.front()),
        [this, self = shared_from_this()] //
        (error_code ec, std::size_t /*length*/) {
            if (!ec) {
                outbox_.pop_front();
                write_loop();
            } else if (ec != asio::error::eof) {
                std::cerr << "Write error: " << ec.message() << '\n';
            }
        });
}

Demo

Here's a fixed listing with a stub message.h.

It also greatly simplifies the reading/buffer handling by using the existing async_read_until composed operation, which does everything you had written manually.

Live On Coliru

#include <boost/asio.hpp>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <thread>
#include <utility>
#if 0
#include "messages.h"
#else // mock messages.h
#include <boost/lexical_cast.hpp>
#include <iomanip>
struct MessageHandler {
    std::string initialMessage() const { return "Initial\n"; }
    std::string processMessage(std::string const& req) const {
        return "Processed " +
            boost::lexical_cast<std::string>(std::quoted(req)) + "\n";
    }
};
#endif

namespace asio = boost::asio;
using boost::system::error_code;
using asio::ip::tcp;

class session : public std::enable_shared_from_this<session> {
  public:
    session(tcp::socket socket) : socket_(std::move(socket)) {}

    void start() {
        handler = MessageHandler();
        asio::write(socket_, asio::buffer(handler.initialMessage()));
        do_read();
    }

  private:
    void do_read() {
        async_read_until(
            socket_, asio::dynamic_buffer(buffer_), '\0',
            [this, self = shared_from_this()] //
            (error_code ec, std::size_t length) {
                if (!ec) {
                    // bind shared_from_this(), not `this`, so the session
                    // stays alive while the detached thread runs
                    std::thread(&session::process_message, shared_from_this(),
                                buffer_.substr(0, length - 1))
                        .detach();
                    buffer_.erase(0, length);

                    do_read();
                } else if (ec != asio::error::eof) {
                    std::cerr << "Read error: " << ec.message() << std::endl;
                }
            });
    }

    void do_write(std::string message) {
        outbox_.push_back(std::move(message)); // assumed on (implicit) strand
        if (outbox_.size() == 1) {
            write_loop();
        }
    }

    void write_loop() {
        if (outbox_.empty())
            return;

        asio::async_write( //
            socket_, asio::buffer(outbox_.front()),
            [this, self = shared_from_this()] //
            (error_code ec, std::size_t /*length*/) {
                if (!ec) {
                    outbox_.pop_front();
                    write_loop();
                } else if (ec != asio::error::eof) {
                    std::cerr << "Write error: " << ec.message() << std::endl;
                }
            });
    }

    void process_message(std::string const& message) {
        std::string response = handler.processMessage(message);
        // dispatch/post to the executor because we are on a different thread
        post(socket_.get_executor(),
             std::bind(&session::do_write, shared_from_this(), response));
    }

    tcp::socket socket_;
    std::string buffer_;
    std::deque<std::string> outbox_;
    MessageHandler handler;
};

class server {
  public:
    server(asio::io_context& io_context, unsigned short port)
        : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
          socket_(io_context) {
        do_accept();
    }

  private:
    void do_accept() {
        acceptor_.async_accept(socket_, [this](error_code ec) {
            if (!ec) {
                std::cout << "Accepted " << socket_.remote_endpoint() << std::endl;
                std::make_shared<session>(std::move(socket_))->start();
            }

            do_accept();
        });
    }

    tcp::acceptor acceptor_;
    tcp::socket socket_;
};

void serverInit() {
    try {
        asio::io_context io_context;

        server s(io_context, 8989);

        io_context.run();
    } catch (std::exception const& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
    }
}

int main() { serverInit(); }

When sending a large burst of requests:

printf 'Message%d\0' {1..100} | nc 127.0.0.1 8989 -w1

Prints correctly e.g.:

Accepted 127.0.0.1:34862

And the client receives e.g.:

Initial
Processed "Message2"
Processed "Message1"
Processed "Message4"
Processed "Message3"
Processed "Message5"
Processed "Message6"
Processed "Message7"
Processed "Message8"
Processed "Message9"
Processed "Message10"
Processed "Message11"
Processed "Message12"
Processed "Message13"
Processed "Message15"
Processed "Message16"
Processed "Message14"
Processed "Message18"
Processed "Message19"
Processed "Message20"
Processed "Message21"
Processed "Message22"
Processed "Message23"
Processed "Message24"
Processed "Message25"
Processed "Message26"
Processed "Message27"
Processed "Message28"
Processed "Message29"
Processed "Message30"
Processed "Message31"
Processed "Message32"
Processed "Message33"
Processed "Message34"
Processed "Message35"
Processed "Message17"
Processed "Message36"
Processed "Message38"
Processed "Message39"
Processed "Message40"
Processed "Message41"
Processed "Message42"
Processed "Message43"
Processed "Message44"
Processed "Message45"
Processed "Message46"
Processed "Message47"
Processed "Message48"
Processed "Message49"
Processed "Message50"
Processed "Message51"
Processed "Message52"
Processed "Message53"
Processed "Message54"
Processed "Message55"
Processed "Message56"
Processed "Message57"
Processed "Message58"
Processed "Message59"
Processed "Message60"
Processed "Message61"
Processed "Message62"
Processed "Message63"
Processed "Message64"
Processed "Message65"
Processed "Message66"
Processed "Message67"
Processed "Message68"
Processed "Message69"
Processed "Message70"
Processed "Message71"
Processed "Message72"
Processed "Message73"
Processed "Message74"
Processed "Message75"
Processed "Message76"
Processed "Message77"
Processed "Message78"
Processed "Message79"
Processed "Message80"
Processed "Message81"
Processed "Message82"
Processed "Message83"
Processed "Message84"
Processed "Message85"
Processed "Message86"
Processed "Message87"
Processed "Message88"
Processed "Message89"
Processed "Message90"
Processed "Message91"
Processed "Message92"
Processed "Message93"
Processed "Message94"
Processed "Message95"
Processed "Message96"
Processed "Message97"
Processed "Message98"
Processed "Message99"
Processed "Message100"
Processed "Message37"

BONUS: Adding the strand

Minimal changes:

class server {
  public:
    server(asio::any_io_executor ex, unsigned short port)
        : acceptor_(ex, tcp::endpoint(tcp::v4(), port)) {
        do_accept();
    }

  private:
    void do_accept() {
        acceptor_.async_accept(
            make_strand(acceptor_.get_executor()),
            [this](error_code ec, tcp::socket&& s) {
                if (!ec) {
                    std::cout << "Accepted " << s.remote_endpoint() << std::endl;
                    std::make_shared<session>(std::move(s))->start();
                }

                do_accept();
            });
    }

    tcp::acceptor acceptor_;
};

void serverInit() {
    try {
        asio::thread_pool io_context;

        server s(io_context.get_executor(), 8989);

        io_context.join();
    } catch (std::exception const& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
    }
}


post many wsasend on the same socket from multiple threads

It seems that calls to WSASend() are actually not thread safe, and that if you intend to call WSASend() from multiple threads on the same connection then you should synchronise so that only one thread is actually calling into the API at any given time (i.e. hold a per-connection lock around the WSASend() call). See this question and the attached test code for details: TCP/IP IOCP received data sometimes corrupt - Visual C++ on Windows, and this blog entry for a more detailed explanation.

Also note that if you are sending distinct application level messages using multiple WSASend() calls from multiple threads then you have a problem as the multiple calls could be intermingled before the data gets written to the TCP stream.

So,

If you have 5 threads each sending complete 2-byte messages, so that AA, BB, CC, DD, EE, etc. are sent from 5 different threads, then you will never get a stream with interleaved bytes such as

ABCCDDBAEE

but you could get the whole messages in any order.

If your threads send messages using multiple send calls, such that three sends A1, A2, A3 from thread 1 form a single application-level message, then all bets are off, as you could end up with

A1B1A2A3B2B3 etc. in the TCP stream.

If you need to do the latter, you will either need a lock of some kind around ALL of the WSASend() calls, so that you can group the multiple sends into a single atomic application-level send, or you should pass multiple WSABUFs to a single WSASend() call to gather the writes into one call.

Are parallel calls to send/recv on the same socket valid?

POSIX defines send/recv as atomic operations, so assuming you're talking about POSIX send/recv then yes, you can call them simultaneously from multiple threads and things will work.

This doesn't necessarily mean that they'll be executed in parallel -- in the case of multiple sends, the second will likely block until the first completes. You probably won't notice this much, as a send completes once it has put its data into the socket buffer.

If you're using SOCK_STREAM sockets, trying to do things in parallel is less likely to be useful, as send/recv might send or receive only part of a message, which means things could get split up.

Blocking send/recv on SOCK_STREAM sockets only blocks until at least one byte has been transferred, so the difference between blocking and non-blocking is not very useful here.

multiple threads doing poll() or select() on a single socket or pipe

Interesting question ... I read through the current POSIX standard and did not find a specific answer, i.e., no specification about concurrent invocations.
So I'll explain why I think the standard implies that all waiters will wake up.

The relevant part of the text for select / pselect is:

Upon successful completion, the pselect() or select() function shall modify the objects
pointed to by the readfds, writefds, and errorfds arguments to indicate which file
descriptors are ready for reading, ready for writing, or have an error condition pending,
respectively, [...]

and later

A descriptor shall be considered ready for reading when a call to an input function with
O_NONBLOCK clear would not block, whether or not the function would transfer data
successfully. (The function might return data, an end-of-file indication, or an error
other than one indicating that it is blocked, and in each of these cases the descriptor
shall be considered ready for reading.)

In short (the reading case only), we can understand this as:

select does not block: this means that the next call to an input function with O_NONBLOCK set would not fail with errno == EWOULDBLOCK. [Note that the "next" is my interpretation of the above.]

If one accepts this interpretation, then two concurrent select calls could both report the same FD as readable. In fact, even if they are not concurrent: if a first thread calls select with some FD readable and only later calls, e.g., read, then a second thread calling select between those two calls could also see the FD as readable.

Now the relevant part for the "waking up" part of the question is this:

If none of the selected descriptors are ready for the requested operation, the pselect()
or select() function shall block until at least one of the requested operations becomes
ready, until the timeout occurs, or until interrupted by a signal.

Here, clearly, the above interpretation suggests that all concurrently waiting calls will return.


