jeromq
High water mark option not working for pub sub
Hi all, can someone please explain, with an example, how to make the high water mark option work for PUB sockets? I have tried, but the publisher seems to publish all the messages even after the HWM value is reached. Can someone provide a complete example where messages are dropped when the HWM is reached on a PUB socket? Thanks in advance. Note that I have been setting this option before binding my PUB socket.
Hi, did you have a look at the test PubSubHwmTest?
It seems that PUB sockets can be made non-lossy with the option
ZMQ.setSocketOption(pub, ZMQ.ZMQ_XPUB_NODROP, true);
when sending messages without waiting, like:
ZMQ.send(pub, msg, ZMQ.ZMQ_DONTWAIT);
That would allow you to detect when HWM is reached.
If you could describe more precisely how you want to use this, we may be able to help more.
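To make the difference between the default lossy mode and the no-drop mode concrete, here is a minimal sketch in plain Java. It is a toy model only and makes no jeromq calls: the class HwmModel, its trySend method, and the use of a bounded queue to stand in for the send queue are all assumptions made for illustration, not jeromq internals.

```java
import java.util.concurrent.ArrayBlockingQueue;

/** Toy model of a send queue with a high water mark. Hypothetical, not jeromq code. */
class HwmModel {
    private final ArrayBlockingQueue<String> queue; // capacity plays the role of the HWM
    private final boolean noDrop;                   // stands in for the XPUB_NODROP option
    private int dropped = 0;

    HwmModel(int hwm, boolean noDrop) {
        this.queue = new ArrayBlockingQueue<>(hwm);
        this.noDrop = noDrop;
    }

    /** Non-blocking send. Returns false only in no-drop mode when the queue is full. */
    boolean trySend(String msg) {
        if (queue.offer(msg)) {
            return true;   // queued below the high water mark
        }
        if (noDrop) {
            return false;  // the caller is told the queue is full
        }
        dropped++;         // lossy mode: the message is silently discarded
        return true;       // the caller still sees success
    }

    int dropped() { return dropped; }
    int queued() { return queue.size(); }

    public static void main(String[] args) {
        HwmModel lossy = new HwmModel(50, false);
        HwmModel strict = new HwmModel(50, true);
        int lossyOk = 0, strictOk = 0;
        for (int i = 0; i < 60; i++) {
            if (lossy.trySend("msg#" + i)) lossyOk++;
            if (strict.trySend("msg#" + i)) strictOk++;
        }
        System.out.println("lossy:  ok=" + lossyOk + " dropped=" + lossy.dropped());
        System.out.println("strict: ok=" + strictOk + " queued=" + strict.queued());
    }
}
```

In this model the lossy sender never learns that anything was discarded, which is why a send result alone cannot reveal drops unless the no-drop behaviour is enabled and the send is non-blocking.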
Thanks Fred for your response, I shall definitely try this out. Basically, I just want to explore the possibility of detecting when messages are being dropped on the publisher side because the high water mark has been reached, i.e. when a subscriber is slow it starts dropping messages, then the publisher-side queue also fills up, and eventually the publisher starts to drop messages, as mentioned in the ZMQ docs for PUB sockets.
I tried sending messages with the DONTWAIT flag, but it does not appear to be working: errno is always EAGAIN (35) after sending any number of messages. Also, the HWM does not seem to be working on the PUB side.
Here's my PUB-side code if anyone wants to look at it and help me out:
public static void main(String[] args) throws InterruptedException {
    // Prepare our context and publisher
    ZMQ.Context context = ZMQ.context(1);
    ZMQ.Socket publisher = context.socket(ZMQ.PUB);
    boolean receipt;
    int count=0;
    int error=0;
    publisher.setSndHWM(50);
    publisher.bind("tcp://localhost:5559");
    String message="this is message from server1";
    Scanner sc=new Scanner(System.in);
    String input="Yes";
    Socket socket =new Socket();
    System.out.println(ZMQ.getVersionString());
    long starttime=System.currentTimeMillis();
    for(int i=0;i<50;i++)
    //int i=0;
    //while(true)
    {
        message="this is message from server1";
        message+="#"+i;
        //receipt=publisher.send(message);
        receipt=publisher.send(message,ZMQ.DONTWAIT);
        if(receipt)
        {
            count++;
        }
        if(publisher.errno()==ZError.EAGAIN)
        {
            System.out.println(publisher.errno());
            error++;
        }
        if(i==0||i==5000||i==10000||i==49000)
        {
            long timeinmillisecs=System.currentTimeMillis();
            long millis = timeinmillisecs % 1000;
            long second = (timeinmillisecs / 1000) % 60;
            long minute = (timeinmillisecs / (1000 * 60)) % 60;
            long hour = (timeinmillisecs/ (1000 * 60 * 60)) % 24+5;
            hour=hour%12;
            String time = String.format("%02d:%02d:%02d.%d", hour, minute, second, millis);
            System.out.println("in sleep baby at time"+time);
            Thread.sleep(4000);
            input=sc.next();
            if("Stop".equals(input))
            {
                break;
            }
        }
        // i++;
        System.out.println(message);
    }
    long stoptime=System.currentTimeMillis();
    long timetaken=stoptime-starttime;
    System.out.println("time elapsed in sending "+count+" msgs: "+timetaken+"failed"+error);
    //publisher.close ();
    //context.term ();
}
You did not use Socket.setXpubNoDrop to disable the lossy behaviour.
As a general rule, errno should be checked only when a call reports an error. Currently errno is not reset before each API call (that could certainly be improved), so it can still hold a stale value after a successful send. Checking it only when send returns false would help your code.
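The effect of checking errno unconditionally can be sketched with a small model in plain Java. This is an illustration under stated assumptions, not jeromq code: StickyErrnoModel, its drain method, and countFailures are hypothetical names, and the model simply assumes that errno keeps its last value after a successful send. The constant 35 is the EAGAIN value reported earlier in this thread.

```java
/** Toy model of a socket whose errno is not reset on success. Hypothetical, not jeromq code. */
class StickyErrnoModel {
    static final int EAGAIN = 35; // value reported in the thread

    private final int capacity;
    private int queued = 0;
    private int errno = 0;

    StickyErrnoModel(int capacity) { this.capacity = capacity; }

    /** Returns false and sets errno when full; leaves errno untouched on success. */
    boolean send(String msg) {
        if (queued >= capacity) {
            errno = EAGAIN;
            return false;
        }
        queued++;
        return true;
    }

    void drain() { queued = 0; }  // pretend a subscriber caught up
    int errno() { return errno; }

    /** Sends n messages, draining once halfway; returns {naive, guarded} failure counts. */
    static int[] countFailures(int capacity, int n) {
        StickyErrnoModel s = new StickyErrnoModel(capacity);
        int naive = 0, guarded = 0;
        for (int i = 0; i < n; i++) {
            if (i == n / 2) s.drain();
            boolean ok = s.send("msg#" + i);
            if (s.errno() == EAGAIN) naive++;          // errno checked after every send
            if (!ok && s.errno() == EAGAIN) guarded++; // errno checked only on failure
        }
        return new int[] { naive, guarded };
    }

    public static void main(String[] args) {
        int[] r = countFailures(2, 8);
        System.out.println("naive=" + r[0] + " guarded=" + r[1]);
    }
}
```

With a capacity of 2 and 8 sends, four sends actually fail in this model, but the unconditional check counts six, because the two successful sends after the drain still see the stale EAGAIN.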
@sherlockedjd Does the previous post help?
@daveyarwood No, it doesn't seem to work over TCP, even with socket.setXpubNoDrop(true);
The HWM is not respected. Maybe the problem is that the TCP socket also has a buffer, and the messages are buffered there rather than in the JeroMQ queues.
I'm also trying to reproduce HWM behaviour on an XPUB socket and wondering if there's a solution. socket.setXpubNoDrop doesn't seem to have the documented effect.
val serverThread = thread {
    val zContext = ZContext()
    val serverSocket = zContext.createSocket(SocketType.XPUB)
    serverSocket.setXpubNoDrop(true)
    serverSocket.hwm = 1
    val bindResult = serverSocket.bind("tcp://*:9000")
    logger.v(LOG_TAG, "Server bind :$bindResult")
    var totalSent = 0
    while (System.currentTimeMillis() < endTime) {
        try {
            val sr1 = serverSocket.sendMore(topic)
            val sr2 = serverSocket.send(msgBody)
            if(!sr1 || !sr2) {
                logger.v(LOG_TAG, "Server send is too fast, HWM overflow. terminating server. ${serverSocket.errno()}")
                break
            }
            totalSent += 1
        } catch(ex: Exception) {
            logger.v(LOG_TAG, "got exception $ex")
        }
    }
    logger.v(LOG_TAG, "Total sent is $totalSent")
    zContext.destroy()
}
If I let this run for some time, the overflow log statement is never hit. I would expect to see false from my send call and EAGAIN from errno.