
Setting the linger period for socket shutdown seems to not work


Setting the linger period for socket shutdown seems to not work. When doing a

socket.setLinger(timeout);
socket.close();

the socket.close() will not wait for the linger period before actually closing the socket if there are still messages in memory. (That's what I would expect it to do, according to http://api.zeromq.org/2-1:zmq-setsockopt)

You can reproduce the problem/error with the attached/following test code. If you run the code as is, you will lose at least one message on the sender's side (i.e. the code will not exit).

If you comment out the socket.setRcvHWM(1); in the receiver (which, from my point of view, forces some messages to stay on the sender's side?) or set it to 11 (which should cause the immediate transfer of all messages into the receiver's memory?), you will NOT lose any messages.

import java.util.concurrent.CountDownLatch;
import java.util.logging.Logger;

import org.zeromq.ZMQ;

public class PushPullTest {

    private static final Logger logger = Logger.getLogger(PushPullTest.class.getName());

    public static void main(String[] args) throws InterruptedException{

        CountDownLatch latch = new CountDownLatch(1);

        Thread sender = new Thread(new Sender(latch));
        Thread receiver = new Thread(new Receiver(latch));

        // Start sender before receiver and use latch to ensure that receiver has connected 
        // before sender is sending messages
        sender.start();
        logger.info("Sender started");
        receiver.start();
        logger.info("Receiver started");

        sender.join();
        logger.info("Sender terminated");
        receiver.join();
        logger.info("Receiver terminated");
    }


}
class Sender implements Runnable {

    private static final Logger logger = Logger.getLogger(Sender.class.getName());

    private CountDownLatch latch;

    public Sender(CountDownLatch latch){
        this.latch = latch;
    }

    @Override
    public void run() {
        int port = 9999;
        String address = "tcp://*:"+port;

        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket socket = context.socket(ZMQ.PUSH);

        // Socket options
        socket.setLinger(1000);

        socket.bind(address);

        // Ensure that receiver is "connected" before sending
        try {
            latch.await();
        } catch (InterruptedException e) {
        }

        for(int i=0;i<=10;i++){
            socket.send("data"+i);
            logger.info("send >> data"+i);
        }

        socket.close();
        context.close();
    }
}

class Receiver implements Runnable {

    private static final Logger logger = Logger.getLogger(Receiver.class.getName());

    private CountDownLatch latch;

    public Receiver(CountDownLatch latch){
        this.latch = latch;
    }

    @Override
    public void run() {
        int port = 9999;
        String address = "tcp://localhost:"+port;

        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket socket = context.socket(ZMQ.PULL);

        // Options Section
        socket.setRcvHWM(1);

        socket.connect(address);

        latch.countDown();

        int counter = 0;
        while(counter <= 10){
            String s = new String(socket.recv());
            logger.info("received >> "+s);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
            }
            counter++;
        }

        socket.close();
        context.close();
    }
}

simongregorebner avatar Jan 13 '14 10:01 simongregorebner

Confirmed this is an issue. I've pretty much duplicated Simon's test case in investigating this issue myself - should have checked the JIT issues first.

geoffblandrbs avatar Jan 29 '14 12:01 geoffblandrbs

I hope someone can test the same logic with the C-based libzmq. I guess this is not an issue with linger itself, but with the handling of the TCP socket disconnect event. It usually occurs when the receiver buffer is full. The above example could work with hwm = 3.

Here's my scenario:

The sender sent 11 messages successfully, and the receiver's TCP socket received all 11 messages successfully, but only 10 messages were processed (copied into the internal queue one by one, due to hwm = 1). In the meantime, the receiver also got a TCP disconnect event (a socket.read exception). Even though the 11th message had not yet been copied into the internal queue, the disconnect event handler abandoned it.

So to speak, linger guarantees that messages are sent within the given timeout. But when the receiver's internal queue is full and the TCP socket is closed, the receiver abandons the last messages.

When closed, the StreamEngine cannot easily know whether all the bytes have been transferred, or when the full internal queue will have room again.

We could invent a RECEIVER_LINGER option which delays abandoning the unhandled last messages.
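Until something like that exists, one receiver-side mitigation might be to drain before closing: keep calling recv with a receive timeout and only close once nothing has arrived within a grace period. This is just a sketch against JeroMQ's ZMQ.Socket API (setReceiveTimeOut maps to ZMQ_RCVTIMEO); the 500 ms grace period and the endpoint are arbitrary values I picked, not anything from this report:

```java
import org.zeromq.ZMQ;

public class DrainingReceiver {
    public static void main(String[] args) {
        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket socket = context.socket(ZMQ.PULL);

        // With ZMQ_RCVTIMEO set, recv() returns null once no message has
        // arrived within the timeout, instead of blocking forever.
        socket.setReceiveTimeOut(500); // grace period in ms (arbitrary)
        socket.connect("tcp://localhost:9999");

        // Drain: keep reading until the line has been quiet for a full
        // grace period, so in-flight messages are less likely to be
        // abandoned when the socket is closed.
        byte[] msg;
        while ((msg = socket.recv()) != null) {
            System.out.println("received >> " + new String(msg));
        }

        socket.close();
        context.close();
    }
}
```

This doesn't fix the underlying disconnect handling; it only makes the receiver less likely to close while messages are still in flight.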

miniway avatar Jan 29 '14 19:01 miniway

A RECEIVER_LINGER option would be fine with me.

simongregorebner avatar Jan 31 '14 07:01 simongregorebner

I've run into this same issue with Python, and the developer was able to reproduce it with the C implementation, so it seems to be a core issue: https://github.com/zeromq/pyzmq/issues/832

thehesiod avatar Apr 14 '16 18:04 thehesiod

There was additional discussion in that pyzmq issue. It was ultimately closed under the conclusion that there is a bug in libzmq: https://github.com/zeromq/libzmq/issues/1922

In that issue, it was noted that this other issue seemed related: https://github.com/zeromq/libzmq/issues/1264

This test case was added for issue 1264: https://github.com/hitstergtd/libzmq/commit/724315a61141d5e1a68bd483c8a544c34598e016

Action needed for JeroMQ: a PR to add a test case either based on the one linked above, or based on @simongregorebner's code in his post at the top of this issue, or perhaps both.

Pinging @thehesiod just in case you may have gained any additional insight on this?

daveyarwood avatar Jan 19 '17 19:01 daveyarwood

@daveyarwood unfortunately not, no responses from the libzmq people. IIRC, of interest is that it doesn't seem to occur on OS X, so the guess was that this is some sort of system buffer that libzmq isn't accounting for. I worked around this issue during socket close by fixing the size of the SNDBUF and flushing out enough data that any buffered content would theoretically get sent.
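For what it's worth, here is how I read that SNDBUF workaround, translated into a rough JeroMQ sketch. The buffer size, the filler frame size, and the idea that filler frames displace any buffered payload are my assumptions, not the actual code from the original report; it presumes the caller pinned the send buffer beforehand with setSendBufferSize (ZMQ_SNDBUF):

```java
import org.zeromq.ZMQ;

public final class FlushingClose {

    /**
     * Sketch of the workaround described above: with the kernel send
     * buffer pinned to a known size via setSendBufferSize (ZMQ_SNDBUF),
     * pad the stream with enough filler frames before closing that any
     * real payload still sitting in the buffer should be pushed out.
     */
    public static void flushAndClose(ZMQ.Socket push, int sndbufBytes) {
        byte[] filler = new byte[1024]; // filler frame size (arbitrary)
        for (int i = 0; i < sndbufBytes / filler.length; i++) {
            push.send(filler, 0);
        }
        push.close();
    }
}
```

Usage would be something like push.setSendBufferSize(8 * 1024) when the PUSH socket is created, then flushAndClose(push, 8 * 1024) in place of a plain close. It's a heuristic, not a guarantee, which is presumably why a real fix in libzmq is still needed.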

thehesiod avatar Jan 19 '17 19:01 thehesiod

JeroMQ does not use libzmq, but is a port of it. So, my thinking is that if this issue is fixed in libzmq, then we can see what the fix was and translate it into a fix in the JeroMQ codebase.

daveyarwood avatar Jan 19 '17 20:01 daveyarwood

I took a look to see if there have been any updates in https://github.com/zeromq/libzmq/issues/1922. It looks like @thehesiod was able to produce a reproducible test case in C, using native libzmq, but so far, the cause has not been identified.

daveyarwood avatar Oct 28 '18 00:10 daveyarwood

I checked back for updates. In July 2019, it was reported to still be unresolved.

daveyarwood avatar Feb 01 '20 01:02 daveyarwood