High water mark option not working for pub sub

Open sherlockedjd opened this issue 6 years ago • 7 comments

Hi all, can someone please explain, with an example, how to make the high water mark (HWM) option work for PUB sockets? I have tried, but the publisher seems to publish all the messages even after the HWM value is reached. Can someone provide a complete example in which messages are dropped when the HWM is reached on a PUB socket? Note that I set this option before binding my PUB socket. Thanks in advance.

sherlockedjd avatar Aug 08 '18 12:08 sherlockedjd

Hi, did you have a look at the test PubSubHwmTest?

It seems that PUB sockets are not lossy with the option

ZMQ.setSocketOption(pub, ZMQ.ZMQ_XPUB_NODROP, true);

when sending messages without waiting, like:

ZMQ.send(pub, msg, ZMQ.ZMQ_DONTWAIT);

That would allow you to detect when HWM is reached.
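To make the difference concrete, here is a toy model in plain Java. It is not JeroMQ code: the class `HwmModel`, its `SendResult` enum, and the method names are made up for illustration, and the only assumption is the documented ZeroMQ semantics (a PUB socket silently drops once the peer's queue is at HWM, while NODROP with a non-blocking send reports the failure instead).

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy model of one subscriber's send queue. Illustration only, NOT JeroMQ code. */
public class HwmModel {
    /** What a non-blocking send does in this model. */
    public enum SendResult { QUEUED, DROPPED_SILENTLY, REJECTED_EAGAIN }

    private final Queue<String> queue = new ArrayDeque<>();
    private final int hwm;        // stands in for the send high water mark
    private final boolean noDrop; // stands in for the NODROP option

    public HwmModel(int hwm, boolean noDrop) {
        this.hwm = hwm;
        this.noDrop = noDrop;
    }

    /** Queues the message if there is room, otherwise drops it or rejects it. */
    public SendResult send(String msg) {
        if (queue.size() < hwm) {
            queue.add(msg);
            return SendResult.QUEUED;
        }
        // Queue is at HWM: the default drops without telling the caller,
        // NODROP surfaces the condition so the caller can react.
        return noDrop ? SendResult.REJECTED_EAGAIN : SendResult.DROPPED_SILENTLY;
    }

    /** Sends n messages to a subscriber that never drains; counts failures the caller can see. */
    public static int countVisibleFailures(int hwm, boolean noDrop, int n) {
        HwmModel pipe = new HwmModel(hwm, noDrop);
        int failures = 0;
        for (int i = 0; i < n; i++) {
            if (pipe.send("msg#" + i) == SendResult.REJECTED_EAGAIN) {
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // prints 0: all 150 overflowing messages are dropped silently
        System.out.println(countVisibleFailures(50, false, 200));
        // prints 150: every overflowing send is reported to the caller
        System.out.println(countVisibleFailures(50, true, 200));
    }
}
```

In the model, the publisher only learns about the overflow in the second case, which is why the option and the DONTWAIT flag are suggested together.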

If you could describe more precisely the usage you want of this, we may be of better help.

fredoboulo avatar Aug 12 '18 22:08 fredoboulo

Thanks Fred for your response, I shall definitely try this out. Basically, I just want to explore whether the publisher can detect when messages are being dropped because the high water mark has been reached. That is, when the subscriber is slow it starts dropping messages, the publisher-side queue then fills up as well, and eventually the publisher starts to drop messages, as mentioned in the ZMQ docs for PUB sockets.

sherlockedjd avatar Aug 13 '18 06:08 sherlockedjd

I tried sending messages with the DONTWAIT flag, but it does not appear to work: I always get errno set to EAGAIN (35) after sending any number of messages. The HWM also does not seem to work on the PUB side.

Here's my PUB-side code, if anyone wants to look at it and help me out:

public static void main(String[] args) throws InterruptedException {
    // Prepare our context and publisher
    ZMQ.Context context = ZMQ.context(1);
    ZMQ.Socket publisher = context.socket(ZMQ.PUB);
    boolean receipt;
    int count=0;
    int error=0;
    publisher.setSndHWM(50);
    publisher.bind("tcp://localhost:5559");
    String message="this is message from server1";
    Scanner sc=new Scanner(System.in);
    String input="Yes";
    Socket socket =new Socket();
    System.out.println(ZMQ.getVersionString());
    long starttime=System.currentTimeMillis();
    for(int i=0;i<50;i++)
    //int i=0;
    //while(true)
    {
    message="this is message from server1";
    message+="#"+i;

    //receipt=publisher.send(message);
    receipt=publisher.send(message,ZMQ.DONTWAIT);
    if(receipt)
    {
    	count++;
    }
    if(publisher.errno()==ZError.EAGAIN)
    {
    	System.out.println(publisher.errno());
    	error++;
    	
    }
    
    if(i==0||i==5000||i==10000||i==49000)
    {
     long timeinmillisecs=System.currentTimeMillis();
   	 long millis = timeinmillisecs % 1000;
   	 long second = (timeinmillisecs / 1000) % 60;
   	 long minute = (timeinmillisecs / (1000 * 60)) % 60;
   	 long hour = (timeinmillisecs/ (1000 * 60 * 60)) % 24+5;
   	 hour=hour%12;

   	    String time = String.format("%02d:%02d:%02d.%d", hour, minute, second, millis);
    	System.out.println("in sleep baby at time"+time);
       Thread.sleep(4000);
    input=sc.next();
    if("Stop".equals(input)) // compare string contents, not references
    {
    	break;
    }
  }
 // i++;
    System.out.println(message);
    }
    long stoptime=System.currentTimeMillis();
    long timetaken=stoptime-starttime;
    System.out.println("time elapsed in sending "+count+" msgs: "+timetaken+"failed"+error);
    //publisher.close ();
    //context.term ();
}

sherlockedjd avatar Aug 16 '18 14:08 sherlockedjd

You did not call Socket.setXpubNoDrop to disable the lossy behaviour.

As a general rule, errno should be checked only after a call has reported an error. The current behaviour could certainly be improved by resetting the value before every API call, but in the meantime, checking errno only when send fails would help your code.
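A small sketch of why the order of the checks matters. `FakeSocket` below is a hypothetical stand-in, not a JeroMQ class; it assumes only what is described above, namely that a successful call leaves the previous errno value in place. The constant 35 is the value reported earlier in this thread.

```java
/** Illustration only: FakeSocket is a hypothetical stand-in, NOT a JeroMQ class. */
public class ErrnoCheck {
    static final int EAGAIN = 35; // value reported earlier in this thread

    /** A socket whose errno is NOT reset by successful calls (assumption of this sketch). */
    static final class FakeSocket {
        private int errno;
        private int capacity;

        FakeSocket(int capacity) {
            this.capacity = capacity;
        }

        boolean send(String msg) {
            if (capacity > 0) {
                capacity--;
                return true; // success: errno is left untouched
            }
            errno = EAGAIN;
            return false;
        }

        void drain(int n) {
            capacity += n; // the subscriber catches up
        }

        int errno() {
            return errno;
        }
    }

    /** Returns {sent, failures seen via return value, failures "seen" via errno alone}. */
    static int[] run() {
        FakeSocket socket = new FakeSocket(2);
        int sent = 0;
        int realFailures = 0;
        int errnoHits = 0;
        for (int i = 0; i < 6; i++) {
            if (i == 3) {
                socket.drain(10);
            }
            boolean ok = socket.send("m" + i);
            if (ok) {
                sent++;
            } else if (socket.errno() == EAGAIN) {
                realFailures++; // errno consulted only after a failed send
            }
            if (socket.errno() == EAGAIN) {
                errnoHits++; // errno consulted unconditionally: counts stale values too
            }
        }
        return new int[] {sent, realFailures, errnoHits};
    }

    public static void main(String[] args) {
        int[] r = run();
        // prints "sent=5 realFailures=1 errnoHits=4"
        System.out.println("sent=" + r[0] + " realFailures=" + r[1] + " errnoHits=" + r[2]);
    }
}
```

Only one send actually fails here, yet the unconditional check fires four times because the old EAGAIN value lingers after later successful sends.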

fredoboulo avatar Aug 16 '18 22:08 fredoboulo

@sherlockedjd Does the previous post help?

daveyarwood avatar Oct 28 '18 02:10 daveyarwood

@daveyarwood No, it doesn't seem to work for the TCP protocol: even with socket.setXpubNoDrop(true), the HWM is not respected. Maybe the problem is that the TCP socket also has a buffer, and the messages are buffered there rather than in the JeroMQ queues.
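The TCP-buffer part of that guess can be checked without JeroMQ at all. The sketch below uses only the JDK; `TcpBufferProbe` is a made-up name. It measures roughly how many bytes the OS accepts on a loopback connection whose receiver never reads. It does not prove that this is what hides the HWM in JeroMQ, only that such a buffer exists and can be sizeable.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/** Measures how much data the OS buffers for a TCP peer that never reads. */
public class TcpBufferProbe {

    /** Returns the approximate number of bytes accepted before a non-blocking write stalls. */
    public static long bytesAcceptedWithoutReader() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 lets the OS pick a free port on the loopback interface.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel writer = SocketChannel.open(server.getLocalAddress());
                 SocketChannel idleReader = server.accept()) { // accepted, but never read from
                writer.configureBlocking(false);
                ByteBuffer chunk = ByteBuffer.allocate(1024);
                long total = 0;
                while (true) {
                    chunk.clear(); // resend the same 1 KiB of zeros
                    int n = writer.write(chunk);
                    if (n == 0) {
                        break; // kernel buffers are full: the write would block
                    }
                    total += n;
                }
                return total;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes buffered by the OS with no reader: "
                + bytesAcceptedWithoutReader());
    }
}
```

The figure depends on the OS and its socket buffer settings. If it is large compared with the message size, many small messages can leave the application-level queue before any queue limit is hit, which would be consistent with the behaviour described above.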

petaryulianov avatar Dec 19 '22 12:12 petaryulianov

I'm also trying to reproduce HWM on an XPUB socket and wondering if there's a solution. socket.setXpubNoDrop doesn't seem to have the documented effect.

  val serverThread = thread {
    val zContext = ZContext()

    val serverSocket = zContext.createSocket(SocketType.XPUB)
    serverSocket.setXpubNoDrop(true)
    serverSocket.hwm = 1

    val bindResult = serverSocket.bind("tcp://*:9000")

    logger.v(LOG_TAG, "Server bind :$bindResult")

    var totalSent = 0

    while (System.currentTimeMillis() < endTime) {
      try {
        val sr1 = serverSocket.sendMore(topic)
        val sr2 = serverSocket.send(msgBody)
        if(!sr1 || !sr2) {
          logger.v(LOG_TAG, "Server send is too fast, HWM overflow. terminating server. ${serverSocket.errno()}")
          break
        }
        totalSent += 1
      } catch(ex: Exception) {
        logger.v(LOG_TAG, "got exception $ex")
      }
    }

    logger.v(LOG_TAG, "Total sent is $totalSent")
    zContext.destroy()
  }

If I let this run for some time, I never hit the log statement; I would expect send to return false and errno to report EAGAIN.

AdamWardVGP avatar Jan 31 '24 21:01 AdamWardVGP