moquette

republishing messages fails....

Open windbender opened this issue 7 years ago • 7 comments

I've got a situation I haven't fully figured out: a client reconnecting with clean=false fails to receive the messages that were sent while it was disconnected, because of an NPE.

The code is here: ProtocolProcessor.java:355

    LOG.info("republishing stored messages to client <{}>", clientSession.clientID);
    for (IMessagesStore.StoredMessage pubEvt : publishedEvents) {
        //put in flight zone
        LOG.trace("Adding to inflight <{}>", pubEvt.getMessageID());
        clientSession.inFlightAckWaiting(pubEvt.getGuid(), pubEvt.getMessageID());

The publishedEvents list contains items which are null, and dereferencing one of them causes the NPE. I'm trying to understand the circumstances under which this can happen.
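To make the failure mode concrete, here is a minimal sketch of a loop that skips null entries instead of dereferencing them. The `StoredMessage` class and the `republish` method below are hypothetical stand-ins, not moquette's real types, and a guard like this would only hide the symptom; it does not explain why the nulls are in the list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NullSafeRepublish {

    // Hypothetical stand-in for IMessagesStore.StoredMessage;
    // only the message ID is modeled here.
    static final class StoredMessage {
        private final int messageID;

        StoredMessage(int messageID) {
            this.messageID = messageID;
        }

        int getMessageID() {
            return messageID;
        }
    }

    // Returns the message IDs that would be put in flight,
    // skipping null entries rather than throwing an NPE on them.
    static List<Integer> republish(List<StoredMessage> publishedEvents) {
        List<Integer> inFlight = new ArrayList<>();
        for (StoredMessage pubEvt : publishedEvents) {
            if (pubEvt == null) {
                // In the broker this would be a log statement.
                System.out.println("there was a null entry in the published events");
                continue;
            }
            inFlight.add(pubEvt.getMessageID());
        }
        return inFlight;
    }

    public static void main(String[] args) {
        List<StoredMessage> events =
                Arrays.asList(new StoredMessage(1), null, new StoredMessage(2));
        System.out.println(republish(events));
    }
}
```

Skipping the entry keeps the CONNECT handling alive, but the message behind the null entry is still lost, so the underlying cause needs to be found.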

More context. I turned up the logging and added a complaint about the null entry:

CONNECT for client <123>
Found an existing connection with same client ID <123>, forcing to close
Existing connection with same client ID <123>, forced to close
Lost connection with client <123>
Connect with keepAlive 60 s
Connect create session <[id0x0c97712c, /12.247.242.146:45680 => /172.31.25.68:8883]>
Connected client ID <123> with clean session false
republishStoredInSession for client <ClientSession{clientID='123'}>
republishing stored messages to client <123>
there was a null entry in the published events     <<<<<<  here is the null entry.
CONNECT processed

windbender avatar Nov 17 '16 00:11 windbender

By placing various breakpoints in the moquette code I can get

    public void checkReceivePublishedMessage_after_a_reconnect_with_notCleanSession() throws 

to throw a null pointer exception.

java.lang.NullPointerException
    at org.mapdb.StoreWAL.update(StoreWAL.java:382)
    at org.mapdb.Caches$HashTable.update(Caches.java:270)
    at org.mapdb.HTreeMap.putInner(HTreeMap.java:533)
    at org.mapdb.HTreeMap.put(HTreeMap.java:488)
    at io.moquette.spi.persistence.MapDBSessionsStore.removeEnqueued(MapDBSessionsStore.java:229)

This leads me to believe there may be a race condition, or at least some timing-dependent behavior, which can cause problems.

The behavior changes from run to run.

windbender avatar Nov 17 '16 00:11 windbender

OK!

Place a breakpoint on ClientSession.java:80, which is the return statement in the following method:

    public List<IMessagesStore.StoredMessage> storedMessages() {
        //read all messages from enqueued store
        Collection<MessageGUID> guids = this.m_sessionsStore.enqueued(clientID);
        return messagesStore.listMessagesInSession(guids);
    }

Run the above test in an IDE w/ debugger. Pause for 2-3 seconds at the breakpoint and then "run." The test will fail.
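One way a pause between the two calls could matter is sketched below. It assumes, without having confirmed it against the moquette source, that `listMessagesInSession` does one map lookup per GUID and adds the result to the list even when the lookup returns null. All names here are hypothetical, and the "concurrent" removal is simulated sequentially so the outcome is deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StaleGuidLookup {

    // Hypothetical message store: GUID string -> payload.
    static final Map<String, String> MESSAGES = new ConcurrentHashMap<>();

    // Assumed shape of the lookup: the result of each get() is added
    // as-is, so a GUID that is no longer stored yields a null entry.
    static List<String> listMessagesInSession(Collection<String> guids) {
        List<String> result = new ArrayList<>();
        for (String guid : guids) {
            result.add(MESSAGES.get(guid));
        }
        return result;
    }

    static List<String> simulate() {
        MESSAGES.clear();
        MESSAGES.put("guid-1", "payload-1");
        MESSAGES.put("guid-2", "payload-2");

        // Step 1: snapshot of the enqueued GUIDs (the first call).
        List<String> guids = Arrays.asList("guid-1", "guid-2");

        // Step 2: while the caller is paused, something else removes
        // a message from the store (simulated here in the same thread).
        MESSAGES.remove("guid-1");

        // Step 3: the lookup runs against the stale snapshot.
        return listMessagesInSession(guids);
    }

    public static void main(String[] args) {
        System.out.println(simulate());
    }
}
```

If the real lookup behaves like this, any path that removes a stored message while its GUID is still enqueued for the session would produce the null entries described at the top of this issue.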

Interestingly enough, this is a different NPE than the one I listed above. It's actually the messagesCollector in the test failing to return a message.
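A collector that fails to return a message typically surfaces as an NPE in the caller. The sketch below assumes the collector is backed by a blocking queue with a timed poll; the `TimedCollector` class and its methods are hypothetical and are not the test suite's actual collector.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedCollector {

    private final BlockingQueue<String> received = new LinkedBlockingQueue<>();

    // Called by the client callback when a message is delivered.
    void messageArrived(String payload) {
        received.add(payload);
    }

    // Waits up to timeoutMillis for a message; returns null on timeout
    // or if the waiting thread is interrupted.
    String waitMessage(long timeoutMillis) {
        try {
            return received.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        TimedCollector collector = new TimedCollector();
        // Nothing was delivered, so the timed poll gives up and returns null.
        String msg = collector.waitMessage(100);
        if (msg == null) {
            System.out.println("no message arrived before the timeout");
        } else {
            System.out.println("received: " + msg);
        }
    }
}
```

A test that calls a method on the returned value without the null check would throw an NPE at that line, which would point at delivery timing rather than at the broker's stored-message list.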

windbender avatar Nov 17 '16 01:11 windbender

Hi Chris, if you "pause for 2-3 seconds" and then your client doesn't get notified, it seems like the timeout for missed activity on the channel gets triggered and drops the connection. Could you check whether that is the case? Andrea

andsel avatar Nov 17 '16 06:11 andsel

I can certainly look for this. How and where is the missed activity timeout triggered?

windbender avatar Nov 17 '16 16:11 windbender

I mean when the keepalive time is set, https://github.com/andsel/moquette/blob/master/broker/src/main/java/io/moquette/spi/impl/ProtocolProcessor.java#L281

andsel avatar Nov 17 '16 20:11 andsel

@andsel So setIdleTime is called with 90 seconds in the test code. A 2-3 second pause is not long enough to trigger this. I also confirmed it by placing breakpoints in the IdleStateHandler, which were not hit. Any other ideas?
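The 90-second figure is consistent with the MQTT 3.1.1 rule that a server disconnects a client after one and a half times the keepalive period without traffic. A minimal sketch of that arithmetic follows, assuming a 60-second keepalive like the one in the CONNECT log earlier in this thread; the class and method names are hypothetical, and whether moquette rounds the value this way is an assumption.

```java
public class KeepAliveTimeout {

    // MQTT 3.1.1: the server may drop the connection once 1.5 times
    // the keepalive period passes with no packet from the client.
    static int idleTimeoutSeconds(int keepAliveSeconds) {
        return Math.round(keepAliveSeconds * 1.5f);
    }

    // True if a pause of the given length would reach the idle timeout.
    static boolean pauseTriggersTimeout(int keepAliveSeconds, int pauseSeconds) {
        return pauseSeconds >= idleTimeoutSeconds(keepAliveSeconds);
    }

    public static void main(String[] args) {
        int keepAlive = 60; // seconds, assumed from the CONNECT log
        System.out.println("idle timeout: " + idleTimeoutSeconds(keepAlive) + " s");
        System.out.println("3 s pause triggers timeout: "
                + pauseTriggersTimeout(keepAlive, 3));
    }
}
```

Under these assumptions a debugger pause would have to last about a minute and a half before the idle handler could close the channel.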

It feels to me like some sort of concurrency problem.

Truth is, I'm not sure that the failing test and my original NPE are related.

windbender avatar Nov 17 '16 21:11 windbender

OK, so I turned on more logging:

13:48:04,809 [nioEventLoopGroup-3-3] INFO  ProtocolProcessor republishStoredInSession 354  - republishing stored messages to client <TestClient>
13:48:04,811 [nioEventLoopGroup-3-3] INFO  MapDBSessionsStore inFlight 208  - storing inflight clientID <TestClient> messageID 1 guid <MessageGUID{guid='f4acd351-3bd3-4572-ba0f-294b3902bbca'}>
13:48:04,820 [main] INFO  Server stopServer 207  - Server stopping...
13:48:04,835 [nioEventLoopGroup-3-3] INFO  ProtocolProcessor directSend 584  - send publish message to <TestClient> on topic </topic>
13:48:05,373 [main] INFO  NettyAcceptor close 290  - Msg read: 12, msg wrote: 8
13:48:05,374 [main] INFO  NettyAcceptor close 293  - Bytes read: 180, bytes wrote: 2014
13:48:06,057 [nioEventLoopGroup-3-3] INFO  ProtocolProcessor completeConnect 314  - CONNECT processed
13:48:06,058 [main] INFO  Server stopServer 218  - Server stopped

java.lang.NullPointerException
    at io.moquette.server.ServerIntegrationPahoTest.testPublishWithQoS1_notCleanSession(ServerIntegrationPahoTest.java:286)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

The test clearly showed the republished message being sent. So this NPE has something to do with the client not receiving that data.

13:48:05,373 [main] INFO  NettyAcceptor close 290  - Msg read: 12, msg wrote: 8
13:48:05,374 [main] INFO  NettyAcceptor close 293  - Bytes read: 180, bytes wrote: 2014

We could set that aside and go back to the original problem at the top, since these are two different NPEs.

windbender avatar Nov 17 '16 21:11 windbender