Exception while processing a command leaves the IOThread in a bad state (inEvent is never invoked to serve new commands)

Open · WeiliangLuo opened this issue 4 years ago · 6 comments

It is not obvious, but I recently found out that the IOThread has to drain all commands in its mailbox in order to be signaled again for new commands. This is because draining the messages off the underlying YPipe resets YPipe.c to -1, which causes flush() to return false on the sender side, and only then will the sender of a new command signal the IOThread to invoke inEvent().
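
For reference, here is a simplified sketch of the sender side, paraphrased from zmq.Mailbox.send (not a verbatim copy; sync, cpipe and signaler stand for the mailbox's lock, its command YPipe and its signaler). The wake-up is only sent when flush() reports that the reader has gone to sleep:

public void send(final Command cmd)
{
    boolean ok;
    sync.lock();
    try {
        //  Write the command into the underlying YPipe.
        cpipe.write(cmd, false);
        //  flush() returns false only if the reader has drained the pipe
        //  and reset YPipe.c to -1, i.e. the reader went to sleep.
        ok = cpipe.flush();
    }
    finally {
        sync.unlock();
    }
    if (!ok) {
        //  Reader is asleep: wake it so the Poller invokes inEvent().
        signaler.send();
    }
}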

However, an exception thrown while the IOThread is processing a command stops it from draining its mailbox. As a result, the IOThread is left in a running state but is never woken up again to serve inEvent, even though new commands are sent to it.
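
For context, the drain loop in zmq.io.IOThread.inEvent is roughly the following (paraphrased, not verbatim). If cmd.process() throws, the exception propagates out of the loop before mailbox.recv(0) can return null, so the YPipe is never fully drained and, per the above, no sender will ever signal this thread again:

@Override
public void inEvent()
{
    while (true) {
        //  Get the next command. If there is none, exit.
        Command cmd = mailbox.recv(0);
        if (cmd == null) {
            break;
        }
        //  An exception thrown here aborts the drain, leaving commands
        //  in the pipe and the IOThread permanently unsignaled.
        cmd.process();
    }
}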

The exception we hit is the same as the one mentioned in https://github.com/zeromq/jeromq/issues/871. In our case, we understand that it is caused by a race condition between the worker thread notifying the IOThread to ActivateWrite on a backed-up socket and the heartbeat reconnecting to the remote end. As a result, the ActivateWrite command arrives at a new StreamEngine object that was just set up for the new connection and is not fully initialized, pending handshake. We learned that it is not a good idea to enable the heartbeat on a PULL socket, as back pressure will trigger the heartbeat to reconnect frequently (given a reasonably low heartbeat timeout).

I see a few different ways to handle the exception better: for example, letting the IOThread die instead of failing silently, handling the exception inside inEvent, or even calling iothread.inEvent without the signal. I am curious about your thoughts on this.
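
For the first option, a rough sketch using only standard JDK APIs (nothing below is existing jeromq API): if the exception were allowed to propagate out of Poller.run, a handler installed on the application side would at least make the death visible instead of silent:

//  Hypothetical illustration: report the death of any poller thread loudly
//  instead of letting it disappear without a trace.
Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> {
    System.err.println("Thread " + thread.getName() + " died: " + throwable);
    throwable.printStackTrace();
});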

Thanks!

WeiliangLuo avatar Dec 03 '21 22:12 WeiliangLuo

Do you think this commit might help you? https://github.com/zeromq/jeromq/commit/ece14303872354e689065ff635086ca355336d47

fbacchella avatar Dec 05 '21 11:12 fbacchella

One more question: are you able to reproduce it? An NPE is always a bug, so it should be resolved too.

fbacchella avatar Dec 05 '21 22:12 fbacchella

We are able to reproduce it. I'm not sure if it's easy to write a test for it yet, since it's a race condition, but I'm working on that this week.

ben-ng avatar Dec 06 '21 04:12 ben-ng

Meanwhile, a simple patch like:

@Override
public void inEvent()
{
    //  TODO: Do we want to limit number of commands I/O thread can
    //  process in a single go?

    while (true) {
        try {
            //  Get the next command. If there is none, exit.
            Command cmd = mailbox.recv(0);
            if (cmd == null) {
                break;
            }

            //  Process the command.
            cmd.process();
        } catch (RuntimeException e) {
            //  Report the failure but keep draining, so the mailbox's YPipe is
            //  fully emptied and the next sender's flush() will signal us again.
            getCtx().getNotificationExceptionHandler().uncaughtException(Thread.currentThread(), e);
        }
    }
}

could do the job.

fbacchella avatar Dec 06 '21 07:12 fbacchella

> One more question: are you able to reproduce it? An NPE is always a bug, so it should be resolved too.

Here is a unit test that will reproduce the issue. The cause is essentially the race condition I mentioned above:

> In our case, we understand that it is caused by a race condition between the worker thread notifying the IOThread to ActivateWrite on a backed-up socket and the heartbeat reconnecting to the remote end. As a result, the ActivateWrite command arrives at a new StreamEngine object that was just set up for the new connection and is not fully initialized, pending handshake.

package org.zeromq;

import java.util.ArrayList;
import java.io.IOException;

import org.junit.Test;

import zmq.ZMQ;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZContext;

class Puller extends Thread
{
    Socket pullSock;
    int cnt = 0;

    public Puller(ZContext ctx, String host)
    {
        pullSock = ctx.createSocket(ZMQ.ZMQ_PULL);
        pullSock.setRcvHWM(1);
        pullSock.setHeartbeatIvl(5000);
        pullSock.setHeartbeatTimeout(1000);

        boolean brc = pullSock.connect(host);
    }

    @Override
    public void run()
    {
        while (true) {
            // occasionally consume a message to trigger a command being sent to the Session
            consume();
            try {
                Thread.sleep(6000);
            }
            catch (InterruptedException e) {
                System.out.println("Sleeping interrupted..." + e);
            }
        }
    }

    void consume()
    {
        String msg = pullSock.recvStr();
        cnt += 1;
        if (cnt % 10 == 1) {
            System.out.println("Consumed msg [" + cnt + "]. Message: " + msg);
        }
    }
}

public class TestHeartBeatRaceCondition
{
    @Test
    public void testHeartBeatRaceCondition() throws IOException
    {
        ZContext ctx = new ZContext();
        Socket push = ctx.createSocket(ZMQ.ZMQ_PUSH);
        push.setSndHWM(1);
        String host = "tcp://127.0.0.1:12345";
        boolean brc = push.bind(host);

        ArrayList<Puller> pullers = new ArrayList<Puller>();
        for (int i = 0; i < 50; ++i) {
            pullers.add(new Puller(ctx, host));
        }

        for (Puller puller : pullers) {
            puller.start();
        }

        while (true) {
            String content = "12345678ABCDEFGH12345678abcdefgh";
            //  Send the message.
            push.send(content);
        }

    }
}

WeiliangLuo avatar Dec 07 '21 06:12 WeiliangLuo

Indeed, I quickly get a:

java.lang.AssertionError
	at zmq.io.StreamEngine.restartInput(StreamEngine.java:553)
	at zmq.io.SessionBase.writeActivated(SessionBase.java:276)
	at zmq.pipe.Pipe.processActivateWrite(Pipe.java:340)
	at zmq.ZObject.processCommand(ZObject.java:55)
	at zmq.Command.process(Command.java:77)
	at zmq.io.IOThread.inEvent(IOThread.java:81)
	at zmq.poll.Poller.run(Poller.java:276)
	at java.lang.Thread.run(Thread.java:748)

fbacchella avatar Dec 07 '21 08:12 fbacchella