high water marks not unbounded for PUB/SUB
Issue description
When the HWM is set to 0 on PUB/SUB sockets, a limit still seems to apply and messages are dropped.
Environment
- libzmq version (commit hash if unreleased): 572eb008
- OS: macOS
Minimal test code / Steps to reproduce the issue
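A sender and a receiver that each use both PUB/SUB and PUSH/PULL, the latter as a comparison. Both programs take three arguments: the PUB/SUB endpoint, the PUSH/PULL endpoint and the number of messages.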
Sender:
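```c
#include <czmq.h>

int size = 1024;
int number = 0;
char zmq_endpoint[1024] = "";
zsock_t *pub = NULL;
zsock_t *push = NULL;

// zloop timer callback: send `number` sequence-numbered messages on the
// socket named by arg ("pub" or "push"); "stop" ends the loop.
int send_messages (zloop_t *loop, int timer_id, void *arg) {
    char *type = (char *)arg;
    if (streq(type, "stop"))
        return -1;  // returning -1 from a handler stops the reactor
    for (int i = 1; i <= number; i++) {
        printf("sending with %s %d/%d\n", type, i, number);
        if (streq(type, "pub"))
            zsock_send(pub, "i", i);
        else
            zsock_send(push, "i", i);
    }
    return 0;
}

int main(int argc, const char * argv[]) {
    if (argc < 4) {
        fprintf(stderr, "usage: %s <pub-endpoint> <push-endpoint> <count>\n", argv[0]);
        return 1;
    }
    number = atoi(argv[3]);
    // zsock_new_pub() binds by default, so the endpoint is already bound
    // when the HWM options below are applied
    pub = zsock_new_pub(argv[1]);
    zsock_set_sndhwm(pub, 0);
    zsock_set_rcvhwm(pub, 0);
    // zsock_new_push() connects by default
    push = zsock_new_push(argv[2]);
    zsock_set_sndhwm(push, 0);
    zsock_set_rcvhwm(push, 0);
    zloop_t *loop = zloop_new();
    // one-shot timers: PUSH burst at 2 s, PUB burst at 4 s, stop at 6 s
    zloop_timer(loop, 2000, 1, send_messages, "push");
    zloop_timer(loop, 4000, 1, send_messages, "pub");
    zloop_timer(loop, 6000, 1, send_messages, "stop");
    zloop_start(loop);
    zloop_destroy(&loop);
    zsock_destroy (&pub);
    zsock_destroy (&push);
    return 0;
}
```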
Receiver:
```c
#include <czmq.h>

int size = 1024;
int number = 0;
int sub_msg_nb = 0;
int pull_msg_nb = 0;
char zmq_endpoint[1024] = "";
zsock_t *sub = NULL;
zsock_t *pull = NULL;

// zloop reader callback: receive one sequence number and compare it with
// the per-socket message count to detect gaps
int zmq_reader (zloop_t *loop, zsock_t *reader, void *arg) {
    char *type = (char *)arg;
    int value = 0;
    zsock_recv(reader, "i", &value);
    int *counter = NULL;
    if (streq(type, "sub"))
        counter = &sub_msg_nb;
    else
        counter = &pull_msg_nb;
    ++(*counter);
    //printf("received %s value %d as #%d\n", type, value, *counter);
    if (value != *counter) {
        printf("lost message - #%d value is %d (%d dropped messages)\n", *counter, value, value - *counter);
        *counter = value; // catch up
    }
    return 0;
}

int main(int argc, const char * argv[]) {
    if (argc < 4) {
        fprintf(stderr, "usage: %s <sub-endpoint> <pull-endpoint> <count>\n", argv[0]);
        return 1;
    }
    number = atoi(argv[3]);
    // zsock_new_sub() connects by default; "" subscribes to all messages
    sub = zsock_new_sub(argv[1], "");
    zsock_set_sndhwm(sub, 0);
    zsock_set_rcvhwm(sub, 0);
    // zsock_new_pull() binds by default
    pull = zsock_new_pull(argv[2]);
    zsock_set_sndhwm(pull, 0);
    zsock_set_rcvhwm(pull, 0);
    zloop_t *loop = zloop_new();
    zloop_reader(loop, sub, zmq_reader, "sub");
    zloop_reader(loop, pull, zmq_reader, "pull");
    // keep polling both sockets even if a read reports an error
    zloop_reader_set_tolerant(loop, sub);
    zloop_reader_set_tolerant(loop, pull);
    zloop_start(loop);  // handlers never return -1: runs until interrupted
    zloop_destroy(&loop);
    zsock_destroy (&sub);
    zsock_destroy (&pull);
    return 0;
}
```
Run over the tcp and ipc transports as follows (for TCP, tested both on a single machine and across two machines):
```sh
# TCP
./receiver tcp://192.168.1.196:12345 tcp://192.168.1.196:12346 10000
./sender tcp://192.168.1.196:12345 tcp://192.168.1.196:12346 10000
# IPC
./receiver ipc:///tmp/pub ipc:///tmp/push 10000
./sender ipc:///tmp/pub ipc:///tmp/push 10000
```
What's the actual result?
In all cases, PUSH/PULL delivers all 10,000 messages, while PUB/SUB loses some; the number lost varies from run to run. Sample receiver output:
```
lost message - #3501 value is 3721 (220 dropped messages)
lost message - #8721 value is 8758 (37 dropped messages)
```
NB: this is NOT a late-joiner problem: the dropped messages come from the middle of the sequence, not the beginning. We ruled that out by starting the receiver before the sender, and the sender waits at least 2 seconds before sending anything (the PUSH burst fires at 2 s, the PUB burst at 4 s).
NB: as one would expect, the problem appears only when more than 1000 messages are sent, 1000 being the default HWM value.
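The receiver infers losses from the sequence number carried by each message. The sketch below pulls that check out of `zmq_reader` into a library-free helper so the gaps in the "lost message" output can be totalled over a run. `gap_tracker_t` and `gap_tracker_on_message` are hypothetical names for illustration, not czmq API, and the helper assumes messages arrive in order (which a single PUB/SUB connection provides).

```c
#include <stdio.h>

/* Hypothetical helper, not part of czmq: the receiver's gap check as a
   standalone function with a running total of inferred losses. */
typedef struct {
    int last_seq;   /* highest sequence number accounted for so far */
    int dropped;    /* running total of messages inferred as lost */
} gap_tracker_t;

/* Returns how many messages were skipped before `value` (0 if none). */
static int gap_tracker_on_message(gap_tracker_t *t, int value)
{
    int gap = 0;
    ++t->last_seq;                       /* the sequence number we expected */
    if (value != t->last_seq) {
        gap = value - t->last_seq;       /* e.g. 3721 - 3501 = 220 */
        printf("lost message - #%d value is %d (%d dropped messages)\n",
               t->last_seq, value, gap);
        t->dropped += gap;
        t->last_seq = value;             /* catch up, as the receiver does */
    }
    return gap;
}
```

Fed the sequence from the sample output above, the tracker reports gaps of 220 and 37; if those were the only gaps in that run, 257 of the 10,000 PUB/SUB messages were lost.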
What's the expected result?
With zsock_set_sndhwm and zsock_set_rcvhwm used to set the HWM to zero on all sockets, one would expect every message to be delivered over PUB/SUB, just as it is over PUSH/PULL.
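To make the expected semantics concrete, here is a toy, single-threaded model of one outgoing pipe; it is an illustration, not libzmq's implementation. It assumes, as the ZMQ_SNDHWM documentation states, that an HWM of 0 means no limit, and it encodes the documented behaviour of the two socket types once the limit is reached: PUB drops messages, whereas PUSH blocks the sender. `toy_pipe_t` and `toy_send_burst` are hypothetical names. One consequence worth keeping in mind is that PUSH/PULL delivering every message does not by itself show that an HWM of 0 took effect, since a blocked PUSH never discards anything.

```c
#include <stdbool.h>
#include <stddef.h>

/* Toy model, NOT libzmq code: a single outgoing pipe with a high water
   mark. hwm == 0 is treated as "no limit", matching the documented
   meaning of a zero ZMQ_SNDHWM/ZMQ_RCVHWM. */
typedef struct {
    size_t hwm;     /* high water mark; 0 = unbounded */
    size_t queued;  /* messages currently waiting in the pipe */
} toy_pipe_t;

static bool toy_pipe_full(const toy_pipe_t *p)
{
    return p->hwm != 0 && p->queued >= p->hwm;
}

/* Send n messages. The consumer removes one queued message after every
   drain_every sends (0 = no consumer at all). Returns how many messages
   were lost. */
static size_t toy_send_burst(size_t hwm, size_t n, size_t drain_every, bool is_pub)
{
    toy_pipe_t p = { hwm, 0 };
    size_t dropped = 0;
    for (size_t i = 1; i <= n; i++) {
        if (!toy_pipe_full(&p))
            p.queued++;          /* room left: the message is queued */
        else if (is_pub)
            dropped++;           /* PUB at the HWM: the message is discarded */
        /* else PUSH at the HWM: a real sender blocks here until the
           consumer frees a slot; the model skips the wait, so nothing is
           lost and queued stays at hwm. */
        if (drain_every != 0 && i % drain_every == 0 && p.queued > 0)
            p.queued--;          /* slow consumer takes one message */
    }
    return dropped;
}
```

Under these assumptions, `toy_send_burst(1000, 1001, 0, true)` loses exactly one message, while any call with `hwm` set to 0 loses none. Drops observed with an HWM of 0, as in the output above, are what does not fit the documented behaviour.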
Duplicate of #4394