How to Control the Source Ip Address of a Zeromq Packet on a Machine With Multiple Ips

How to control the source IP address of a ZeroMQ packet on a machine with multiple IPs?

When trying to .connect() to a remote, I found the answer in the protocol documentation, put the source ip before a semicolon in the connect string:

rc = zmq_connect(socket, "tcp://192.168.1.17:5555;192.168.1.1:5555")

In Python, this looks like:

socket = zmq.Context().socket(zmq.SUB)
socket.connect('tcp://192.168.1.17:5555;192.168.1.1:5555')

How do I implement multiple sockets with ZeroMQ?

Do I also need multiple contexts?

No, you need not.

How is this done?

gSock1 = zmq_socket( gCTX, ZMQ_REQ ); // 1st REQ-uester
gSock2 = zmq_socket( gCTX, ZMQ_REQ ); // 2nd
gSock3 = zmq_socket( gCTX, ZMQ_PUB ); // 1st PUB-lisher
gSock4 = zmq_socket( gCTX, ZMQ_PUB ); // 1st PUB-lisher

As simple as assigning as many instances of Socket()-class ( or zmq_socket() calls ) as needed. The default Context()-instance, the main message-passing processing engine, may remain "shared" or one may increase it's number of IO-threads, if needed and/or fine tune it's other configuration details as needed or even split the processing workloads among several Context()-instances if needed.

Where does zmq_send() send with multiple connections?

Q : "Where does zmq_send() send with multiple connections?"

It does send messages depending on what of the actual Archetype is used.

PUB/SUB sends each message to all peers that are positively subscribed to a Topic ( left-side, binary matching of the payload as-string ), whereas some internal details in pre-v3 releases actually physically moved any message to all peers (and there the SUB-side Context()-instances, kind of ALAP, performed the Topic-filtering ), not so since v3+.

PUSH/PULL does not have this "promise", so better try to imagine the PUSH-side to be a job-queue commander, and each PULL-side worker receives ( in the round-robin manner here ) a next task from the job-queue, as the load-balancers do in common.

Each other of the built-in Archetypes - REQ/REP, XPUB/XSUB, DEALER/ROUTER, PAIR/PAIR, ... - have a similarly formulated "promised-behaviour", so ZeroMQ services can build on combining these trivial Archetypes into some more complicated & structured group-behaviours for messaging/signalling applications.

Q : "The reasonable assumption is, that it will be sent to all connected peers"

If it were this way, there would be lost the key property of the PUSH/PULL "promised-behaviour" and there would be no difference between this and a PUB/SUB Archetype ( which makes even less sense to develop two things that do the very same thing, doesn't it? )


In case one has never worked with ZeroMQ,
one may here enjoy to first look at "ZeroMQ Principles in less than Five Seconds"
before diving into further details



ZeroMQ: how to use multiple Publishers and a single Client, using C C11

Q : "how to use multiple Publishers and a single Client, using C < C11?"

So, the QNX-version was not explicitly stated, so let's work in general.

As noted in ZeroMQ Principles in less than Five Seconds, the single Client ( being of a SUB-Archetype ) may zmq_connect( ? ), however at a cost of managing some, unknown for me, way how all the other, current plus any future PUB-s were let to zmq_bind(), after which to let somehow let the SUB learn where to zmq_connect( ? ), so that to get some news from the newly bound PUB-peer.

So it would be a way smarter to make the single SUB-agent to perform a zmq_bind() and let any of the current or future PUB-s perform zmq_connect() as they come, directed to the single, static, known SUB's location ( this does not say, they cannot use any of the available transport-classes - one inproc://, another one tcp://, some ipc://, if QNX permits & system architecture requires to do so ( and, obviously, supposing the SUB-agent has exposed a properly configured AccessNode for receiving such connections ).

Next, your SUB-Client has to configure its subscription filtering topic-list: be it an order to "Do Receive EVERYTHING!" :

...
retCode = zmq_setsockopt( <aSubSocketINSTANCE>, ZMQ_SUBSCRIBE, "", 0 );
assert( retCode == 0 && "FAILED: at ZMQ_SUBSCRIBE order " );
...

Given this works, your next duty is to make the setup robust enough ( an explicit ZMQ_LINGER setting to 0, access-policies, security, scaled-resources, L2/L3-network protective measures, etc ).

And you are done to harness the ZeroMQ just fit right to your QNX-system design needs.

ZeroMQ - Emulating standard socket for multiple clients to one server

Q : Is there a way I can utilise ZeroMQ to give the same behaviour in this application as a standard socket?

ZeroMQ is a very smart and rather behaviour-oriented signaling / messaging-platform for distributed-systems, not a socket.

Given your intentions are to be worth for HPC-ecosystem, a solution postulated / directed to use some tool and a must to bend it as much as possible, so as it will become close to resemble a behaviour that is native, but for other tool, does not seem to be a typical HPC-grade approach.

HPC-code is typically very well-crafted so as to become computing-costs-efficient ( and bless the Boss, CFO & gov/mil-funding for all those, who are today permitted not to design the HPC-code for the ultimate performance and hardware-resources' use efficiency :o) ) - here, if one pays expenses on ZeroMQ instantiations, there seems no benefit to come from these non-zero costs of instantiations and getting "just"-a-socket-alike behaviour, at cost, has negative performance yield, without any adjustments in some future benefits from smart, cluster-wide ZeroMQ services ( be it an N+1 or N+M redundancy, low-latency smart inter-node cluster signaling, cryptography, cheap security-motivated white-listing, or anything that may represent any additional HPC-grade Project's benefit, that may justify the costs of the initial ZeroMQ instantiation ).


Defined archetype of ZMQ_STREAM may provide some tools, yet, ref. above

A socket of type ZMQ_STREAM is used to send and receive TCP data from a non-ØMQ peer, when using the tcp:// transport. A ZMQ_STREAM socket can act as client and/or server, sending and/or receiving TCP data asynchronously.


When receiving TCP data, a ZMQ_STREAM socket shall prepend a message part containing the identity of the originating peer to the message before passing it to the application. Messages received are fair-queued from among all connected peers.



When sending TCP data, a ZMQ_STREAM socket shall remove the first part of the message and use it to determine the identity of the peer the message shall be routed to, and unroutable messages shall cause an EHOSTUNREACH or EAGAIN error.



To open a connection to a server, use the zmq_connect call, and then fetch the socket identity using the ZMQ_IDENTITY zmq_getsockopt call.



To close a specific connection, send the identity frame followed by a zero-length message (see EXAMPLE section).



When a connection is made, a zero-length message will be received by the application. Similarly, when the peer disconnects (or the connection is lost), a zero-length message will be received by the application.



You must send one identity frame followed by one data frame. The ZMQ_SNDMORE flag is required for identity frames but is ignored on data frames.

ZMQ_STREAM Example:

void *ctx = zmq_ctx_new ();                          assert (ctx     && "Context Instantiation Failed..." );
void *socket = zmq_socket (ctx, ZMQ_STREAM); assert (socket && "socket Instantiation Failed..." );
int rc = zmq_bind (socket, "tcp://*:8080"); assert (rc == 0 && "socket.bind() Failed..." );
uint8_t id [256]; /* Data structure to hold the ZMQ_STREAM ID */
size_t id_size = 256;
uint8_t raw [256]; /* Data structure to hold the ZMQ_STREAM received data */
size_t raw_size = 256;
while (1) {
id_size = zmq_recv (socket, id, 256, 0); assert (id_size > 0 && "Get HTTP request; ID frame and then request; Failed..." )
do {
raw_size = zmq_recv (socket, raw, 256, 0); assert (raw_size >= 0 && "socket.recv() Failed..." );
} while (raw_size == 256);

char http_response [] = /* Prepares the response */
"HTTP/1.0 200 OK\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"Hello, World!";
zmq_send (socket, id, id_size, ZMQ_SNDMORE); /* Sends the ID frame followed by the response */
zmq_send (socket, http_response, strlen (http_response), 0);
zmq_send (socket, id, id_size, ZMQ_SNDMORE); /* Closes the connection by sending the ID frame followed by a zero response */
zmq_send (socket, 0, 0, 0);
}
zmq_close (socket);
zmq_ctx_destroy (ctx);

ZeroMQ zmq_getsockopt() can deliver a POSIX/SOCKET descriptor, for low-level tricks

The ZMQ_FD option shall retrieve the file descriptor associated with the specified socket. The returned file descriptor can be used to integrate the socket into an existing event loop; the ØMQ library shall signal any pending events on the socket in an edge-triggered fashion by making the file descriptor become ready for reading.



The ability to read from the returned file descriptor does not necessarily indicate that messages are available to be read from, or can be written to, the underlying socket; applications must retrieve the actual event state with a subsequent retrieval of the ZMQ_EVENTS option.



The returned file descriptor is also used internally by the zmq_send and zmq_recv functions. As the descriptor is edge triggered, applications must update the state of ZMQ_EVENTS after each invocation of zmq_send or zmq_recv.



To be more explicit: after calling zmq_send the socket may become readable (and vice versa) without triggering a read event on the file descriptor.



The returned file descriptor is intended for use with a poll or similar system call only. Applications must never attempt to read or write data to it directly, neither should they try to close it.



Option value type: int on POSIX systems, SOCKET on Windows


For more details on ZeroMQ tricks one may enjoy to read solutions, performance benchmarks, latency-shaving details and other problem-solving tricks that have already been discussed here.

Is there any way to tell where a ZeroMQ message came from?

No, there is no way to get the senders IP from the ZeroMq socket. That information is hidden within the implementation layers of ZeroMq. You have a couple of choices to handle solve this, one is to change the message being passed and simply add the senders IP to the message itself, another is to use Multi-Part messages.

From the ZeroMq zmq_send() Api docs (3.2.2):

A ØMQ message is composed of 1 or more message parts. Each message part is an independent zmq_msg_t in its own right. ØMQ ensures atomic delivery of messages: peers shall receive either all message parts of a message or none at all. The total number of message parts is unlimited except by available memory.

Multi-Part messages are actually atomic messages but separated into several logical messages. I.e. you receive all parts or no parts. If you cannot modify the original message, you can prepend the message (on the sender side) with the IP of the sender. The receiver can then extract the first part as the senders IP and the second part as the original, unmodified, message. It will be delivered as a single message but is logically separated into two discreet parts.

In your case, you could do something like this:

// Send a multi-part message consisting of sender IP plus another message
zmq_msg_send (&my_ip, my_socket, ZMQ_SNDMORE);
zmq_msg_send (&my_message, my_socket, 0);

For the receiver, see the documentation for zmq_msg_recv().

How to get ZeroMQ to .connect() to localhost using a particular source IP?

You can't send and receive on the same port on the same interface.

The best solution is to allow ZeroMQ to pick the port to send on:


socket = zmq.Context().socket(zmq.SUB)
socket.connect('tcp://100.100.100.100:*;192.168.1.1:5555')

Unfortunately, the 4.2.2 release of ZeroMQ doesn't support this, though an upcoming release should. For now, the only solution is to hardcode a different port for the sending and receiving address:


socket = zmq.Context().socket(zmq.SUB)
socket.connect('tcp://100.100.100.100:6666;192.168.1.1:5555')

How to make a 'shared volatile variable' with ZeroMQ?

Q : How to make a 'shared volatile variable' ( s.t. most-recent-value-always-wins ) with ZeroMQ?

In case your application-level logic is happy with the PUB/SUB - one PUB-lishes, by .send()-ing messages, others SUB-scribe to theirs respective topic-of-choice, so as to start .recv()-ing Scalable Formal Communications Pattern archetype, we may fine-tune the configuration so as it meets all your requirements, expressed above ( s.t. most-recent-value-always-wins )

In case one has never worked with ZeroMQ,

one may here enjoy to first look at "ZeroMQ Principles in less than Five Seconds"
before diving into further details


The proper configuration step :

The trick is to use .setsockopt( ZMQ_CONFLATE, 1 ) method to just "switch-ON" this very kind of behaviour, managed by the Context()-engine instance(s) silently to the user, right "inside" the Queue-managers' policy.

If set, a socket shall keep only one message in its inbound/outbound queue, this message being the last message received/the last message to be sent. Ignores ZMQ_RCVHWM and ZMQ_SNDHWM options. Does not support multi-part messages, in particular, only one part of it is kept in the socket internal queue.

Applicable socket types ZMQ_PULL, ZMQ_PUSH, ZMQ_SUB, ZMQ_PUB, ZMQ_DEALER


It was that easy !

All the best with mastering the art of Zen-of-Zero.



Related Topics



Leave a reply



Submit