Using ZeroMQ Together with Boost::Asio

Using ZeroMQ together with Boost::ASIO

After reading the documentation here and here, specifically this paragraph

ZMQ_FD: Retrieve file descriptor associated with the socket

The ZMQ_FD option shall retrieve the file descriptor associated with the
specified socket. The returned file descriptor can be used to
integrate the socket into an existing event loop; the ØMQ library
shall signal any pending events on the socket in an edge-triggered
fashion by making the file descriptor become ready for reading.

I think you can use null_buffers for every zmq_pollitem_t and defer the event loop to an io_service, bypassing zmq_poll() entirely. The same documentation lists some caveats, however, notably:

The ability to read from the returned file descriptor does not
necessarily indicate that messages are available to be read from, or
can be written to, the underlying socket; applications must retrieve
the actual event state with a subsequent retrieval of the ZMQ_EVENTS
option.

So when the handler for one of your zmq sockets fires, I think you'll have to do a little more work before handling the event. An uncompiled sketch is below:

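const int fd = getZmqDescriptorSomehow();  // e.g. read with zmq_getsockopt and ZMQ_FD
boost::asio::posix::stream_descriptor socket(_io_service, fd);
// socket is an object, not a pointer, so use '.' rather than '->'
socket.async_read_some(
    boost::asio::null_buffers(),
    // a read handler takes the error code and the number of bytes transferred
    [=](const boost::system::error_code& error, std::size_t /*bytes*/)
    {
        if (!error) {
            // The descriptor is readable: check ZMQ_EVENTS on the ZeroMQ
            // socket, then handle any data that is ready to be read
        }
    }
);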

Note that you don't have to use a lambda here; boost::bind to a member function would be sufficient.
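
The ZMQ_EVENTS caveat quoted above implies a loop inside the handler: because the descriptor is edge-triggered, one wake-up may cover several messages, or none. The following is a minimal sketch of that loop only, written without libzmq so that it compiles on its own. The names drain_readable, get_events, recv_one and kPollIn are hypothetical; in real code get_events would presumably wrap zmq_getsockopt with ZMQ_EVENTS, and recv_one a non-blocking zmq_recv with ZMQ_DONTWAIT.

```cpp
#include <cstddef>
#include <functional>

// Stand-in for ZMQ_POLLIN so the sketch compiles without libzmq (assumption).
constexpr int kPollIn = 1;

// Hypothetical helper, meant to be called from the asio read handler.
// get_events: returns the socket's current event mask (ZMQ_EVENTS in real code).
// recv_one:   receives one message without blocking; returns false on failure.
// Returns the number of messages consumed.
std::size_t drain_readable(const std::function<int()>& get_events,
                           const std::function<bool()>& recv_one)
{
    std::size_t handled = 0;
    // Keep going until the socket itself reports nothing left to read,
    // because an edge-triggered descriptor will not signal again for
    // messages that were already pending.
    while (get_events() & kPollIn) {
        if (!recv_one()) {
            break;  // receive failed; stop and wait for the next wake-up
        }
        ++handled;
    }
    return handled;
}
```

After the loop returns, the handler would re-arm async_read_some so the next edge is seen.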

Integrating Boost Asio with ZeroMQ, Bad File Descriptor?

I haven't looked at the linked library, but this fragment

sock_.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
sock_.close();
zmq_close (zmq_sock_);

looks suspicious, as sock_.close() is meddling with a socket that asio didn't open. I'd suggest it makes a lot more sense to release the socket on the asio side instead of closing it, so that ZMQ keeps responsibility for creation and destruction:

sock_.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
sock_.release();
zmq_close (zmq_sock_);

Is ZeroMQ slower than boost asio?

Data Rate

You're attempting to move 800 MByte/second. What sort of connection is this? For a tcp:// transport-class it'd have to be something pretty rapid, e.g. 10 Gbit/s Ethernet or faster (800 MByte/s is 6.4 Gbit/s before protocol overhead), which is pretty exotic.

So I'm presuming that it's an ipc:// transport-class connection, in which case you can get an improvement by using ZeroMQ's zerocopy functions, which save copying the data repeatedly.

With a normal transfer, you have to copy data into a zmq message, which is copied into an ipc pipe, copied out again, and copied into a new zmq message at the receiving end. All that copying requires 4 x 800 MByte/s = 3.2 GByte/s of memory bandwidth which, once cache conflicts have come into play, is an appreciable percentage of the total memory bandwidth of a typical PC system. Using zerocopy should cut that in half.
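
Working the copy arithmetic through as a small sketch: four copies of an 800 MByte/s stream generate 3200 MByte/s of memory traffic. The zero-copy figure assumes, following the "cut that in half" estimate, that two of the four copies disappear. The function and constant names are illustrative only.

```cpp
// Memory traffic generated when a payload stream is copied `copies` times.
constexpr double copy_bandwidth_mb(double payload_mb_per_s, int copies)
{
    return payload_mb_per_s * copies;
}

constexpr double kPayload = 800.0;  // MByte/s, the rate from the question

// Normal transfer: into a message, into the pipe, out of the pipe,
// into a new message at the receiver.
constexpr double kNormal = copy_bandwidth_mb(kPayload, 4);

// Assumption: zero-copy removes two of those four copies.
constexpr double kZeroCopy = copy_bandwidth_mb(kPayload, 2);

static_assert(kNormal == 3200.0, "four copies of 800 MByte/s");
static_assert(kZeroCopy == 1600.0, "two copies of 800 MByte/s");
```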

Alternative to Zero Copy - Zero Transfer

If you are using ipc://, then consider not sending data through the sockets, but sending references to the data through the sockets.

I have previously blended zmq with a semaphore-locked C++ std::queue, using zmq simply for its pattern (PUSH/PULL in my case) and the std::queue to carry shared pointers to the data, leaving the data where it is. The sender locks the queue, puts a shared pointer into it, and then sends a simple message (e.g. "1") through a zmq socket. The recipient reads the "1" and uses that as a cue to lock the queue and pull a shared pointer off it. Thus a shared pointer to data has been transferred from one thread to another in a ZMQ pattern via a std::queue, but the data itself has stayed still. All I've done is pass ownership of the data between threads. It works so long as the sender's copy of the shared pointer goes out of scope immediately after sending and the sender never uses it to modify or access the data.
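
A minimal sketch of the queue half of that arrangement is shown here. It uses a std::mutex rather than a semaphore, and it leaves out the zmq side entirely: the "1" notification would be sent after put() and read before take(). HandoffQueue and Block are hypothetical names, not taken from the original code.

```cpp
#include <memory>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

using Block = std::vector<unsigned char>;  // hypothetical payload type

// Mutex-guarded queue of shared pointers; the data itself never moves.
class HandoffQueue {
public:
    // Sender side: hand over ownership, then notify the receiver
    // (e.g. by sending "1" on a PUSH socket, not shown here).
    void put(std::shared_ptr<Block> block)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(std::move(block));
    }

    // Receiver side: call after reading the "1" from the PULL socket.
    // Returns nullptr if the queue is unexpectedly empty.
    std::shared_ptr<Block> take()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) {
            return nullptr;
        }
        std::shared_ptr<Block> block = std::move(queue_.front());
        queue_.pop();
        return block;
    }

private:
    std::mutex mutex_;
    std::queue<std::shared_ptr<Block>> queue_;
};
```

Passing the pointer with std::move into put() leaves the sender's copy empty, which enforces the rule that the sender must not touch the data after sending.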

PUSH/PULL is not too bad to deal with - each message goes to only one recipient. It would take more effort to make such a blend with PUB/SUB, and received messages would have to be treated as read-only because each recipient would have a shared pointer to the same block of data as everyone else.

Message Size

I've no idea how big a chunk ZMTP transfers at a time, but I'd guess that it's relatively efficient in terms of protocol:data ratio.

Network library in C++

ZeroMQ works just fine integrated with Boost.Asio if you choose to go that route. That said, I'd suggest you implement your client and server using Boost.Asio if that's what you're familiar with. It's a fantastically written library, well documented, and has an active user community (that's us) if you have questions.

In my experience, most developers who shy away from Asio are unfamiliar with non-blocking I/O and prefer to silo their designs with the thread-per-connection methodology, which does not scale beyond trivial examples. Point your colleagues at the Asio examples, and send them to StackOverflow so they can read questions in the boost-asio tag. If they're still not convinced, maybe show them the TR2 networking library proposal based on Asio; it might end up in a standard someday.

Keep boost asio io_service

There is never a copy of io_service (or the more recent io_context that replaces the deprecated io_service). That's because it's not copyable.

If you don't keep it alive while async operations are still outstanding, the behaviour is indeed undefined. However, it's not too hard to make sure no async operations or completions happen, either by stopping the service forcibly or by gracefully waiting for it to complete any pending operations.

I suggest the latter approach for production quality code.

Hide blocking operation with coroutine

I am not sure which zeromq wrapper library you are using, so here is an example based on the core libzmq library.

Use zmq_poll; that way you can define a timeout and still do a blocking read.

// Define one socket to poll (more could be added); request ZMQ_POLLIN events.
zmq_pollitem_t items[] = {{socket1, 0, ZMQ_POLLIN, 0}};

while (1)
{
    // Poll the socket(s) defined in items with a timeout of 10 ms
    int rc = zmq_poll(items, 1, 10);
    if (rc == -1)
        break; // zmq_poll failed; inspect zmq_errno()

    // Check which item (socket) had the event
    if (items[0].revents & ZMQ_POLLIN)
    {
        // There is data available on socket1,
        // so call recv here on socket1
    }

    // do other stuff here
}

Alternatively, if you only want to be notified via callbacks, you could use:

  • zloop from czmq
  • azmq (ZeroMQ for Asio)

