hivemq-mqtt-client
Java Hive Client does not receive all the messages on session reconnect
Expected behavior
The HiveMQ MQTT client should receive all messages that were published to the broker while the consumer was offline when it reconnects with clean_session=false and no session expiry. The client is built in Java using HiveMQ MQTT Client version 1.3.0. The broker is HiveMQ version 4.7.6.
Actual behavior
On reconnection, the client does not receive the messages from all publishers. For example, if 4 different publishers send 12 messages (3 messages each), only 9 messages are received on reconnection. The 3 messages sent by one of the publishers are not received.
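To pin down which publisher's messages go missing, the received payloads can be tallied per publisher. The following is a minimal sketch, assuming each payload has the form "<publisherId>:<sequence>". That payload convention, the class name ReceivedTally, and both method names are hypothetical and not part of the original report or of the HiveMQ client.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: tallies received payloads per publisher.
// Assumes payloads look like "<publisherId>:<sequence>" (illustrative only).
class ReceivedTally {

    // Counts how many payloads were received from each publisher.
    static Map<String, Integer> countByPublisher(List<String> payloads) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String payload : payloads) {
            int sep = payload.indexOf(':');
            String publisher = sep < 0 ? payload : payload.substring(0, sep);
            counts.merge(publisher, 1, Integer::sum);
        }
        return counts;
    }

    // Returns the expected publishers that delivered fewer messages than expected.
    static List<String> incomplete(Map<String, Integer> counts,
                                   List<String> expectedPublishers,
                                   int expectedPerPublisher) {
        List<String> result = new ArrayList<>();
        for (String publisher : expectedPublishers) {
            if (counts.getOrDefault(publisher, 0) < expectedPerPublisher) {
                result.add(publisher);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // 9 of the 12 expected messages: nothing arrived from "pub4".
        List<String> received = Arrays.asList(
                "pub1:1", "pub1:2", "pub1:3",
                "pub2:1", "pub2:2", "pub2:3",
                "pub3:1", "pub3:2", "pub3:3");
        Map<String, Integer> counts = countByPublisher(received);
        System.out.println("per publisher: " + counts);
        System.out.println("incomplete: "
                + incomplete(counts, Arrays.asList("pub1", "pub2", "pub3", "pub4"), 3));
    }
}
```

Calling countByPublisher from the subscribe callback's collected payloads after each reconnect would show whether the same publisher is affected every time.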
To Reproduce
Start the HiveMQ broker with the following Docker command:
docker run -p 8080:8080 -p 1883:1883 hivemq/hivemq4
Then start the HiveMQ client Java application. The client subscribes with QoS 2. The publishers publish with QoS 1 (also tested with QoS 2).
Steps
- Start hivemq client application.
- Publish messages from 4 different publishers.
- All messages are received and logged.
- Now turn off the client application.
- Publish messages from only 1 publisher.
- Now turn on the client application again. No messages are received.
- Turn off the client application again.
- Publish messages from all 4 publishers.
- Now turn on the client application again. All the messages are received and logged except messages from one of the publishers.
Reproducer code
Client code:
// Create the client
Mqtt5AsyncClient subClient = Mqtt5Client.builder()
        .identifier("mqttConsumer")
        .serverHost("localhost")
        .serverPort(1883)
        .automaticReconnect(MqttClientAutoReconnect.builder()
                .initialDelay(3000, TimeUnit.MILLISECONDS)
                .maxDelay(10000, TimeUnit.MILLISECONDS)
                .build())
        .buildAsync();

// Connect the client, keeping the session (expiry interval is in seconds)
subClient.connectWith()
        .cleanStart(false)
        .sessionExpiryInterval(365*24*60*60L)
        .send()
        .get(10000, TimeUnit.MILLISECONDS);

// Subscribe to the topic
subClient.toAsync()
        .subscribeWith()
        .topicFilter("topic1")
        .qos(MqttQos.EXACTLY_ONCE)
        .callback(mqtt5Publish -> doSomething(mqtt5Publish))
        .send();

// Print the received message in the callback
private static void doSomething(Mqtt5Publish mqtt5Publish) {
    System.out.println(StandardCharsets.UTF_8.decode(mqtt5Publish.getPayload().get()).toString());
}
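The connect call above passes the session expiry interval as a number of seconds. MQTT 5 encodes this value as a four-byte unsigned integer, and the maximum value 0xFFFFFFFF means the session does not expire. Below is a small sketch of deriving the value from a Duration and checking it against that range. The class SessionExpiry and the method toSeconds are hypothetical helpers, and the 365-day duration is only an illustration.

```java
import java.time.Duration;

// Hypothetical helper, not part of the HiveMQ client API.
class SessionExpiry {

    // Largest value of an MQTT 5 four-byte integer (4294967295).
    // In MQTT 5 this value means the session does not expire.
    static final long MAX_SECONDS = 0xFFFFFFFFL;

    // Converts a duration to whole seconds and rejects values outside the MQTT 5 range.
    static long toSeconds(Duration duration) {
        long seconds = duration.getSeconds();
        if (seconds < 0 || seconds > MAX_SECONDS) {
            throw new IllegalArgumentException(
                    "session expiry out of range: " + seconds + " s");
        }
        return seconds;
    }

    public static void main(String[] args) {
        // Illustrative value: keep the session for 365 days.
        System.out.println(toSeconds(Duration.ofDays(365)));
    }
}
```

Computing the value this way avoids hand-written multiplication chains, where a missing operator or an int overflow is easy to overlook.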
Details
- Affected HiveMQ MQTT Client version(s): 1.3.0
- Used JVM version: 1.8.0_221
- Used OS (name and version): Windows 10
- Used MQTT version: version 5
- Used MQTT broker (name and version): hivemq 4.7.6
I face the same issue on 1.3.0. I downgraded to 1.2.2, and it works as expected.
> I downgraded to 1.2.2, and it works as expected
@rohit5ram Thanks, I tried 1.2.2 and it works fine.
Hi @rohit5ram @manojrawat650 - thanks for pointing this out and apologies for the late response. I'm trying to replicate what you reported here. I'll report back soon once I have something.
After some digging, I was able to reproduce this with both 1.3.0 and 1.2.2. The actual cause is that the broker delivers the queued QoS 1/2 messages before the subscribe call is executed, so no callback is registered yet and those messages are dropped on the floor.
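The ordering problem can be illustrated without a broker. The sketch below is a toy model only: ToyClient and ToySessionReplay are hypothetical classes that do not use or mirror the HiveMQ client internals. It assumes, as described above, that queued session messages arrive immediately on connect and that a message arriving while no callback is registered is lost.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Toy model of a client: NOT the HiveMQ client, just an illustration of the ordering.
class ToyClient {
    private Consumer<String> handler; // null until a callback is registered
    final List<String> dropped = new ArrayList<>();

    void registerHandler(Consumer<String> handler) {
        this.handler = handler;
    }

    // Simulates the broker flushing the queued session messages right after connect.
    void connect(List<String> queuedOnBroker) {
        for (String message : queuedOnBroker) {
            if (handler == null) {
                dropped.add(message); // nobody is listening yet
            } else {
                handler.accept(message);
            }
        }
    }
}

class ToySessionReplay {

    // Returns how many queued messages reach the application callback.
    static int deliveredCount(boolean registerBeforeConnect) {
        List<String> queued = Arrays.asList("m1", "m2", "m3");
        List<String> received = new ArrayList<>();
        ToyClient client = new ToyClient();
        if (registerBeforeConnect) {
            client.registerHandler(received::add);
        }
        client.connect(queued);
        if (!registerBeforeConnect) {
            client.registerHandler(received::add); // too late for the queued messages
        }
        return received.size();
    }

    public static void main(String[] args) {
        System.out.println("handler registered after connect:  " + deliveredCount(false));
        System.out.println("handler registered before connect: " + deliveredCount(true));
    }
}
```

In this model the only difference between the two runs is whether the callback exists at the moment the queued messages arrive.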
To fix this, you can preconfigure the subClient to handle messages before sending the connect (and subscribe).
subClient.toAsync().publishes(MqttGlobalPublishFilter.SUBSCRIBED, mqtt5Publish -> {
System.out.println("received message: " + mqtt5Publish);
});
So with QoS 1 & 2, some special handling has to be done to guarantee that messages are correctly consumed.
This could be documented better which I'll look to do sometime soon.
Could you let me know if this is still an issue or if this helps at all?
> So with QoS 1 & 2, some special handling has to be done to guarantee that messages are correctly consumed.

Special handling is actually not required. Instead, just call subscribe before connect:
client.toAsync().subscribeWith()
        .topicFilter("topic1")
        .callback(publish -> doSomething(publish))
        .send();
// do not block on the future here
client.toBlocking().connectWith()
        .cleanStart(false)
        .sessionExpiryInterval(365*24*60*60L)
        .send();
// here you could block on the future returned by subscribe
Considering the age of this issue and that two solutions have now been provided, I'll close out this issue.
I will look to possibly add a "Best Practices" section to the documentation soon that will cover the resubscribe topic.
If any issues remain, feel free to open another issue anytime or contact us in our community forum.