jeromq

Error sending using a ROUTER-DEALER connection

Open · fmfranck opened this issue 7 years ago · 5 comments

Hello,

We are currently testing a communication solution built on the JeroMQ framework (v0.4.3). The exchange is local to one computer and works perfectly, except that from time to time a message fails with EHOSTUNREACH:

    org.zeromq.ZMQException: Errno 65
        at org.zeromq.ZMQ$Socket.mayRaise(ZMQ.java:2744)
        at org.zeromq.ZMQ$Socket.send(ZMQ.java:2592)

The communication is set up as follows:

  • ROUTER socket: client side, bound, with options:
    • ZMQ_ROUTER_MANDATORY
    • ZMQ_LINGER set to 2000 ms
  • DEALER socket: server side, connected, with options:
    • ZMQ_IDENTITY
    • ZMQ_IMMEDIATE
    • ZMQ_LINGER set to 2000 ms

Do you have any idea how I could debug this situation?

Thanks for your help,

fmfranck, Mar 07 '18 11:03

This looks like an identity update problem: it happens only when sending a new message to the remote DEALER socket overlaps with receiving a new "ready" message from that same socket:

Pseudocode:

    socket.send(identity, ZMQ.SNDMORE);
    /* a new "ready" message for the same identity is received here, in between */
    socket.send(content, 0);  /* this last send fails with EHOSTUNREACH */

fmfranck, Mar 09 '18 08:03

Hi,

Are you certain that the router sends messages only after receiving the "ready" message (the one that sets the identity) from the dealer?

EHOSTUNREACH seems to be set only when the very first frame of a multipart message (the identity frame, sent with SNDMORE) is sent from the router and no pipe with that identity exists.
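To make that rule concrete, here is a toy model in plain Java (standard library only, no JeroMQ). `ToyRouter`, `attach`, `detach` and `delivered` are hypothetical names chosen for illustration; this is not JeroMQ's router implementation, and it ignores real pipes, high-water marks and EAGAIN. It assumes only the behaviour described above: with ROUTER_MANDATORY, the routing table is consulted for the identity frame and for no other frame.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model, NOT JeroMQ's router code: it only encodes the rule
// that, with ROUTER_MANDATORY, the routing table is checked for the identity
// frame (the first frame of an outgoing message) and for no other frame.
class ToyRouter {
    static final int OK = 0;
    static final int EHOSTUNREACH = 65; // the "Errno 65" reported in the stack trace

    private final boolean mandatory;
    // identity -> frames delivered to that peer (stands in for an outbound pipe)
    private final Map<String, List<String>> pipes = new HashMap<>();
    private boolean moreInProgress = false;  // true between the identity frame and the last frame
    private List<String> currentPipe = null; // pipe chosen by the identity frame; null = drop

    ToyRouter(boolean mandatory) {
        this.mandatory = mandatory;
    }

    // A peer with this identity connects (or reconnects with a fresh pipe).
    void attach(String identity) {
        pipes.put(identity, new ArrayList<>());
    }

    // The peer's pipe is terminated and removed from the routing table.
    void detach(String identity) {
        pipes.remove(identity);
    }

    // Frames received so far by the peer currently attached under this identity.
    List<String> delivered(String identity) {
        return pipes.get(identity);
    }

    // Sends one frame; 'more' plays the role of the SNDMORE flag.
    int send(String frame, boolean more) {
        if (!moreInProgress) {
            // First frame of a message: it names the destination peer.
            List<String> pipe = pipes.get(frame);
            if (pipe == null && mandatory) {
                return EHOSTUNREACH; // the only place this model raises the error
            }
            // Without ROUTER_MANDATORY an unknown identity just means "drop the message".
            currentPipe = more ? pipe : null;
            moreInProgress = more;
            return OK;
        }
        // Body frames: no routing-table lookup, hence never EHOSTUNREACH.
        // If the peer was detached mid-message, its old list is unreachable,
        // so these frames are effectively lost.
        if (currentPipe != null) {
            currentPipe.add(frame);
        }
        moreInProgress = more;
        if (!more) {
            currentPipe = null;
        }
        return OK;
    }
}
```

Under this model the body frame of a message can never fail with EHOSTUNREACH, even if the peer goes away after its identity frame was accepted (the remaining frames are simply lost). So if a second `send` really does fail with that error, the model would suggest that the router treated that frame as the start of a new message, i.e. its multipart state was reset between the two calls.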

I wrote a unit test for what I guess is your usage. Please tell me if something is missing:

    @Test
    public void testIssue528() throws Exception
    {
        int sent;
        boolean rc;

        final int port = Utils.findOpenPort();

        final Ctx ctx = ZMQ.init(1);
        assertThat(ctx, notNullValue());

        final SocketBase router = ZMQ.socket(ctx, ZMQ.ZMQ_ROUTER);
        assertThat(router, notNullValue());

        rc = ZMQ.setSocketOption(router, ZMQ.ZMQ_ROUTER_MANDATORY, true);
        assertThat(rc, is(true));

        rc = ZMQ.setSocketOption(router, ZMQ.ZMQ_LINGER, 2000);
        assertThat(rc, is(true));

        rc = ZMQ.bind(router, "tcp://127.0.0.1:" + port);
        assertThat(rc, is(true));

        //  Create dealer called "X" and connect it to our router
        final SocketBase dealer = ZMQ.socket(ctx, ZMQ.ZMQ_DEALER);
        assertThat(dealer, notNullValue());

        rc = ZMQ.setSocketOption(dealer, ZMQ.ZMQ_IDENTITY, "X");
        assertThat(rc, is(true));

        rc = ZMQ.setSocketOption(dealer, ZMQ.ZMQ_IMMEDIATE, true);
        assertThat(rc, is(true));

        rc = ZMQ.setSocketOption(dealer, ZMQ.ZMQ_LINGER, 2000);
        assertThat(rc, is(true));

        rc = ZMQ.connect(dealer, "tcp://127.0.0.1:" + port);
        assertThat(rc, is(true));

        //  Get message from dealer to know when connection is ready
        sent = ZMQ.send(dealer, "Hello", 0);
        assertThat(sent, is(5));

        Msg msg = ZMQ.recv(router, 0);
        assertThat(msg, notNullValue());
        assertThat(msg.data()[0], is((byte) 'X'));

        //  Start sending a message to connected dealer
        sent = ZMQ.send(router, "X", ZMQ.ZMQ_SNDMORE);
        assertThat(sent, is(1));

        //  Dealer sends a message in between router message
        sent = ZMQ.send(dealer, "World", 0);
        assertThat(sent, is(5));

        //  Finish sending the message to connected dealer now
        sent = ZMQ.send(router, "Bye", 0);
        assertThat(sent, is(3));

        msg = ZMQ.recv(dealer, 0);
        assertThat(msg, notNullValue());
        assertThat(msg.data(), is("Bye".getBytes(ZMQ.CHARSET)));

        msg = ZMQ.recv(router, 0);
        assertThat(msg, notNullValue());
        assertThat(msg.data(), is("Hello".getBytes(ZMQ.CHARSET)));

        msg = ZMQ.recv(router, 0);
        assertThat(msg, notNullValue());
        assertThat(msg.data()[0], is((byte) 'X'));

        msg = ZMQ.recv(router, 0);
        assertThat(msg, notNullValue());
        assertThat(msg.data(), is("World".getBytes(ZMQ.CHARSET)));

        //  Clean up.
        ZMQ.close(router);
        ZMQ.close(dealer);
        ZMQ.term(ctx);
    }

fredoboulo, Mar 10 '18 08:03

Thanks for the test case; it helped pinpoint where the problem occurs. It looks like the routing table of our router socket gets reset while waiting in poll():

        // Before this point, our router socket _rtrSck has outpipes.size() == 1 (see Router.java)

        ZMQ.Poller poll_a = _context.poller(2);
        poll_a.register(_rtrSck, ZMQ.Poller.POLLIN);
        poll_a.register(_svcSck, ZMQ.Poller.POLLIN);

        // Wait for a message; a timeout of -1 blocks until an event arrives
        if (-1 == poll_a.poll(-1)) {
            // interrupted, retry
            continue;
        }

        // After this point, _rtrSck has outpipes.size() == 0:
        // we can no longer use this socket for sending

fmfranck, Mar 12 '18 18:03

Do you have a compilable example so we can investigate further?

fredoboulo, Mar 12 '18 22:03

Blocked pending reply from @fmfranck.

daveyarwood, Oct 28 '18 02:10