jeromq
Error sending using a ROUTER-DEALER connection
Hello,
We are currently testing a communication solution built on JeroMQ (v0.4.3). The exchange is local to one computer and works perfectly, except that from time to time a message fails to send with EHOSTUNREACH:
org.zeromq.ZMQException: Errno 65 :
    at org.zeromq.ZMQ$Socket.mayRaise(ZMQ.java:2744)
    at org.zeromq.ZMQ$Socket.send(ZMQ.java:2592)
The communication is set up as follows:
- ROUTER socket: client side, bound, with attributes:
  - ZMQ_ROUTER_MANDATORY
  - ZMQ_LINGER set to 2000 ms
- DEALER socket: server side, connected, with attributes:
  - ZMQ_IDENTITY
  - ZMQ_IMMEDIATE
  - ZMQ_LINGER set to 2000 ms
Do you have any idea how I could debug this situation?
Thanks for your help,
This looks like an identity update problem: the failure happens only when sending a new message to the peer DEALER socket coincides with receiving a new "ready" message from that same socket:
pseudocode:
    socket.send(identity, ZMQ_SNDMORE);
    /* in between, we receive a new "ready" message for the same identity */
    socket.send(content, 0);
    /* this last send fails with exception EHOSTUNREACH */
Hi,
Are you certain that the router sends a message only after receiving the "ready" message (the one that sets the identity) from the dealer?
EHOSTUNREACH seems to be set only when the very first SNDMORE frame (the identity frame) is sent from the router and no pipe is registered under that identity.
I made a unit test for what I guess is your usage. Please tell me if something is missing.
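To make that statement concrete, here is a minimal toy model of a ROUTER-style routing table. It is a sketch, not JeroMQ's actual implementation: the class `RouterModel`, its methods, and `SendException` are hypothetical names invented for illustration, and the only value taken from this thread is errno 65 for EHOSTUNREACH. It assumes the identity lookup happens once, on the first frame, and that later frames follow whatever pipe that lookup selected.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a ROUTER-style routing table. NOT JeroMQ's real code. */
class RouterModel {
    /** Errno value reported in this issue for EHOSTUNREACH. */
    static final int EHOSTUNREACH = 65;

    /** Hypothetical stand-in for org.zeromq.ZMQException. */
    static class SendException extends RuntimeException {
        final int errno;

        SendException(int errno) {
            super("Errno " + errno);
            this.errno = errno;
        }
    }

    // identity -> frames delivered to that peer (stands in for an outbound pipe)
    private final Map<String, List<String>> outPipes = new HashMap<>();
    private final boolean mandatory;   // models ZMQ_ROUTER_MANDATORY
    private List<String> currentOut;   // pipe selected by the identity frame

    RouterModel(boolean mandatory) {
        this.mandatory = mandatory;
    }

    /** A peer announced itself under this identity. */
    void peerReady(String identity) {
        outPipes.putIfAbsent(identity, new ArrayList<>());
    }

    /** The peer's pipe went away, so its routing entry is removed. */
    void peerGone(String identity) {
        outPipes.remove(identity);
    }

    /** First frame: look up the identity; fail only here if it is unknown. */
    boolean sendIdentity(String identity) {
        currentOut = outPipes.get(identity);
        if (currentOut == null && mandatory) {
            throw new SendException(EHOSTUNREACH);
        }
        return currentOut != null;
    }

    /** Later frames go to the pipe picked by sendIdentity, or are dropped. */
    void sendFrame(String frame, boolean more) {
        if (currentOut != null) {
            currentOut.add(frame);
        }
        if (!more) {
            currentOut = null; // message complete, next send needs an identity
        }
    }

    /** Frames delivered so far to the given identity (empty if unknown). */
    List<String> delivered(String identity) {
        List<String> frames = outPipes.get(identity);
        return frames == null ? Collections.<String>emptyList() : frames;
    }
}
```

In this model, `sendIdentity("X")` throws before `peerReady("X")` has been called and succeeds afterwards. An empty routing table at the moment the identity frame is sent would therefore be enough to produce the reported error.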
@Test
public void testIssue528() throws Exception
{
    int sent;
    boolean rc;
    final int port = Utils.findOpenPort();

    final Ctx ctx = ZMQ.init(1);
    assertThat(ctx, notNullValue());

    final SocketBase router = ZMQ.socket(ctx, ZMQ.ZMQ_ROUTER);
    assertThat(router, notNullValue());
    rc = ZMQ.setSocketOption(router, ZMQ.ZMQ_ROUTER_MANDATORY, true);
    assertThat(rc, is(true));
    rc = ZMQ.setSocketOption(router, ZMQ.ZMQ_LINGER, 2000);
    assertThat(rc, is(true));
    rc = ZMQ.bind(router, "tcp://127.0.0.1:" + port);
    assertThat(rc, is(true));

    // Create dealer called "X" and connect it to our router
    final SocketBase dealer = ZMQ.socket(ctx, ZMQ.ZMQ_DEALER);
    assertThat(dealer, notNullValue());
    rc = ZMQ.setSocketOption(dealer, ZMQ.ZMQ_IDENTITY, "X");
    assertThat(rc, is(true));
    rc = ZMQ.setSocketOption(dealer, ZMQ.ZMQ_IMMEDIATE, true);
    assertThat(rc, is(true));
    rc = ZMQ.setSocketOption(dealer, ZMQ.ZMQ_LINGER, 2000);
    assertThat(rc, is(true));
    rc = ZMQ.connect(dealer, "tcp://127.0.0.1:" + port);
    assertThat(rc, is(true));

    // Get message from dealer to know when connection is ready
    sent = ZMQ.send(dealer, "Hello", 0);
    assertThat(sent, is(5));
    Msg msg = ZMQ.recv(router, 0);
    assertThat(msg, notNullValue());
    assertThat(msg.data()[0], is((byte) 'X'));

    // Start sending a message to connected dealer
    sent = ZMQ.send(router, "X", ZMQ.ZMQ_SNDMORE);
    assertThat(sent, is(1));

    // Dealer sends a message in between router message
    sent = ZMQ.send(dealer, "World", 0);
    assertThat(sent, is(5));

    // Finish sending the message to connected dealer now
    sent = ZMQ.send(router, "Bye", 0);
    assertThat(sent, is(3));

    msg = ZMQ.recv(dealer, 0);
    assertThat(msg, notNullValue());
    assertThat(msg.data(), is("Bye".getBytes(ZMQ.CHARSET)));

    msg = ZMQ.recv(router, 0);
    assertThat(msg, notNullValue());
    assertThat(msg.data(), is("Hello".getBytes(ZMQ.CHARSET)));

    msg = ZMQ.recv(router, 0);
    assertThat(msg, notNullValue());
    assertThat(msg.data()[0], is((byte) 'X'));

    msg = ZMQ.recv(router, 0);
    assertThat(msg, notNullValue());
    assertThat(msg.data(), is("World".getBytes(ZMQ.CHARSET)));

    // Clean up.
    ZMQ.close(router);
    ZMQ.close(dealer);
    ZMQ.term(ctx);
}
Thanks for the test case. It helped locate the problem. It looks like the routing table of our router socket was reset during a blocking poll():
// ####
// #### before this point, our router socket _rtrSck has outpipes.size() = 1 (see router.java)
// ####
ZMQ.Poller poll_a = _context.poller(2);
poll_a.register(_rtrSck, ZMQ.Poller.POLLIN);
poll_a.register(_svcSck, ZMQ.Poller.POLLIN);
// Wait for a message; poll(-1) blocks with no timeout.
if (-1 == poll_a.poll(-1)) // poll failed or was interrupted
{
    // interrupted, retry
    continue;
}
// ####
// #### after this point, our router socket _rtrSck has outpipes.size() = 0
// #### we can no longer use this socket for sending
// ####
Do you have a compilable example so we can investigate further?
Blocked pending reply from @fmfranck.