hivemq-mqtt-client
[Persistent Sessions] Broker sends message before re-subscribing and client acknowledges those messages
Expected behavior
Subscribing should be possible before connecting the client - as described in https://github.com/hivemq/hivemq-mqtt-client/issues/521.
Actual behavior
Subscribing before connecting blocks forever.
To Reproduce
Steps
- We have a lot of QoS 2 messages on different topics.
- The client reconnects with cleanSession = false because we are not allowed to lose those messages.
- Accordingly, sessionExpiryInterval is set to 6000.
- So, from the broker's side, the already existing subscriptions are completely valid right after the reconnect.
- Connecting before subscribing leads to the error message "No publish flow registered..." (MqttIncomingPublishService) for all messages sent by the broker before the subscribes are processed on the client side.
- We lose the messages because the following code snippet in MqttIncomingPublishService acknowledges the message even if there is no publish flow registered:
    @CallByThread("Netty EventLoop")
    void drain() {
        runIndex++;
        blockingFlowCount = 0;
        qos1Or2It.reset();
        while (qos1Or2It.hasNext()) {
            final MqttStatefulPublishWithFlows publishWithFlows = qos1Or2It.next();
            emit(publishWithFlows);
            if ((qos1Or2It.getIterated() == 1) && publishWithFlows.isEmpty() && publishWithFlows.areAcknowledged()) {
                qos1Or2It.remove();
                incomingQosHandler.ack(publishWithFlows); // <-- the acknowledgement in question
            } else if (blockingFlowCount == referencedFlowCount) {
                return;
            }
        }
        ....
    }
- That's why we tried to ensure, via the following code, that all subscriptions actually happen before the connect.
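The loss described in the steps above can be illustrated with a deliberately simplified model. This is only a sketch under stated assumptions: `IncomingPublishModel`, `registerFlow`, `onPublish` and `delivered` are hypothetical names, and the class does not reproduce the real MqttIncomingPublishService. It only shows the effect of acknowledging a message when no flow is registered to receive it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified model; NOT the client's real MqttIncomingPublishService. */
public class IncomingPublishModel {
    private final List<Consumer<String>> flows = new ArrayList<>();
    private final List<String> acknowledged = new ArrayList<>();

    /** Stands in for a subscription (publish flow) being registered on the client. */
    void registerFlow(Consumer<String> flow) {
        flows.add(flow);
    }

    /** Stands in for an incoming QoS 1/2 publish: emit to all flows, then acknowledge. */
    void onPublish(String payload) {
        for (Consumer<String> flow : flows) {
            flow.accept(payload);
        }
        // Assumption mirrored from the report: the ack happens even if 'flows' is empty,
        // so the broker will not redeliver the message.
        acknowledged.add(payload);
    }

    List<String> acknowledged() {
        return acknowledged;
    }

    /** Returns the payloads the application sees when m1 and m2 arrive early. */
    static List<String> delivered(boolean flowRegisteredBeforeDelivery) {
        IncomingPublishModel service = new IncomingPublishModel();
        List<String> received = new ArrayList<>();
        if (flowRegisteredBeforeDelivery) {
            service.registerFlow(received::add);
        }
        service.onPublish("m1"); // sent by the broker right after the reconnect
        service.onPublish("m2");
        if (!flowRegisteredBeforeDelivery) {
            service.registerFlow(received::add); // re-subscribe is processed too late
        }
        service.onPublish("m3");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(delivered(true));  // prints [m1, m2, m3]
        System.out.println(delivered(false)); // prints [m3]
    }
}
```

In the second run all three messages are acknowledged in the model, but only m3 reaches the application, which corresponds to the loss we observe.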
Reproducer code
Subscribing to messages before the connect does not work. The Single.timer is there to enforce the race condition we observed.
    var connectBuilder = mqttClient.connectWith()
            .sessionExpiryInterval(6000)
            .cleanStart(false)
            .keepAlive(3600);
    Single.timer(20, TimeUnit.SECONDS)
            .flatMap(it -> {
                var subscribeMessage = Mqtt5Subscribe.builder()
                        .topicFilter("topic")
                        .qos(MqttQos.EXACTLY_ONCE)
                        .build();
                return mqttClient
                        .subscribe(subscribeMessage)
                        .doOnSuccess(subscribeAck -> LOGGER.info("MQTT-Subscription ACK {}", subscribeAck));
            })
            .ignoreElement()
            .subscribe();
    connectBuilder.applyConnect();
Same behavior using subscribeSingleFuture (subscriptions is just a config POJO we use to configure the topics):
    var connectBuilder = builder.cleanStart(false);
    var completableFutures = subscriptions.stream()
            .map(subscription -> {
                MqttQos hivemqQos = ITCS_QOS_TO_HIVE_MQ_QOS.get(subscription.getQos());
                return mqttClient.subscribePublishesWith()
                        .topicFilter(subscription.getTopic().getTopicName())
                        .qos(hivemqQos)
                        .applySubscribe()
                        .subscribeSingleFuture(message -> handleMessage(subscription, message))
                        .exceptionally(throwable -> {
                            LOGGER.error(
                                    "MQTT subscription failed for topic {}",
                                    subscription.getTopic().getTopicName(),
                                    throwable
                            );
                            throw new CompletionException(throwable);
                        })
                        .thenApply(mqtt5SubAck -> handleMqtt5SubAck(subscription, mqtt5SubAck));
            })
            .toArray(CompletableFuture[]::new);
    // Blocks here: the connect below is never reached.
    CompletableFuture.allOf(completableFutures).join();
    var connected = mqttClient.connectWith().applyConnect()
            .subscribe(
                    success -> LOGGER.info(
                            "Connect scenario result: clientId = {}; reason = {}",
                            getClientId(),
                            success.getReasonString()
                    ),
                    ex -> LOGGER.warn("Connect Scenario Error", ex)
            );
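The ordering problem in the second reproducer can be modelled without the library. The sketch below is not the client's implementation: `FakeClient` and `subAckArrives` are hypothetical, and it assumes that a subscribe future can only complete once a connection exists, which is what the observed behavior suggests. A bounded `get` replaces `join()` so the example terminates.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class JoinBeforeConnectModel {

    /** Hypothetical stand-in for a client whose SUBACK futures complete only after connect(). */
    static class FakeClient {
        private final CompletableFuture<Void> connected = new CompletableFuture<>();

        CompletableFuture<String> subscribe(String topic) {
            // Assumption: the subscribe is queued and answered only once connected.
            return connected.thenApply(ignored -> "SUBACK " + topic);
        }

        void connect() {
            connected.complete(null);
        }
    }

    /** Returns true if the SUBACK future completes within the timeout. */
    static boolean subAckArrives(boolean connectBeforeWaiting) {
        FakeClient client = new FakeClient();
        CompletableFuture<String> subAck = client.subscribe("topic");
        if (connectBeforeWaiting) {
            client.connect();
        }
        try {
            // An unbounded join() here would never return when connect() has not been called.
            subAck.get(200, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(subAckArrives(false)); // prints false: waiting first never sees the SUBACK
        System.out.println(subAckArrives(true));  // prints true
    }
}
```

Under that assumption, waiting for all SUBACKs before calling connect cannot succeed, while connecting first reopens the window in which the broker delivers messages before the subscriptions are registered.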
Details
- Affected HiveMQ MQTT Client version(s): 1.2.1
- Used JVM version: 17.0.1
- Used OS (name and version): does not matter; Windows and Linux show the same behavior
- Used MQTT version: 5
- Used MQTT broker (name and version): HiveMQ 4.7.2.