jeromq icon indicating copy to clipboard operation
jeromq copied to clipboard

Unable to receive messages when binding subscriber socket

Open jusw85 opened this issue 10 years ago • 12 comments

I'm trying to bind a subscriber socket, and running the following test case. However, the code blocks on recvStr():

    ZContext ctx = new ZContext(1);

    ZMQ.Socket rcvr = ctx.createSocket(ZMQ.SUB);
    rcvr.bind("inproc://control");
    rcvr.subscribe("".getBytes());

    ZMQ.Socket sndr = ctx.createSocket(ZMQ.PUB);
    sndr.connect("inproc://control");

    Thread.sleep(500);
    sndr.send("msg");
    String msg = rcvr.recvStr(); // blocks
    System.out.println(msg);

    ctx.destroySocket(rcvr);
    ctx.destroySocket(sndr);
    ctx.close();

If I bind the publisher socket, I dont have this issue:

    ZContext ctx = new ZContext(1);

    ZMQ.Socket sndr = ctx.createSocket(ZMQ.PUB);
    sndr.bind("inproc://control");

    ZMQ.Socket rcvr = ctx.createSocket(ZMQ.SUB);
    rcvr.connect("inproc://control");
    rcvr.subscribe("".getBytes());

    Thread.sleep(500);
    sndr.send("msg");
    String msg = rcvr.recvStr();
    System.out.println(msg); // prints "msg"

    ctx.destroySocket(rcvr);
    ctx.destroySocket(sndr);
    ctx.close();

Is there any way for me to bind the subscriber socket?

jusw85 avatar Sep 09 '15 09:09 jusw85

Do you have any special use case to bind on the SUB socket? It doesn't seems to be very common. PUB manages subscribers when SUB connects to it. From the PUB perspective, there's no subscribers to send message as no one connected to PUB, so it discards messages.

miniway avatar Sep 12 '15 01:09 miniway

My understanding of bind is that it's used on stable parts of the messaging architecture. In my use case, I have one static subscriber that listens to a fixed endpoint, and a dynamic (potentially growing) number of publishers. I use the PUB SUB pattern as the subscriber needs to pickup all messages sent by any publisher.

From http://zeromq.org/area:faq When should I use bind and when connect?

As a very general advice: use bind on the most stable points in your architecture and connect from the more volatile endpoints.

jusw85 avatar Sep 12 '15 02:09 jusw85

That's generally right, but PUB binds and SUB connects to the PUB. Otherwise messages might not be delivered. As http://zguide.zeromq.org/page:all#Messaging-Patterns comments shortly, some socket types works only within a limitation.

In your case, you might need a secondary bind with another socket type like DEALER. When a dynamic PUB appears, it connects to the secondary first with another socket and sends its endpoint, then main SUB connects to the PUB with the endpoint. There're similar patterns at the user guide which exchanges control messages.

miniway avatar Sep 12 '15 02:09 miniway

Thanks, guess I'll find a workaround with an intermediary instead. Thanks!

jusw85 avatar Sep 12 '15 05:09 jusw85

Should be:

    ZMQ.Socket rcvr = ctx.createSocket(ZMQ.SUB);
    rcvr.subscribe("".getBytes());
    rcvr.bind("inproc://control");

right? Your subscribe is in the wrong order, I think.

sjohnr avatar Oct 26 '15 02:10 sjohnr

JZMQ has absolutely identical API, but it allows binding a SUB socket and receive messages from connected publishing sockets, as soon as I swapped to JZMQ, this worked:

$ sbt console
[info] Starting scala interpreter...

scala> import org.zeromq.ZMQ
import org.zeromq.ZMQ

scala> import org.zeromq.ZMQ._
import org.zeromq.ZMQ._

scala> val ctx = ZMQ.context(1)
ctx: org.zeromq.ZMQ.Context = org.zeromq.ZMQ$Context@4e588bb3

scala> val sub = ctx.socket(ZMQ.SUB)
sub: org.zeromq.ZMQ.Socket = org.zeromq.ZMQ$Socket@34f5e96c

scala> sub.bind("tcp://127.0.0.1:5555")

scala> val pub = ctx.socket(ZMQ.PUB)
pub: org.zeromq.ZMQ.Socket = org.zeromq.ZMQ$Socket@22b66894

scala> pub.connect("tcp://127.0.0.1:5555")

scala> sub.subscribe(Array[Byte]())

scala> pub.send("abc")
res3: Boolean = true

scala> sub.recv()
res4: Array[Byte] = Array(97, 98, 99)

Means this behaviour isn't the design of ZMQ. Nontheless, JNI bindings have their own problems, JZMQ doesn't even build on my system, I have to build my project on Ubuntu. So I'd be happy to see this fixed in jeromq.

utgarda avatar Jan 07 '16 17:01 utgarda

So it seems like the big question here is, should it be possible to bind a SUB socket and connect publishers in JeroMQ like it is in JZMQ?

The ZeroMQ guide seems to suggest that the way to do this is with a secondary bind as described by @miniway .

daveyarwood avatar Sep 30 '16 20:09 daveyarwood

Just for reference, I ran into this issue in the project I'm working on. In this project, consumers (SUB) already advertise themselves in ZooKeeper for other reasons, so it'd be really convenient for it to also publish its ZMQ endpt there as well. The producer then finds consumers and creates the necessary PUB sockets to distribute its events. The subscription here would be used to make sure that messages are sharded consistently to the correct consumer, based on an ID within the data that matches the subscription.

megfigura avatar Aug 02 '18 19:08 megfigura

I'm still not convinced that we need to support binding the SUB and connecting the PUB, but I suppose it would be worth investigating why it doesn't work with JeroMQ and seeing if there is a way we can make it work. Especially given that it reportedly works with jzmq.

daveyarwood avatar Oct 28 '18 00:10 daveyarwood

If ever this gets fixed I would be interested to learn. From the age of the issue I conclude it's not a priority.

ledergec avatar Jun 29 '21 15:06 ledergec

@ledergec, I'm fairly certain this already works in JeroMQ as long as you subscribe and then bind, as I use this technique all the time for distributed logging and eventing.

I have a library using this technique, though it's written in Kotlin and using jzmq-api on top of jeromq, but you should get the idea:

sjohnr avatar Jun 30 '21 18:06 sjohnr

I have tried the following code:

` ZContext context = new ZContext();

var subscriber = context.createSocket(SocketType.SUB);
subscriber.subscribe("");
subscriber.bind("tcp://127.0.0.1:9877");
subscriber.setLinger(0);

Thread.sleep(100);

var publisher = context.createSocket(SocketType.PUB);
publisher.connect("tcp://127.0.0.1:9877");
publisher.setLinger(0);

for (int i = 0; i < 10; ++i) {
  Thread.sleep(100);
  publisher.send(new byte[] {(byte) 8, (byte) 2});
}

var result = subscriber.recv();
assertThat(result).isEqualTo(new byte[] {(byte) 8, (byte) 2});

publisher.close();
subscriber.close();
context.close();

` If you see anything which needs to be corrected, please let me know!

ledergec avatar Jul 01 '21 10:07 ledergec