jeromq

New client cannot receive any data in PUB/SUB mode

Open 317787106 opened this issue 2 years ago • 1 comments

I run one zmq server and one client in PUB/SUB mode, and it works well at first. I then open and close the client repeatedly. Some hours later, a newly opened client cannot receive any messages, although the server is still sending them. No warning or error is logged. I used the tool arthas (https://gitee.com/arthas/arthas) to debug the server with the following command:

watch zmq.socket.pubsub.Dist distribute -x 4

It gives the following output:

method=zmq.socket.pubsub.Dist.distribute location=AtExit
ts=2023-05-15 09:24:18; [cost=0.019258ms] result=@ArrayList[
    @Object[][
        @Msg[
            MORE=@Integer[1],
            COMMAND=@Integer[2],
            CREDENTIAL=@Integer[32],
            IDENTITY=@Integer[64],
            SHARED=@Integer[128],
            metadata=null,
            flags=@Integer[1],
            type=@Type[
                DATA=@Type[DATA],
                DELIMITER=@Type[DELIMITER],
                $VALUES=@Type[][isEmpty=false;size=2],
                name=@String[DATA],
                ordinal=@Integer[0],
            ],
            fileDesc=null,
            size=@Integer[12],
            data=@byte[][
                @Byte[98],
                @Byte[108],
                @Byte[111],
                @Byte[99],
                @Byte[107],
                @Byte[84],
                @Byte[114],
                @Byte[105],
                @Byte[103],
                @Byte[103],
                @Byte[101],
                @Byte[114],
            ],
            buf=@HeapByteBuffer[
                hb=@byte[][isEmpty=false;size=12],
                offset=@Integer[0],
                isReadOnly=@Boolean[false],
                bigEndian=@Boolean[true],
                nativeByteOrder=@Boolean[false],
                SPLITERATOR_CHARACTERISTICS=@Integer[16464],
                mark=@Integer[-1],
                position=@Integer[0],
                limit=@Integer[12],
                capacity=@Integer[12],
                address=@Long[0],
            ],
            writeIndex=@Integer[0],
            readIndex=@Integer[0],
        ],
    ],
    @Dist[
        pipes=@ArrayList[
            @Pipe[
                inpipe=@YPipe[zmq.pipe.YPipe@52f8dd9b],
                outpipe=@YPipe[zmq.pipe.YPipe@5013c41a],
                inActive=@Boolean[false],
                outActive=@Boolean[false],
                hwm=@Integer[1000],
                lwm=@Integer[500],
                msgsRead=@Long[1],
                msgsWritten=@Long[1000],
                peersMsgsRead=@Long[0],
                peer=@Pipe[zmq.pipe.Pipe@4ae245d5(SessionBase[2]->Pub[3])],
                sink=@Pub[Pub[1]],
                state=@State[ACTIVE],
                delay=@Boolean[false],
                identity=null,
                credential=null,
                conflate=@Boolean[false],
                parent=@Pub[Pub[1]],
                $assertionsDisabled=@Boolean[true],
                ctx=@Ctx[zmq.Ctx@65caf4e8],
                tid=@Integer[3],
            ],
        ],
        matching=@Integer[0],
        active=@Integer[0],
        eligible=@Integer[0],
        more=@Boolean[false],
    ],
    null,
]

The output is composed of {params,target,returnObj}. It seems that the closed client is still in the pipes array of Dist, and the new client cannot be added to pipes, so the new client cannot receive any messages. Is there a solution?
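
One more observation on the dump: the remaining pipe shows hwm=1000, msgsWritten=1000 and peersMsgsRead=0, with outActive=false, and Dist reports matching=0, active=0, eligible=0. That is consistent with the pipe having reached its high-water mark and never being drained. The sketch below reproduces that arithmetic. It is only my reading of how the "pipe is full" rule works, not code taken from jeromq, and `PipeHwmCheck` and `isFull` are hypothetical names used for illustration.

```java
// Hypothetical helper, NOT part of jeromq. It encodes the assumption that
// a pipe counts as full once (written - read by peer) reaches the HWM.
final class PipeHwmCheck {

    /** Returns true when the writer side would treat the pipe as full. */
    static boolean isFull(int hwm, long msgsWritten, long peersMsgsRead) {
        // Assumption: hwm <= 0 means "no limit", so the pipe is never full.
        return hwm > 0 && msgsWritten - peersMsgsRead >= hwm;
    }

    public static void main(String[] args) {
        // Values copied from the watch output above:
        // hwm=1000, msgsWritten=1000, peersMsgsRead=0
        System.out.println(isFull(1000, 1000L, 0L)); // prints "true"
    }
}
```

If this reading is right, the stale pipe stays in pipes but is never eligible for writes, which would match what the dump shows.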

317787106 • May 24 '23 04:05

Can you use netstat to check whether the source port and the destination port are the same? I've run into the same problem.
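
In netstat terms, that means a connection whose local and remote endpoints are identical, i.e. the socket is connected to itself. A minimal sketch of the same comparison in Java follows. It only illustrates the check and does not touch jeromq. `SelfConnectCheck` and `isSelfConnected` are hypothetical names, and the port numbers are made up for the example.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;

// Hypothetical helper for illustration only; not part of jeromq.
final class SelfConnectCheck {

    /** True when both endpoints have the same address and the same port. */
    static boolean isSelfConnected(InetSocketAddress local, InetSocketAddress remote) {
        return local.getPort() == remote.getPort()
                && local.getAddress() != null
                && local.getAddress().equals(remote.getAddress());
    }

    public static void main(String[] args) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Made-up ports: 5556 stands in for the PUB port, 49152 for a normal client port.
        InetSocketAddress server = new InetSocketAddress(loopback, 5556);
        InetSocketAddress normalClient = new InetSocketAddress(loopback, 49152);

        System.out.println(isSelfConnected(normalClient, server)); // false
        System.out.println(isSelfConnected(server, server));       // true
    }
}
```

For a real `java.net.Socket`, the two arguments would come from `getLocalSocketAddress()` and `getRemoteSocketAddress()`, cast to `InetSocketAddress`.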

DayJun • Jun 01 '23 08:06