libzmq
ZMQ documentation seems to indicate multipart is required for PUB/SUB, but it isn't required in practice?
Hi team,
Looking at the ZMQ documentation for PUB/SUB and XPUB/XSUB, I see the following example
(source)
Those are the official guidelines for the publish/subscribe pattern. As I interpreted them, you cannot simply send a single-part message if your subscribers filter by topic.
However, sending the topic concatenated with the payload as a single message (not multipart!) still seems to work with subscribers that have topic filters set.
That is, the docs seem to suggest
```
zmq_socket.send_multipart(topic, payload);
```
but the following
```
zmq_socket.send(topic + payload)
```
also works without any issue. My question is whether there are any implications I'm not seeing or understanding from using ZMQ this way.
Looking at the libzmq source code, I see the following:
Here's the receive logic: it takes a message from the pipe and matches it against the subscriptions; if there is no match, it keeps calling recv() to pop any remaining parts of that multipart message. So splitting the topic and payload into a multipart message doesn't let the subscriber avoid receiving the payload:
```cpp
int zmq::xsub_t::xrecv (msg_t *msg_)
{
    // If there's already a message prepared by a previous call to zmq_poll,
    // return it straight ahead.
    if (_has_message) {
        const int rc = msg_->move (_message);
        errno_assert (rc == 0);
        _has_message = false;
        _more_recv = (msg_->flags () & msg_t::more) != 0;
        return 0;
    }

    // TODO: This can result in infinite loop in the case of continuous
    // stream of non-matching messages which breaks the non-blocking recv
    // semantics.
    while (true) {
        // Get a message using fair queueing algorithm.
        int rc = _fq.recv (msg_);

        // If there's no message available, return immediately.
        // The same when error occurs.
        if (rc != 0)
            return -1;

        // Check whether the message matches at least one subscription.
        // Non-initial parts of the message are passed
        if (_more_recv || !options.filter || match (msg_)) {
            _more_recv = (msg_->flags () & msg_t::more) != 0;
            return 0;
        }

        // Message doesn't match. Pop any remaining parts of the message
        // from the pipe.
        while (msg_->flags () & msg_t::more) {
            rc = _fq.recv (msg_);
            errno_assert (rc == 0);
        }
    }
}
```
(source)
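Additionally, here's how the match itself is done. Notably, it walks the subscription trie one byte at a time and stops as soon as it reaches a node that holds a subscription (match), runs out of data, or finds no entry for the next byte (no match).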
```cpp
bool zmq::trie_t::check (const unsigned char *data_, size_t size_) const
{
    // This function is on critical path. It deliberately doesn't use
    // recursion to get a bit better performance.
    const trie_t *current = this;
    while (true) {
        // We've found a corresponding subscription!
        if (current->_refcnt)
            return true;

        // We've checked all the data and haven't found matching subscription.
        if (!size_)
            return false;

        // If there's no corresponding slot for the first character
        // of the prefix, the message does not match.
        const unsigned char c = *data_;
        if (c < current->_min || c >= current->_min + current->_count)
            return false;

        // Move to the next character.
        if (current->_count == 1)
            current = current->_next.node;
        else {
            current = current->_next.table[c - current->_min];
            if (!current)
                return false;
        }
        data_++;
        size_--;
    }
}
```
(source)
Is my understanding correct that multipart isn't actually required, despite what the documentation seems to suggest? Are there any drawbacks I'm not seeing? Whether or not you concatenate the topic and payload, the receiver still has to pop every part of every message from the pipe, so there seems to be no obvious benefit to using multipart.
Thanks in advance for your time.