libzmq

ZMQ_CONFLATE on PUB/SUB is broken when using filters

Open petke opened this issue 9 years ago • 8 comments

I have a realtime PUB feed. I use the ZMQ_CONFLATE setting to make sure clients never get old queued up messages.

I have a hundred SUB clients that use filters to get only the messages with a key they are interested in. With the ZMQ_CONFLATE setting, each client's receive queue holds at most 1 message. That means even the most recent messages with keys the client is subscribed to get thrown away. Only one message remains in the queue, instead of one message for every key as one would like.
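To make the difference concrete, here is a small sketch in plain Python (no ZeroMQ involved) that models the two behaviours described above: a single-slot queue versus one slot per key. The function names and the stock symbols are made up for illustration; this only models the effect and is not how libzmq implements ZMQ_CONFLATE.

```python
def conflate_single_slot(messages):
    """Model a receive queue of size 1: each arrival overwrites the previous one."""
    slot = None
    for key, value in messages:
        slot = (key, value)
    return [] if slot is None else [slot]


def conflate_per_key(messages):
    """Model the desired behaviour: keep the newest message for every key."""
    latest = {}
    for key, value in messages:
        latest[key] = value  # newer value replaces the older one for this key
    return list(latest.items())


# Hypothetical burst that arrives while the subscriber is busy.
burst = [("AAPL", 101.0), ("MSFT", 55.2), ("AAPL", 101.3), ("MSFT", 55.1)]

print(conflate_single_slot(burst))  # only the last MSFT tick survives
print(conflate_per_key(burst))      # newest tick for AAPL and for MSFT
```

In the single-slot model the latest AAPL price is lost even though the client subscribed to it, which is the problem reported here.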

I tried the workaround of using one PUB/SUB socket per filter. That way every message key gets its own queue. But that can quickly use up thousands of sockets, which is probably not a good idea (on Windows, FD_SETSIZE is set to only 64).

As a use case, let's imagine a stock price PUB feed. The clients are day traders using automated tools. They do not want to trade using old prices. Every millisecond counts. The filters are the ids/names of the stocks they are trading.

petke, Dec 30 '15 06:12

The workaround I made was to create a client that manually drops old queued messages. Googling, I found that many people have asked for the same thing. I do think ZeroMQ should provide functionality for this common use case somehow.

The recommendation to use the suicidal snail pattern for slow subscribers, and simply kill them, doesn't seem practical. What if we need to support clients where some are naturally slower than others? We might not have control over all clients. Some might live on slow computers, and some might be fast but only care to receive messages once in a while. Old messages are of little interest for realtime messaging.

Examples are, say, a feed server sending out current share prices (used for trading), a driver sending out the current mouse cursor position (for rendering), or an FPS game server sending out the current positions of the players (for clients to keep in sync with). Old messages are of little or no use to such clients. Only the latest messages matter. The old messages are the ones we want to drop in case the client can't keep up.

Anyway, here is my workaround. It seems a bit of a hack. I'm hoping you clever people could come up with something better inside the library.

http://stackoverflow.com/questions/34503252/howto-make-zeromq-pub-sub-drop-old-messages-instead-of-new-for-realtime-feeds/34563635#34563635
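For readers who do not follow the link, the general idea of "manually dropping old queued messages" can be sketched as a drain loop: read everything that is currently pending without blocking and keep only the newest value per key. This is an illustration only and not necessarily what the linked answer does. `drain_latest_per_key` and `recv_nowait` are hypothetical names, and a `queue.Queue` stands in for the socket's receive queue; with pyzmq the callable would presumably wrap a `recv_multipart(zmq.NOBLOCK)` call and translate `zmq.Again` into the "nothing pending" signal.

```python
import queue


def drain_latest_per_key(recv_nowait):
    """Read everything currently queued and keep only the newest value per key.

    `recv_nowait` is a hypothetical callable that returns the next (key, value)
    pair or raises queue.Empty when nothing is pending.
    """
    latest = {}
    while True:
        try:
            key, value = recv_nowait()
        except queue.Empty:
            return latest
        latest[key] = value  # overwrite the stale value for this key


# Stand-in for a socket's receive queue, filled with a hypothetical backlog.
backlog = queue.Queue()
for item in [("AAPL", 101.0), ("MSFT", 55.2), ("AAPL", 101.3)]:
    backlog.put(item)

fresh = drain_latest_per_key(backlog.get_nowait)
print(fresh)
```

The client then processes `fresh` and repeats the drain before its next round of work, so it never acts on a value that has already been superseded in its own queue.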

petke, Jan 02 '16 21:01

+1 for this feature request (I would argue bug fix). Without it, ZMQ seems totally inappropriate for real-time data streams, for all the reasons petke states.

valschmidt, Oct 09 '16 23:10

This issue is almost three years old, but could I try to work on it?

m2kz, Sep 13 '18 19:09

Okay, today I spent some time learning about the codebase and the contribution process. Could someone confirm that my idea for solving this problem is correct? A socket option would be added, let's call it ZMQ_UNIQUE_MSG, that, when set on a PUB socket via zmq_setsockopt(), would keep only one message per SUB filter. Is that the correct approach?

m2kz, Oct 20 '18 20:10

@mkmodrzew I don't think so. What would be great is an option to tell ZMQ that, when the queue is full, it should drop the oldest message in it and replace it with the new one that has just arrived.

ZMQ_CONFLATE seems great, but for SUB sockets with filters, and for cases where it just takes a bit longer to process a message on the SUB side, it will lose messages, as @petke said.

samuelrohr, Mar 01 '19 13:03

Is there any progress on this? No way to use Pub/Sub with filters and get non-stale messages? It seems like quite a problem to me.

themightyoarfish, Mar 11 '21 06:03

Again bringing this up. Any news here?

ahoereth, Mar 14 '22 15:03

A potential solution is the following. On the receiving end, set up a finite, thread-safe queue with the policy that the queue drops the head element when it is full and a push occurs (e.g. a ring buffer). Push, pop and peek should be non-blocking: push drops the head and writes to the tail if the queue is full, while pop and peek return an empty element, or otherwise signal that the queue is empty, if it is empty. Set up two threads. The first one receives from ZMQ (possibly with filters in place) and pushes to your queue. The second one is your main processing thread, which consumes from your finite queue.

Of course, this assumes that the first thread (that pops from ZMQ and pushes to your finite queue) runs fast enough to keep up with the ZMQ publisher.
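A minimal sketch of that drop-oldest queue in Python, using only the standard library, might look like the following. `DropOldestQueue` and `receiver` are hypothetical names, a plain `range` stands in for the messages read from the SUB socket, and `None` is used as the "queue is empty" signal (so `None` itself cannot be stored as a message in this sketch).

```python
import threading
from collections import deque


class DropOldestQueue:
    """Bounded, thread-safe queue that evicts the head when a push finds it full."""

    def __init__(self, capacity):
        self._items = deque(maxlen=capacity)  # deque drops the oldest on overflow
        self._lock = threading.Lock()

    def push(self, item):
        with self._lock:
            self._items.append(item)

    def pop(self):
        """Return the oldest retained item, or None if the queue is empty."""
        with self._lock:
            return self._items.popleft() if self._items else None


def receiver(source, out):
    # Stand-in for the thread that reads from the SUB socket as fast as it can.
    for message in source:
        out.push(message)


buffer = DropOldestQueue(capacity=3)
producer = threading.Thread(target=receiver, args=(range(10), buffer))
producer.start()
producer.join()  # joined here only to make the demo deterministic

# The processing thread would call pop() in its own loop; here we just drain.
retained = []
while (item := buffer.pop()) is not None:
    retained.append(item)
print(retained)  # the three newest messages: [7, 8, 9]
```

Note that this bounds the backlog as a whole, not per key; combining it with a per-key dictionary would be needed to keep the newest message for every subscribed key.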

ghost, Apr 10 '22 06:04