
[Persistent Sessions] Broker sends message before re-subscribing and client acknowledges those messages

Open Blafasel3 opened this issue 3 years ago • 0 comments

Expected behavior

Subscribing should be possible before connecting the client - as described in https://github.com/hivemq/hivemq-mqtt-client/issues/521.

Actual behavior

Subscribing before connecting blocks forever.

To Reproduce

Steps

  1. We have a lot of QoS 2 messages on different topics.
  2. The client reconnects with cleanStart = false because we must not lose those messages.
  3. Accordingly, sessionExpiryInterval is set to 6000.
  4. So, from the broker's side, the existing subscriptions are fully valid immediately after the reconnect.
  5. Connecting before subscribing leads to the error message "No publish flow registered..." (MqttIncomingPublishService) for every message the broker sends before the client has processed its subscribes.
  6. We lose the messages, because the following code snippet in MqttIncomingPublishService acknowledges a message even if no publish flow is registered:
@CallByThread("Netty EventLoop")
void drain() {
    runIndex++;
    blockingFlowCount = 0;
    qos1Or2It.reset();
    while (qos1Or2It.hasNext()) {
        final MqttStatefulPublishWithFlows publishWithFlows = qos1Or2It.next();
        emit(publishWithFlows);
        if ((qos1Or2It.getIterated() == 1) && publishWithFlows.isEmpty() && publishWithFlows.areAcknowledged()) {
            qos1Or2It.remove();
            incomingQosHandler.ack(publishWithFlows); // <-- acknowledged although no publish flow is registered
        } else if (blockingFlowCount == referencedFlowCount) {
            return;
        }
    }
    ....
}
  7. That's why we tried to ensure, via the following code, that all subscriptions actually happen before the connect.
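The sequence in steps 5 and 6 can be sketched with a small, self-contained toy model. It is not the HiveMQ client's implementation: ToyClient, onPublish and lostMessages are hypothetical names, and the only behavior modeled is the one reported above, namely that an incoming message is acknowledged whether or not a flow is registered for it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

/** Toy model of the reported race; NOT the HiveMQ client's actual code. */
class PersistentSessionRaceModel {

    /** Hypothetical stand-in for the client's incoming publish handling. */
    static class ToyClient {
        private final Map<String, Consumer<String>> flows = new HashMap<>();
        final List<String> acknowledged = new ArrayList<>();
        final List<String> delivered = new ArrayList<>();

        /** Registers a flow that records every payload it receives. */
        void subscribe(String topic) {
            flows.put(topic, delivered::add);
        }

        /** Assumed behavior: the ack is sent even when no flow consumed the message. */
        void onPublish(String topic, String payload) {
            Consumer<String> flow = flows.get(topic);
            if (flow != null) {
                flow.accept(payload);
            }
            acknowledged.add(payload); // from here on the broker may discard the message
        }
    }

    /** Simulates one reconnect and returns the payloads that were acknowledged but never delivered. */
    static List<String> lostMessages(boolean subscribeBeforeDelivery) {
        Queue<String> queuedByBroker = new ArrayDeque<>(List.of("m1", "m2", "m3"));
        ToyClient client = new ToyClient();
        if (subscribeBeforeDelivery) {
            client.subscribe("topic");
        }
        // The persistent session makes the broker deliver right after the reconnect.
        while (!queuedByBroker.isEmpty()) {
            client.onPublish("topic", queuedByBroker.poll());
        }
        if (!subscribeBeforeDelivery) {
            client.subscribe("topic"); // too late: everything is already acknowledged
        }
        List<String> lost = new ArrayList<>(client.acknowledged);
        lost.removeAll(client.delivered);
        return lost;
    }

    public static void main(String[] args) {
        System.out.println("subscribe after delivery, lost:  " + lostMessages(false));
        System.out.println("subscribe before delivery, lost: " + lostMessages(true));
    }
}
```

In this model, all three queued messages are lost when the flow is registered after delivery, and none are lost when it is registered first. That ordering is what the reproducers below try to enforce.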

Reproducer code

Subscribing to messages before the connect does not work. The Single.timer is there to enforce the race condition we observed.

var connectBuilder = mqttClient.connectWith()
        .sessionExpiryInterval(6000)
        .cleanStart(false)
        .keepAlive(3600);
Single.timer(20, TimeUnit.SECONDS)
        .flatMap(it -> {
            var subscribeMessage = Mqtt5Subscribe.builder()
                    .topicFilter("topic")
                    .qos(MqttQos.EXACTLY_ONCE)
                    .build();
            return mqttClient
                    .subscribe(subscribeMessage)
                    .doOnSuccess(subscribeAck -> LOGGER.info("MQTT-Subscription ACK {}", subscribeAck));
        })
        .ignoreElement()
        .subscribe();
connectBuilder.applyConnect();

The same happens with subscribeSingleFuture (subscriptions is just a config POJO we use to configure the topics):

var connectBuilder = builder.cleanStart(false);
var completableFutures = subscriptions.stream()
        .map(subscription -> {
            MqttQos hivemqQos = ITCS_QOS_TO_HIVE_MQ_QOS.get(subscription.getQos());
            return mqttClient.subscribePublishesWith()
                    .topicFilter(subscription.getTopic().getTopicName())
                    .qos(hivemqQos)
                    .applySubscribe()
                    .subscribeSingleFuture(message -> handleMessage(subscription, message))
                    .exceptionally(throwable -> {
                        LOGGER.error(
                                "MQTT subscription failed for topic {}",
                                subscription.getTopic().getTopicName(),
                                throwable
                        );
                        throw new CompletionException(throwable);
                    })
                    .thenApply(mqtt5SubAck -> handleMqtt5SubAck(subscription, mqtt5SubAck));
        })
        .toArray(CompletableFuture[]::new);
// Blocks until every subscribe future has completed; the connect below is only reached afterwards.
CompletableFuture.allOf(completableFutures).join();
var connected = mqttClient.connectWith().applyConnect()
        .subscribe(
                success -> LOGGER.info(
                        "Connect scenario result: clientId = {}; reason = {}",
                        getClientId(),
                        success.getReasonString()
                ),
                ex -> LOGGER.warn("Connect Scenario Error", ex)
        );
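One plausible reading of why this second reproducer hangs is the order of operations: join() waits for the subscribe futures, while the connect is only started after join() returns. The sketch below models just that ordering with plain CompletableFuture objects. It rests on the assumption (not confirmed by the client's documentation here) that a subscribe future can only complete once a connection exists; JoinBeforeConnectModel and subAckArrives are hypothetical names, and a 200 ms timeout stands in for the unbounded join().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Models the wait-before-trigger ordering; it does not use the HiveMQ client. */
class JoinBeforeConnectModel {

    /** Returns true if the hypothetical SUBACK future completes within the timeout. */
    static boolean subAckArrives(boolean connectBeforeWaiting) {
        CompletableFuture<Void> connected = new CompletableFuture<>();
        // Assumption: the SUBACK can only arrive after the connection is established.
        CompletableFuture<String> subAck = connected.thenApply(ignored -> "SUBACK");

        if (connectBeforeWaiting) {
            connected.complete(null); // stands in for a connect that has been started
        }
        try {
            subAck.get(200, TimeUnit.MILLISECONDS); // join() would wait without any limit
            return true;
        } catch (TimeoutException e) {
            return false; // with join() this call would never return
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("wait first, connect later: " + subAckArrives(false));
        System.out.println("connect first, then wait:  " + subAckArrives(true));
    }
}
```

Under that assumption, waiting on the subscribe futures only makes sense after the connect has at least been started. Whether the client registers the publish flows early enough in that case is exactly the question this issue raises.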

Details

  • Affected HiveMQ MQTT Client version(s): 1.2.1
  • Used JVM version: 17.0.1
  • Used OS (name and version): does not matter, Windows and Linux showing the same behavior
  • Used MQTT version: 5
  • Used MQTT broker (name and version): HiveMQ 4.7.2.

Blafasel3, Aug 01 '22 12:08