
Manual Acknowledgment not working after reconnect

PMacho opened this issue 4 years ago · 12 comments

Expected behavior

After a client is reconnected by means of context.getReconnector().reconnect(true), an already established subscription should continue working as it did before reconnecting the client.

Actual behavior

After reconnecting a client, the subscribePublishes() flow no longer sends PubAcks when Mqtt3Publish.acknowledge() is invoked, even though the call returns without an exception.

To Reproduce

Steps

The effect can be seen in the example and the TCP dump below.

In the logs we see that message 1 is successfully received and acknowledged, which is consistent with the TCP dump. Message 2 at first causes an error and thus makes the client reconnect. After reconnection, message 2 is redelivered and, according to the logs, acknowledged. However, the TCP dump shows that the expected Publish Ack (id=101) is never actually sent. Hence, no further messages are processed and the test runs indefinitely.

Reproducer code

@Slf4j
@ExtendWith(LoggingExtension.class)
@ExtendWith(StaticHiveMQTestContainerExtension.class)
public class HiveMQClientTests {

    private MqttTestPublisher mqttTestPublisher;
    private MqttTestSubscriber mqttTestSubscriber;

    private static int testBrokerPort;

    @BeforeAll
    static void init() {
        testBrokerPort = Integer.parseInt(System.getProperty("test.broker.port"));
    }

    @BeforeEach
    void setup() {
        mqttTestPublisher = new MqttTestPublisher(mqtt3ReactorClient("test-publisher"));
        mqttTestSubscriber = new MqttTestSubscriber(mqtt3ReactorClient("test-subscriber"));
    }

    @Test
    void reconnectTest() throws InterruptedException {

        final String topic = "test-topic";
        final MqttQos qos = MqttQos.AT_LEAST_ONCE;

        Queue<String> receivedMessages = new ConcurrentLinkedQueue<>();
        CountDownLatch countDownLatch = new CountDownLatch(4);

        mqttTestSubscriber
                .mqttSubscribe(
                        topic,
                        qos,
                        payload -> {
                            final boolean alreadySeen = receivedMessages.contains(payload);
                            receivedMessages.offer(payload);

                            log.info("received message: " + payload + "; already seen: " + alreadySeen);

                            // fail the first delivery of every even payload to force a reconnect
                            if (!alreadySeen && Integer.parseInt(payload) % 2 == 0) {
                                return Mono.error(new Exception("Exception in handler ... "));
                            } else {
                                return Mono.just(payload);
                            }
                        }
                )
                .subscribe(next -> countDownLatch.countDown());

        // publish messages 1 and 2; the handler fails on message 2 and triggers a reconnect
        Flux
                .range(1, 2)
                .delayElements(Duration.ofMillis(100))
                .flatMap(integer -> mqttTestPublisher.publishMono(topic, Mono.just(integer), qos))
                .blockLast();

        // publish message 3 after the reconnect; it is never processed because the flow is stuck
        Mono
                .delay(Duration.ofSeconds(2))
                .flatMapMany(l -> mqttTestPublisher.publishMono(topic, Mono.just(3), qos))
                .blockLast();

        countDownLatch.await();
    }

    static class MqttTestSubscriber {
        final Mqtt3ReactorClient client;

        MqttTestSubscriber(Mqtt3ReactorClient client) {
            this.client = client;
        }

        public Flux<MqttMessage<String>> mqttSubscribe(final String topic, final MqttQos qos, Function<String, Mono<String>> messageHandler) {
            EmitterProcessor<Mqtt3Publish> processor = EmitterProcessor.create(false);
            client
                    .subscribePublishes(
                            Mqtt3Subscribe
                                    .builder()
                                    .addSubscription(Mqtt3Subscription.builder().topicFilter(topic).qos(qos).build())
                                    .build(),
                            true
                    )
                    .doOnSingle(subAck -> log.info("MqttTestSubscriber received subAck: " + subAck.toString() + " for topic " + topic))
                    .subscribe(processor);

            return processor
                    .asFlux()
                    .map(mqtt3Publish -> new MqttMessage<>(
                            String.valueOf(StandardCharsets.UTF_8.decode(mqtt3Publish
                                    .getPayload()
                                    .orElseThrow(() -> new NoSuchElementException("Received empty MQTT message: " + mqtt3Publish.toString()))
                            )),
                            mqtt3Publish
                    ))
                    .flatMap(message -> messageHandler.apply(message.payload).thenReturn(message))
                    .doOnNext(message -> {
                        log.info("Acknowledging message: " + message.toString());
                        message.mqtt3Publish.acknowledge();
                    })
                    // on a handler error: disconnect so that the broker redelivers the unacknowledged message
                    .retryWhen(Retry.withThrowable(throwableFlux -> throwableFlux.doOnNext(throwable -> {
                        log.error("Received error in mqtt subscribe flow: ", throwable);
                        try {
                            client.disconnect().block();
                        } catch (MqttClientStateException mqttClientStateException) {
                            log.info("Caught exception disconnecting: " + mqttClientStateException.getLocalizedMessage());
                        }
                    })));
        }
    }

    @Value
    static class MqttMessage<T> {
        T payload;
        Mqtt3Publish mqtt3Publish;
    }

    static class MqttTestPublisher {
        final Mqtt3ReactorClient client;

        MqttTestPublisher(Mqtt3ReactorClient client) {
            this.client = client;
        }

        public <T, S extends Mono<T>> Flux<Mqtt3PublishResult> publishMono(String topic, S payload, MqttQos qos) {
            return payload
                    .map(message -> Mqtt3Publish
                            .builder()
                            .topic(topic)
                            .qos(qos)
                            .payload(message.toString().getBytes(StandardCharsets.UTF_8))
                            .build()
                    )
                    .as(client::publish);
        }
    }

    private Mqtt3ReactorClient mqtt3ReactorClient(final String clientId) {
        final Mqtt3ReactorClient client = Mqtt3ReactorClient.from(Mqtt3Client
                        .builder()
                        .identifier(clientId)
                        .serverHost("localhost")
                        .serverPort(testBrokerPort)
                        .addConnectedListener(context -> log.info("Connecting client " + clientId))
                        .addDisconnectedListener(context -> {
                            log.info("Disconnected client " + clientId + " due to: " + context.getCause());
                            context.getReconnector().resubscribeIfSessionExpired(true);
                            // automatically reconnect after a 1 second delay
                            context.getReconnector().reconnect(true).delay(1, TimeUnit.SECONDS);
                        })
                        .buildRx()
        );

        connect(client, clientId);
        return client;
    }

    private void connect(Mqtt3ReactorClient client, String clientId) {
        client
                .connect(Mqtt3Connect.builder().cleanSession(false).build())
                .doOnNext(mqtt3ConnAck -> log.info("Successfully connected client: " + clientId))
                .block();
    }
}
@Slf4j
public class StaticHiveMQTestContainerExtension extends HiveMQTestContainerCore<HiveMQTestContainerExtension> implements BeforeAllCallback, AfterAllCallback {

	public StaticHiveMQTestContainerExtension() {
//		super("hivemq/hivemq-ce", "2020.6");
		super("hivemq/hivemq4", "latest");
		withControlCenter(9000);
		withLogLevel(Level.TRACE);
	}

	@Override
	public void beforeAll(ExtensionContext context) throws Exception {
		start();

		String ipAddress = this.getContainerIpAddress();
		int port = this.getMqttPort();
		log.info("Found ip address " + ipAddress + " and port " + port + " for HiveMQ TestContainer instance.");
		System.setProperty("test.broker.host", ipAddress);
		System.setProperty("test.broker.port", String.valueOf(port));
	}

	@Override
	public void afterAll(ExtensionContext context) throws Exception {
		stop();
	}
}
@Slf4j
public class LoggingExtension implements BeforeEachCallback, AfterEachCallback {

	@Override
	public void beforeEach(ExtensionContext context) throws Exception {
		log.info("");
		log.info("-------------------------------------------------------------------------");
		log.info("Start running test " + context.getDisplayName());
		log.info("-------------------------------------------------------------------------");
	}

	@Override
	public void afterEach(ExtensionContext context) throws Exception {
		log.info("-------------------------------------------------------------------------");
		log.info("Done running test " + context.getDisplayName());
		log.info("-------------------------------------------------------------------------");
		log.info("");
	}
}

TCP dump:

410935 614442.792532 127.0.0.1 127.0.0.1 TCP 68 65399 → 32886 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=608334076 TSecr=0 SACK_PERM=1
410936 614442.792621 127.0.0.1 127.0.0.1 TCP 68 32886 → 65399 [SYN, ACK] Seq=0 Ack=1 Win=65535 Len=0 MSS=16344 WS=64 TSval=608334076 TSecr=608334076 SACK_PERM=1
410937 614442.792629 127.0.0.1 127.0.0.1 TCP 56 65399 → 32886 [ACK] Seq=1 Ack=1 Win=408256 Len=0 TSval=608334076 TSecr=608334076
410938 614442.792637 127.0.0.1 127.0.0.1 TCP 56 [TCP Window Update] 32886 → 65399 [ACK] Seq=1 Ack=1 Win=408256 Len=0 TSval=608334076 TSecr=608334076
410947 614442.806948 127.0.0.1 127.0.0.1 MQTT 84 Connect Command
410948 614442.806970 127.0.0.1 127.0.0.1 TCP 56 32886 → 65399 [ACK] Seq=1 Ack=29 Win=408256 Len=0 TSval=608334089 TSecr=608334089
410989 614443.045500 127.0.0.1 127.0.0.1 MQTT 60 Connect Ack
410990 614443.045530 127.0.0.1 127.0.0.1 TCP 56 65399 → 32886 [ACK] Seq=29 Ack=5 Win=408256 Len=0 TSval=608334319 TSecr=608334319
411009 614443.074912 127.0.0.1 127.0.0.1 TCP 68 65400 → 32886 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=608334345 TSecr=0 SACK_PERM=1
411010 614443.075010 127.0.0.1 127.0.0.1 TCP 68 32886 → 65400 [SYN, ACK] Seq=0 Ack=1 Win=65535 Len=0 MSS=16344 WS=64 TSval=608334345 TSecr=608334345 SACK_PERM=1
411011 614443.075019 127.0.0.1 127.0.0.1 TCP 56 65400 → 32886 [ACK] Seq=1 Ack=1 Win=408256 Len=0 TSval=608334345 TSecr=608334345
411012 614443.075027 127.0.0.1 127.0.0.1 TCP 56 [TCP Window Update] 32886 → 65400 [ACK] Seq=1 Ack=1 Win=408256 Len=0 TSval=608334345 TSecr=608334345
411013 614443.075566 127.0.0.1 127.0.0.1 MQTT 85 Connect Command
411014 614443.075584 127.0.0.1 127.0.0.1 TCP 56 32886 → 65400 [ACK] Seq=1 Ack=30 Win=408256 Len=0 TSval=608334345 TSecr=608334345
411029 614443.117621 127.0.0.1 127.0.0.1 MQTT 60 Connect Ack
411031 614443.117649 127.0.0.1 127.0.0.1 TCP 56 65400 → 32886 [ACK] Seq=30 Ack=5 Win=408256 Len=0 TSval=608334383 TSecr=608334383
411053 614443.241960 127.0.0.1 127.0.0.1 MQTT 73 Subscribe Request (id=65526) [test-topic]
411054 614443.241988 127.0.0.1 127.0.0.1 TCP 56 32886 → 65400 [ACK] Seq=5 Ack=47 Win=408192 Len=0 TSval=608334505 TSecr=608334505
411065 614443.271579 127.0.0.1 127.0.0.1 MQTT 61 Subscribe Ack (id=65526)
411066 614443.271595 127.0.0.1 127.0.0.1 TCP 56 65400 → 32886 [ACK] Seq=47 Ack=10 Win=408256 Len=0 TSval=608334531 TSecr=608334531
411077 614443.379853 127.0.0.1 127.0.0.1 MQTT 73 Publish Message (id=1) [test-topic]
411078 614443.379873 127.0.0.1 127.0.0.1 TCP 56 32886 → 65399 [ACK] Seq=5 Ack=46 Win=408192 Len=0 TSval=608334636 TSecr=608334636
411095 614443.426276 127.0.0.1 127.0.0.1 MQTT 60 Publish Ack (id=1)
411096 614443.426299 127.0.0.1 127.0.0.1 TCP 56 65399 → 32886 [ACK] Seq=46 Ack=9 Win=408256 Len=0 TSval=608334678 TSecr=608334678
411101 614443.436185 127.0.0.1 127.0.0.1 MQTT 73 Publish Message (id=51) [test-topic]
411102 614443.436208 127.0.0.1 127.0.0.1 TCP 56 65400 → 32886 [ACK] Seq=47 Ack=27 Win=408256 Len=0 TSval=608334688 TSecr=608334688
411111 614443.441897 127.0.0.1 127.0.0.1 MQTT 60 Publish Ack (id=51)
411112 614443.441912 127.0.0.1 127.0.0.1 TCP 56 32886 → 65400 [ACK] Seq=27 Ack=51 Win=408192 Len=0 TSval=608334692 TSecr=608334692
411129 614443.475829 127.0.0.1 127.0.0.1 MQTT 73 Publish Message (id=1) [test-topic]
411130 614443.475852 127.0.0.1 127.0.0.1 TCP 56 32886 → 65399 [ACK] Seq=9 Ack=63 Win=408192 Len=0 TSval=608334723 TSecr=608334723
411139 614443.499924 127.0.0.1 127.0.0.1 MQTT 60 Publish Ack (id=1)
411140 614443.499948 127.0.0.1 127.0.0.1 TCP 56 65399 → 32886 [ACK] Seq=63 Ack=13 Win=408256 Len=0 TSval=608334745 TSecr=608334745
411151 614443.504268 127.0.0.1 127.0.0.1 MQTT 73 Publish Message (id=101) [test-topic]
411152 614443.504284 127.0.0.1 127.0.0.1 TCP 56 65400 → 32886 [ACK] Seq=51 Ack=44 Win=408256 Len=0 TSval=608334750 TSecr=608334750
411173 614443.525691 127.0.0.1 127.0.0.1 MQTT 58 Disconnect Req
411174 614443.525729 127.0.0.1 127.0.0.1 TCP 56 32886 → 65400 [ACK] Seq=44 Ack=53 Win=408192 Len=0 TSval=608334770 TSecr=608334770
411175 614443.526461 127.0.0.1 127.0.0.1 TCP 56 65400 → 32886 [FIN, ACK] Seq=53 Ack=44 Win=408256 Len=0 TSval=608334770 TSecr=608334770
411176 614443.526492 127.0.0.1 127.0.0.1 TCP 56 32886 → 65400 [ACK] Seq=44 Ack=54 Win=408192 Len=0 TSval=608334770 TSecr=608334770
411181 614443.531082 127.0.0.1 127.0.0.1 TCP 56 32886 → 65400 [FIN, ACK] Seq=44 Ack=54 Win=408192 Len=0 TSval=608334775 TSecr=608334770
411182 614443.531121 127.0.0.1 127.0.0.1 TCP 56 65400 → 32886 [ACK] Seq=54 Ack=45 Win=408256 Len=0 TSval=608334775 TSecr=608334775
411211 614444.540155 127.0.0.1 127.0.0.1 TCP 68 65401 → 32886 [SYN] Seq=0 Win=65535 Len=0 MSS=16344 WS=64 TSval=608335772 TSecr=0 SACK_PERM=1
411212 614444.540258 127.0.0.1 127.0.0.1 TCP 68 32886 → 65401 [SYN, ACK] Seq=0 Ack=1 Win=65535 Len=0 MSS=16344 WS=64 TSval=608335772 TSecr=608335772 SACK_PERM=1
411213 614444.540272 127.0.0.1 127.0.0.1 TCP 56 65401 → 32886 [ACK] Seq=1 Ack=1 Win=408256 Len=0 TSval=608335772 TSecr=608335772
411214 614444.540282 127.0.0.1 127.0.0.1 TCP 56 [TCP Window Update] 32886 → 65401 [ACK] Seq=1 Ack=1 Win=408256 Len=0 TSval=608335772 TSecr=608335772
411215 614444.540515 127.0.0.1 127.0.0.1 MQTT 85 Connect Command
411216 614444.540535 127.0.0.1 127.0.0.1 TCP 56 32886 → 65401 [ACK] Seq=1 Ack=30 Win=408256 Len=0 TSval=608335773 TSecr=608335773
411229 614444.572533 127.0.0.1 127.0.0.1 MQTT 60 Connect Ack
411230 614444.572547 127.0.0.1 127.0.0.1 TCP 56 65401 → 32886 [ACK] Seq=30 Ack=5 Win=408256 Len=0 TSval=608335803 TSecr=608335803
411245 614444.581092 127.0.0.1 127.0.0.1 MQTT 73 Publish Message (id=101) [test-topic]
411246 614444.581107 127.0.0.1 127.0.0.1 TCP 56 65401 → 32886 [ACK] Seq=30 Ack=22 Win=408256 Len=0 TSval=608335810 TSecr=608335810
411275 614445.503719 127.0.0.1 127.0.0.1 MQTT 73 Publish Message (id=1) [test-topic]
411276 614445.503746 127.0.0.1 127.0.0.1 TCP 56 32886 → 65399 [ACK] Seq=13 Ack=80 Win=408192 Len=0 TSval=608336718 TSecr=608336718
411285 614445.518508 127.0.0.1 127.0.0.1 MQTT 60 Publish Ack (id=1)
411286 614445.518526 127.0.0.1 127.0.0.1 TCP 56 65399 → 32886 [ACK] Seq=80 Ack=17 Win=408256 Len=0 TSval=608336732 TSecr=608336732
411390 614448.523789 127.0.0.1 127.0.0.1 TCP 56 65401 → 32886 [FIN, ACK] Seq=30 Ack=22 Win=408256 Len=0 TSval=608339694 TSecr=608335810
411391 614448.523802 127.0.0.1 127.0.0.1 TCP 56 65399 → 32886 [FIN, ACK] Seq=80 Ack=17 Win=408256 Len=0 TSval=608339694 TSecr=608336732
411392 614448.523826 127.0.0.1 127.0.0.1 TCP 56 32886 → 65401 [ACK] Seq=22 Ack=31 Win=408256 Len=0 TSval=608339694 TSecr=608339694
411393 614448.523834 127.0.0.1 127.0.0.1 TCP 56 32886 → 65399 [ACK] Seq=17 Ack=81 Win=408192 Len=0 TSval=608339694 TSecr=608339694
411400 614448.528167 127.0.0.1 127.0.0.1 TCP 56 32886 → 65401 [FIN, ACK] Seq=22 Ack=31 Win=408256 Len=0 TSval=608339697 TSecr=608339694
411401 614448.528197 127.0.0.1 127.0.0.1 TCP 56 65401 → 32886 [ACK] Seq=31 Ack=23 Win=408256 Len=0 TSval=608339697 TSecr=608339697
411402 614448.528948 127.0.0.1 127.0.0.1 TCP 56 32886 → 65399 [FIN, ACK] Seq=17 Ack=81 Win=408192 Len=0 TSval=608339698 TSecr=608339694
411403 614448.528979 127.0.0.1 127.0.0.1 TCP 56 65399 → 32886 [ACK] Seq=81 Ack=18 Win=408256 Len=0 TSval=608339698 TSecr=608339698

Details

  • Affected HiveMQ MQTT Client version(s): 1.2.1
  • Used JVM version: 1.8.0_232
  • Used OS (name and version): macOS 11.2.3
  • Used MQTT version: 3
  • Used MQTT broker (name and version): HiveMQ 4 (latest)

PMacho avatar Apr 07 '21 20:04 PMacho

Hi @PMacho Thank you for the very detailed information. If I read your code correctly, you never acknowledge the first message with payload "2": you return a Mono.error, so the doOnNext that calls publish.acknowledge() is never reached. When you set manualAcknowledgement to true, it is required to acknowledge every publish. It is a bug in the application if it does not ensure that acknowledge is called on every message, even when processing of the message fails.
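
For illustration, a minimal sketch of a flow that guarantees acknowledge() is called on every publish, even when processing throws (the process method is a hypothetical placeholder, not part of the library):

import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3Subscribe;
import com.hivemq.client.mqtt.mqtt3.reactor.Mqtt3ReactorClient;

class AlwaysAcknowledge {

    // consume publishes with manual acknowledgement, guaranteeing an ack on every path
    static void consume(Mqtt3ReactorClient client, Mqtt3Subscribe subscribe) {
        client.subscribePublishes(subscribe, true) // true = manual acknowledgement
                .doOnNext(publish -> {
                    try {
                        process(publish); // hypothetical application processing
                    } catch (Exception e) {
                        // handle the failure here (log, retry, dead-letter, ...)
                        // instead of leaving the message unacknowledged forever
                    } finally {
                        publish.acknowledge(); // every received publish gets acknowledged
                    }
                })
                .subscribe();
    }

    static void process(Mqtt3Publish publish) { /* hypothetical */ }
}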

SgtSilvio avatar Apr 08 '21 10:04 SgtSilvio

To give a little more explanation: if you used QoS 2, the client would filter out the duplicate message, and you would still have to acknowledge the message you got in the first place. Reconnecting the client and waiting for a duplicate message is not a good solution to force reprocessing.

I hope this makes sense to you. Feel free to comment below.

SgtSilvio avatar Apr 08 '21 10:04 SgtSilvio

@SgtSilvio It is correct that the first message is never acknowledged. This is not a bug but intentional, since there was an error in the processing. Acknowledging the message would mean losing that message. We use QoS 1 to ensure we do not lose a single message.

I guess the requirement to acknowledge every publish is a client implementation detail? It cannot be enforced by the broker, since the client application may crash before it can even acknowledge the publish. Such a requirement would then create a dead-end situation where no publish to this client is possible anymore. Plus, if we are forced to acknowledge every message, whether or not we are able to process it, MQTT QoS 1 does not add any value over TCP.

Reconnecting the client and waiting for a duplicate message is not a good solution to force reprocessing.

What do you mean by that? As far as I know, this is the usual way for an MQTT client to function: in case of an error, throw an exception, do not acknowledge, reconnect (after a delay), and wait for the retransmit. What would be the alternative?

I could probably work around the requirement by rebuilding the client on each error. However, this still amounts to a reconnect. Since we use the same client for many subscriptions, this causes a whole lot of administration to get a clean reconnect scenario. We actually moved away from another client to this one precisely to get rid of that kind of administrative scaffolding around the client.

Is there a possibility to "clean" the client on reconnect, so that it would not require me to acknowledge a message that could not be processed?

PMacho avatar Apr 08 '21 12:04 PMacho

Sorry for jumping in but I've been working on other clients (Golang) and I'm trying to figure some things out myself.

In regards to:

I guess, the requirement to acknowledge every publish is a client implementation detail?

I think what the MQTT (v5 at least) specs say is that any topic is an Ordered Topic and that acknowledgments have to be sent in the order in which the PUBLISH packets were received by the client.

So if the client receives 1,2,3,4, it should send the acks in the same order 1,2,3,4. You can't just ack 3 and disconnect, unless the broker does not care about receiving the acks in order (against the MQTT specs); that seems to be the case with VerneMQ and EMQX, at least until they decide to abide by the rules. If that were the case, then upon reconnection the broker could deal with the gap and send you 1,2,4. That does not seem to be what the specs describe, though.

On the client side then, if 3 is acked, no packets should really be sent to the broker but simply buffered in memory until 1 and 2 are also acked. Then, on disconnection, the internal buffer with the "pending acks" should be cleared and the client should receive all packets that were not acknowledged (thus also the pending ones that were cleared). I hope it makes sense. Again, sorry for jumping in; I have no experience whatsoever with this client, but I thought it'd be good to share some thoughts 🙂

Is there a possibility to "clean" the client on reconnect? So it would not require me to acknowledge a message, that could not be processed?

If my assumptions are correct, even after the "cleaning", once you reconnect you should get that message again. If you couldn't process it due to some infrastructure-related issue (e.g. the target database being down), that's actually good, because you can try again and the messages are in the order you would expect them to be. If you can't process the message due to a malformed payload, just log it (perhaps DLQ?) and ack, I guess, which also decreases pressure on the broker.
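
A small sketch of that triage in Java (isMalformed, sendToDeadLetterQueue, and store are hypothetical helpers), distinguishing an unprocessable payload from a recoverable infrastructure failure:

import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;

class ErrorTriage {

    // malformed payload: unprocessable forever -> log/DLQ, then ack to move on;
    // infrastructure failure: recoverable -> leave the message unacknowledged and retry
    static void onMessage(Mqtt3Publish publish) {
        byte[] payload = publish.getPayloadAsBytes();
        if (isMalformed(payload)) {         // hypothetical validation
            sendToDeadLetterQueue(payload); // hypothetical DLQ hook
            publish.acknowledge();          // ack so the broker does not redeliver garbage
            return;
        }
        store(payload);                     // hypothetical infra call; if it throws, the
        publish.acknowledge();              // message stays unacknowledged and can be retried
    }

    static boolean isMalformed(byte[] p) { return false; } // hypothetical
    static void sendToDeadLetterQueue(byte[] p) {}         // hypothetical
    static void store(byte[] p) {}                         // hypothetical
}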

4.6 Message ordering

The following rules apply to the Client when implementing the protocol flows defined in section 4.3.

  • When the Client re-sends any PUBLISH packets, it MUST re-send them in the order in which the original PUBLISH packets were sent (this applies to QoS 1 and QoS 2 messages) [MQTT-4.6.0-1]

  • The Client MUST send PUBACK packets in the order in which the corresponding PUBLISH packets were received (QoS 1 messages) [MQTT-4.6.0-2]
  • The Client MUST send PUBREC packets in the order in which the corresponding PUBLISH packets were received (QoS 2 messages) [MQTT-4.6.0-3]
  • The Client MUST send PUBREL packets in the order in which the corresponding PUBREC packets were received (QoS 2 messages) [MQTT-4.6.0-4]

An Ordered Topic is a Topic where the Client can be certain that the Application Messages in that Topic from the same Client and at the same QoS are received in the order they were published. When a Server processes a message that has been published to an Ordered Topic, it MUST send PUBLISH packets to consumers (for the same Topic and QoS) in the order that they were received from any given Client [MQTT-4.6.0-5]. This is in addition to the rules listed above.

By default, a Server MUST treat every Topic as an Ordered Topic when it is forwarding messages on Non‑shared Subscriptions. [MQTT-4.6.0-6]. A Server MAY provide an administrative or other mechanism to allow one or more Topics to not be treated as an Ordered Topic.

Non-normative comment

The rules listed above ensure that when a stream of messages is published and subscribed to an Ordered Topic with QoS 1, the final copy of each message received by the subscribers will be in the order that they were published. If the message is re-sent the duplicate message can be received after one of the earlier messages is received. For example, a publisher might send messages in the order 1,2,3,4 but the subscriber might receive them in the order 1,2,3,2,3,4 if there is a network disconnection after message 3 has been sent.

If both Client and Server set Receive Maximum to 1, they make sure that no more than one message is “in-flight” at any one time. In this case no QoS 1 message will be received after any later one even on re-connection. For example a subscriber might receive them in the order 1,2,3,3,4 but not 1,2,3,2,3,4. Refer to section 4.9 Flow Control for details of how the Receive Maximum is used.

https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901240

fracasula avatar Apr 22 '21 09:04 fracasula

@fracasula

On the client side then, if 3 is acked, no packets should really be sent to the broker but simply buffered in memory until 1 and 2 are also acked. Then, on disconnection, the internal buffer with the "pending acks" should be cleared and the client should receive all packets that were not acknowledged (thus also the pending ones that were cleared).

This is what I had in mind when I asked for the possibility to manually "clean" the client, since, in the actual implementation of this client, there seems to be no such cleaning. In case of an exception during message processing, the client waits indefinitely for that message to be acknowledged, even after reconnecting. That means, if any message can't be processed, there is no recovery scenario other than rebuilding the client and all its connections and subscriptions. A simple client.clean() would be easier. A default "clean on disconnect" (invoked only when manually disconnecting the client) would be even more elegant, unless there are other implications that I haven't thought of. To my understanding, such a function in no way contradicts the spec.

If my assumptions are correct, even after the "cleaning", once you reconnect you should get that message again. If you couldn't process it due to some infrastructure related issues (e.g. target database down) then it's good because you can try again and the messages are also in the order you would expect them to be. If you can't process the message due to a malformed payload just log it (perhaps DLQ?) and ack I guess, which also decreases pressure on the broker.

Yeah, we have a wrapper around our MQTT subscriptions, that does exactly what you describe.

PMacho avatar Apr 22 '21 13:04 PMacho

Sorry for the delay. I will still answer here with additional explanations in the next few days; unfortunately I did not find the time in the last few weeks.

SgtSilvio avatar May 21 '21 16:05 SgtSilvio

Hi @PMacho

There is still a lot of confusion here in distinguishing a processing error, a connection error, and an application crash. Let me first define the terms so that the answers below make sense:

  • An application crash means that the application goes down, including the (in-memory) MQTT client state.
  • A connection error means that the connection between MQTT broker and MQTT client is intermittently broken and is reestablished via a reconnect. The difference to an application crash is that the application is still running and does not need to worry about the connection error - it might only get message duplicates after a reconnect when using QoS 1.
  • A processing error is an error on the application side that happens after a message is delivered to the application. A processing error could be a malformed payload that cannot be parsed, or an error from an external system (for example a database) that is called during message processing. The application must decide whether the processing error is recoverable. If yes, the application might perform some sort of retries. If not, the application might trigger an alert, simply ignore the broken message, and move on.

Reconnecting the client and waiting for a duplicate message is not a good solution to force reprocessing.

What do you mean by that? As far as I know, this is the usual way for an MQTT client to function: in case of an error, throw an exception, do not acknowledge, reconnect (after a delay), and wait for the retransmit. What would be the alternative?

The answer to a processing error should not be to force a connection error. If the application can determine that a recoverable processing error occurred, it can also take a different action than disconnecting and reconnecting the MQTT client, for example retrying the processing. There is simply no need to tear down the MQTT client to "retry" the whole delivery from broker to client to application. What you describe would actually be similar to an application crash: ignoring the fact that the application is still running and behaving as if it had crashed and restarted.

For example, if an application receives a message M1 and wants to write it to a database that is currently not available, the application can retry the connection to the database and acknowledge M1 only after the write was successful. As long as the processing is unfinished or being retried, you simply defer the acknowledgement. If a connection error additionally happens during the retry of M1 to the database, the MQTT client will reconnect in the background, and a duplicate message M1* (QoS 1) might be delivered to the application as if it were a new message. As the application is still running, message M1 continues to be processed normally, as does M1* afterwards. What MQTT provides with its QoS levels is that an application does not need to worry about connection errors - as if connection errors did not exist. If the application actually crashes during the retry to the database, it can be restarted and gets the message redelivered because it was not acknowledged.
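
As a rough sketch of that pattern with Reactor (writeToDatabase is a hypothetical placeholder): the processing is retried with backoff, and the acknowledgement is deferred until it finally succeeds:

import java.time.Duration;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

class RetryBeforeAck {

    // retry the processing (not the connection) and defer the ack until it succeeds
    static Mono<Void> handle(Mqtt3Publish publish) {
        return writeToDatabase(publish.getPayloadAsBytes()) // hypothetical call that may fail
                .retryWhen(Retry.backoff(10, Duration.ofSeconds(1))) // retry the processing in isolation
                .doOnSuccess(v -> publish.acknowledge()); // acknowledge only after successful processing
    }

    static Mono<Void> writeToDatabase(byte[] payload) { return Mono.empty(); } // hypothetical
}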

So in short: no, in case of a processing error it is not the "usual way" to reconnect and wait for a retransmit. In case of an application crash it is.

Reconnecting and resending also has some efficiency disadvantages if "only" the processing failed and could have been retried in isolation.

It is correct that the first message is never acknowledged. This is not a bug but intentional, since there was an error in the processing. Acknowledging the message would mean losing that message.

You are actually mixing up processing errors and connection errors here. If you retry the processing before acknowledging the message, you do not lose any messages, as described above.

I guess the requirement to acknowledge every publish is a client implementation detail? It cannot be enforced by the broker, since the client application may crash before it can even acknowledge the publish.

You are mixing up calling acknowledge in a running application with an application crash here.

Plus, if we are forced to acknowledge every message, whether or not we are able to process it, MQTT QoS 1 does not add any value over TCP.

You are not forced to acknowledge a message if processing fails. You can simply retry the processing for as long as you like before you acknowledge.

I hope what I wrote above is educational.

SgtSilvio avatar Jun 13 '21 23:06 SgtSilvio

One addition about QoS 2

Example:

  1. A QoS 2 message M1 is delivered to the client; the message is forwarded to the application but not yet acknowledged.
  2. If a connection error occurs, a reconnect happens.
  3. A duplicate message M1* is delivered to the client after the reconnect, but it is recognized as a duplicate and is therefore discarded and not forwarded to the application.

If processing of the message now fails in step 1 and we react by disconnecting and reconnecting the client (same as step 2) without retrying the processing of that message, we will never process that message: as in step 3, the duplicate is discarded because the original was already forwarded to the application - we simply "dropped" it ourselves.

SgtSilvio avatar Jun 13 '21 23:06 SgtSilvio

If you really want to proceed with what you initially described, there is actually a way to "solve" it.

  1. Processing error => disconnect client
  2. After the disconnect is complete, call acknowledge() on the message. This will not lead to a PUBACK being sent to the broker, because the old connection is closed, and it will also not be sent on the new connection after the reconnect, because this is not allowed by the MQTT specification.
  3. The message is redelivered as a new message which is again only acknowledged when you call acknowledge() on the new message.

Of course this only works with QoS 1.
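
A minimal sketch of steps 1 and 2 with the Reactor API, assuming the publish that failed processing is still at hand:

import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.mqtt.mqtt3.reactor.Mqtt3ReactorClient;

class DisconnectThenAck {

    // step 1: disconnect on the processing error; step 2: acknowledge only afterwards,
    // so no PUBACK goes out and the broker redelivers the message (QoS 1 only)
    static void forceRedelivery(Mqtt3ReactorClient client, Mqtt3Publish publish) {
        client.disconnect()
                .doOnTerminate(publish::acknowledge) // frees the client state without sending a PUBACK
                .subscribe();
    }
}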

SgtSilvio avatar Jun 13 '21 23:06 SgtSilvio

Hi @fracasula No problem for jumping in here.

I think what the MQTT (v5 at least) specs say is that any topic is an Ordered Topic and that acknowledgments have to be sent in the order in which the PUBLISH packets were received by the client.

Just to remove potential confusion: this is only partially relevant for this topic. Manual acknowledgement on the application side, via calling acknowledge() in this client library, is decoupled from the MQTT acknowledgements. So the user is not forced to call acknowledge in the order that the MQTT specification defines; the client library automatically ensures that the order of the acknowledgements complies with the MQTT specification. The messages 1,2,3,4 can, for example, be acknowledged in the order 3,1,4,2, but the PUBACKs are sent to the broker in the order 1,2,3,4.
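
To illustrate what that decoupling permits, a small sketch (process is a hypothetical asynchronous step): publishes are processed concurrently, so the acknowledge() calls may complete out of order, while the PUBACK order on the wire stays correct:

import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3Subscribe;
import com.hivemq.client.mqtt.mqtt3.reactor.Mqtt3ReactorClient;
import reactor.core.publisher.Mono;

class OutOfOrderAck {

    // process publishes concurrently: acknowledge() may be called e.g. as 3,1,4,2,
    // yet the library still sends the PUBACKs to the broker as 1,2,3,4
    static void consume(Mqtt3ReactorClient client, Mqtt3Subscribe subscribe) {
        client.subscribePublishes(subscribe, true)
                .flatMap(publish -> process(publish).doOnSuccess(v -> publish.acknowledge()))
                .subscribe();
    }

    static Mono<Void> process(Mqtt3Publish publish) { return Mono.empty(); } // hypothetical async processing
}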

SgtSilvio avatar Jun 14 '21 00:06 SgtSilvio

Just to remove potential confusion: this is only partially relevant for this topic. Manual acknowledgement on the application side, via calling acknowledge() in this client library, is decoupled from the MQTT acknowledgements. So the user is not forced to call acknowledge in the order that the MQTT specification defines; the client library automatically ensures that the order of the acknowledgements complies with the MQTT specification. The messages 1,2,3,4 can, for example, be acknowledged in the order 3,1,4,2, but the PUBACKs are sent to the broker in the order 1,2,3,4.

@SgtSilvio that's what I thought, yeah. Thanks for confirming! 👍

fracasula avatar Jun 15 '21 10:06 fracasula

I hope what I wrote above is educational.

Actually no, sorry. I am well aware of the different kinds of errors. However, what you suggest is a form of tight coupling between the transport layer and the application layer, in the sense that the transport layer dictates to the application layer how to act on errors. The transport details can therefore not be safely abstracted away.

  1. Processing error => disconnect client
  2. After the disconnect is complete, call acknowledge() on the message. This will not lead to a PUBACK being sent to the broker, because the old connection is closed, and it will also not be sent on the new connection after the reconnect, because this is not allowed by the MQTT specification.
  3. The message is redelivered as a new message which is again only acknowledged when you call acknowledge() on the new message.

Of course this only works with QoS 1.

Thanks for the idea. This approach, however, sounds quite risky and a little hacky to me. No offence.

PMacho avatar Jul 23 '21 10:07 PMacho

Hi All - thanks for the informative discussion. Since this issue has aged with no updates, I'm going to close it for now. If anything remains, please feel free to reopen or file another issue. We'd be happy to help out!

pglombardo avatar Mar 07 '23 14:03 pglombardo