
[Bug]: publishing qos1 message on handleMessage causes reconnection / timeout

Open leppaott opened this issue 3 months ago • 2 comments

MQTTjs Version

5.14.0

Broker

vernemq

Environment

NodeJS

Description

Looks the same as the very old issue: https://github.com/mqttjs/MQTT.js/issues/838

It seems that if you publish a message from within the handleMessage() topic handler, specifically a qos1 message (not qos0, which could be due to: https://github.com/mqttjs/MQTT.js/blob/ab2ed39e306c5be18e2ac73c05f758f677a1a0c6/src/lib/client.ts#L1018), the publish callback does not return "immediately"; instead we get a null error parameter and a reconnection.

Whereas if we don't await the publish(), we get an undefined err immediately and no reconnection. I also noticed that if you call the "DoneCallback" in handleMessage before your own callback, it works even when awaiting publish(), but I don't think it's supposed to be used like that. It's hard to say, though, since there are no async examples available for handleMessage() either.
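The "DoneCallback first" variant mentioned above could be sketched roughly as follows. This is only an illustration, not a recommended pattern: `ClientLike`, `IncomingPublish`, `installReplyHandler`, `buildReply` and `onError` are hypothetical names, and the interfaces are structural stand-ins so the snippet compiles without the mqtt package (they only mirror the `handleMessage(packet, done)` and `publish(topic, message, opts, cb)` shapes used in this report).

```typescript
// Structural stand-ins (assumptions), not the real MQTT.js types.
type DoneCallback = (error?: Error) => void;

interface IncomingPublish {
  topic: string;
  payload: Uint8Array | string;
}

interface ClientLike {
  handleMessage: (packet: IncomingPublish, done: DoneCallback) => void;
  publish(
    topic: string,
    message: string,
    opts: { qos: 0 | 1 | 2 },
    cb?: (err?: Error | null) => void,
  ): unknown;
}

// Hypothetical helper: replies to every incoming message with a qos1 publish.
function installReplyHandler(
  client: ClientLike,
  replyTopic: string,
  buildReply: (packet: IncomingPublish) => string,
  onError: (err: Error) => void,
): void {
  client.handleMessage = (packet, done) => {
    // Call done() first so the client can keep processing incoming packets
    // (including the PUBACK for the publish below).
    done();
    // The handler no longer blocks on the publish callback; errors are
    // reported through onError instead of being awaited.
    client.publish(replyTopic, buildReply(packet), { qos: 1 }, (err) => {
      if (err) onError(err);
    });
  };
}
```

One trade-off to keep in mind: since done() runs before the reply is confirmed, the handler presumably no longer provides any backpressure, and a failed publish can only be reported after the incoming message has already been released.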

It looks like it's the keepalive that eventually times out the handler. This would explain https://github.com/mqttjs/MQTT.js/issues/838: keepalive likely wasn't implemented back then, so it just hung waiting for publish() to return.

 onKeepaliveTimeout :: calling _cleanUp with force true
 _cleanUp :: forced? true
 _cleanUp :: (my-client-id) :: destroying stream
 _cleanUp :: client not disconnecting/reconnecting. Clearing and resetting reconnect.
 _clearReconnect : clearing reconnect timer
 _setupReconnect :: emit `offline` state
 _setupReconnect :: set `reconnecting` to `true`
 _setupReconnect :: setting reconnectTimer for 1000 ms
 _destroyKeepaliveManager :: destroying keepalive manager

What I'd expect is that publishing a qos1 message from the handler works and returns to the topic handler, which can then continue its work.

To wrap up: it looks like qos1 ack handling isn't processed during handleMessage?
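To illustrate the suspected mechanism, here is a toy model. It is not MQTT.js code; it simply assumes that incoming packets are processed strictly one at a time and that the next packet is only pulled once done() has been called. `ToyClient` and all of its members are made up for this sketch.

```typescript
// Toy model (assumption): one inbound packet at a time, next one only after done().
type ToyPacket = { cmd: "publish" | "puback"; messageId?: number };

class ToyClient {
  private inbound: ToyPacket[] = [];
  private busy = false;
  private pendingAcks = new Map<number, () => void>();
  private nextId = 1;
  log: string[] = [];

  // Overridable hook with the same shape as handleMessage(packet, done).
  handleMessage: (packet: ToyPacket, done: () => void) => void = (_p, done) => done();

  // qos1-style publish: onAck fires only when the matching PUBACK is processed.
  publish(onAck: () => void): void {
    const id = this.nextId++;
    this.pendingAcks.set(id, onAck);
    // The toy broker acks immediately; the PUBACK lands in the inbound queue.
    this.receive({ cmd: "puback", messageId: id });
  }

  receive(packet: ToyPacket): void {
    this.inbound.push(packet);
    this.work();
  }

  get queued(): number {
    return this.inbound.length;
  }

  private work(): void {
    if (this.busy) return; // a handler has not called done() yet
    const packet = this.inbound.shift();
    if (!packet) return;
    this.busy = true;
    const next = () => {
      this.busy = false;
      this.work();
    };
    if (packet.cmd === "publish") {
      this.log.push("handle publish");
      this.handleMessage(packet, next);
    } else {
      this.log.push("handle puback");
      const cb = this.pendingAcks.get(packet.messageId!);
      this.pendingAcks.delete(packet.messageId!);
      cb?.();
      next();
    }
  }
}

// Case 1: done() is only called from the publish callback, as in the report.
const stuck = new ToyClient();
stuck.handleMessage = (_p, done) => {
  stuck.publish(() => {
    stuck.log.push("publish cb");
    done();
  });
};
stuck.receive({ cmd: "publish" });

// Case 2: done() is called before publishing.
const ok = new ToyClient();
ok.handleMessage = (_p, done) => {
  done();
  ok.publish(() => ok.log.push("publish cb"));
};
ok.receive({ cmd: "publish" });
```

In this model the first client never handles its PUBACK: the packet stays queued behind a handler that is itself waiting for that PUBACK. The second client handles it right away. If the real client behaves similarly, the handler would stay blocked until something external, such as the keepalive timeout, resets the connection.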

Minimal Reproduction

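  this.mqttClient.handleMessage = async ({ topic, payload }, done): Promise<void> => {
      // done() is only reached after the qos1 publish below has been acked
      await this.callback(topic, payload);
      done();
  };

  async callback(topic, payload) {
    // anotherTopic, message and opts (qos: 1) are defined elsewhere
    await new Promise<void>((resolve, reject) => {
      this.mqttClient.publish(anotherTopic, message, opts, (err) => {
        if (err) {
          reject(err);
        } else {
          resolve();
        }
      });
    });
  }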

Debug logs

 writable stream :: parsing buffer
 parser :: on packet push to packets array.
 work :: getting next packet in queue
 work :: packet pulled from queue
 _handlePacket :: emitting packetreceive
 handlePublish: packet Packet { cmd: 'publish', retain: false, qos: 0, dup: false, length: 388, topic: 'my-request-topic', payload: <Buffer ... 309 more bytes> }
 handlePublish: qos 0
 publish :: message {"data":{}} to topic my-response-topic
 publish :: qos 1
 MqttClient:publish: packet cmd: publish
 _sendPacket :: (my-client-id) :: start
 storeAndSend :: store packet with cmd publish to outgoingStore
 _removeTopicAliasAndRecoverTopicName :: alias NaN, topic 'my-response-topic'
 noop :: undefined
 _writePacket :: packet: { cmd: 'publish', topic: 'my-response-topic', payload: <Buffer 7b ... 156 more bytes>, qos: 1, retain: false, messageId: 22910, dup: false, properties: undefined }
 _writePacket :: emitting packetsend
 _writePacket :: writing to stream
 _writePacket :: writeToStream result true
 _writePacket :: invoking cb
 noop :: undefined
 _sendPacket :: (my-client-id) :: end
 _sendPing :: sending pingreq
 _sendPacket :: (my-client-id) :: start
 _writePacket :: packet: { cmd: 'pingreq' }
 _writePacket :: emitting packetsend
 _writePacket :: writing to stream
 _writePacket :: writeToStream result true
 _writePacket :: invoking cb
 noop :: undefined
 onKeepaliveTimeout :: calling _cleanUp with force true
 _cleanUp :: forced? true
 _cleanUp :: (my-client-id) :: destroying stream
 _cleanUp :: client not disconnecting/reconnecting. Clearing and resetting reconnect.
 _clearReconnect : clearing reconnect timer
 _setupReconnect :: emit offline state
 _setupReconnect :: set reconnecting to true
 _setupReconnect :: setting reconnectTimer for 1000 ms
 _destroyKeepaliveManager :: destroying keepalive manager
 (my-client-id)stream :: on close
 _flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function
 stream: emit close to MqttClient
 close :: connected set to false
 close :: clearing connackTimer
 close :: calling _setupReconnect
 _setupReconnect :: doing nothing...
 reconnectTimer :: reconnect triggered!
 _reconnect: emitting reconnect to client
 _reconnect: calling connect
 connect :: calling method to clear reconnect
 _clearReconnect : clearing reconnect timer
 connect :: using streamBuilder provided to client to create stream
 connect :: pipe stream to writable stream
 connect: sending packet connect
 _writePacket :: packet: { cmd: 'connect', protocolId: 'MQTT', protocolVersion: 5, clean: true, clientId: 'my-client-id', keepalive: 60, username: '...', password: '...', properties: { sessionExpiryInterval: 3600 } }
 _writePacket :: emitting packetsend
 _writePacket :: writing to stream
 _writePacket :: writeToStream result true
 writable stream :: parsing buffer
 parser :: on packet push to packets array.
 work :: getting next packet in queue
 work :: packet pulled from queue
 _handlePacket :: emitting packetreceive
 _handleConnack
 _setupKeepaliveManager :: keepalive 60 (seconds)
 KeepaliveManager: set keepalive to 60000ms
 _sendPacket :: (my-client-id) :: start
 _writePacket :: packet: { cmd: 'publish', topic: 'my-response-topic', payload: <Buffer 7b ... more bytes>, qos: 1, retain: false, messageId: 22910, dup: false, properties: undefined }
 _writePacket :: emitting packetsend
 _writePacket :: writing to stream
 _writePacket :: writeToStream result true
 _writePacket :: invoking cb
 noop :: undefined
 writable stream :: parsing buffer
 parser :: on packet push to packets array.
 work :: getting next packet in queue
 work :: packet pulled from queue
 _handlePacket :: emitting packetreceive
 _reschedulePing :: rescheduling ping
 _handleAck :: packet type puback
 RETURNING null
 connect :: sending queued packets
 deliver :: entry undefined
 _resubscribe
 _resubscribe: protocolVersion 5
 subscribe: ....

leppaott, Sep 17 '25 10:09

@leppaott Would you like to submit a PR to fix this?

robertsLando, Sep 17 '25 12:09

@robertsLando there's an attempt at a fix: it passes the test added for the qos1 case (not qos2) but breaks one order-after-reconnection test, and there might be some other issues with it.

leppaott, Sep 18 '25 12:09