hivemq-mqtt-client
hivemq-mqtt-client copied to clipboard
Callback is not triggered after async client re-connection.
Expected behavior
After performing re-connect and re-subscription to MQTT broker callback is fired each time a new message arrives to the topic the client is subscribed to.
Actual behavior
Whenever connection to MQTT broker is lost and then re-stablished, subscription to the same topic is made (using same client instance), callback is not triggered until asyncClient.publishes() is executed once again for it.
To Reproduce
Steps
- Create async client and connect to the broker
- Register global consumer for
MqttGlobalPublishFilter.SUBSCRIBED - Subscribe
- Disconnect the client from step 1 (either via broker tools or connection interruption)
- Perform reconnection with the client from step 1
- Subscribe to same topic as in step 3
- Publish something on the topic to which the client is subscribed to.
Reproducer code
Consider that an MQTT Broker is running on the same machine, locally. For reproducing the problem I've used HiveMQ3 running in Docker. Whenever the code below had been connected and subscribed to the broker I then disconnected it via HiveMQ's web ui to trigger re-connection & re-subscription logic of the test.
@Test
public void reconnectSameCallback() throws ExecutionException, InterruptedException {
final @NotNull Mqtt3BlockingClient blockingClient
= MqttClient.builder()
.useMqttVersion3()
.identifier("test-clientId")
.addDisconnectedListener((MqttClientDisconnectedContext context) -> {
log.warn("Disconnected! ", context.getCause());
})
.serverHost("127.0.0.1")
.serverPort(1883)
.buildBlocking();
Mqtt3AsyncClient asyncClient = blockingClient.toAsync();
final Mqtt3Subscribe subscription =
Mqtt3Subscribe.builder()
.addSubscription()
.topicFilter("test/1")
.qos(MqttQos.AT_LEAST_ONCE)
.applySubscription()
.build();
Consumer<Mqtt3Publish> subCallback = (Mqtt3Publish publish) -> {
System.out.println("Sub COMMON callback: " + publish.getTopic() + " " + publish.getPayload());
};
asyncClient.connect().get(); // waiting
// Single callback registration as suggested here:
// https://github.com/hivemq/hivemq-mqtt-client/issues/454
asyncClient.publishes(MqttGlobalPublishFilter.SUBSCRIBED, subCallback);
asyncClient.subscribe(subscription);
//simulating running app with re-connection executed by some custom code
while (true) {
boolean connected = asyncClient.getConfig().getConnectionConfig().isPresent();
if (!connected) {
log.warn("Reconnecting...");
try {
asyncClient.connect().get();// waiting
log.info("Re-Connected!");
asyncClient.subscribe(subscription);
log.info("Re-Sub!");
//asyncClient.publishes(MqttGlobalPublishFilter.SUBSCRIBED, subCallback); // this will fix the problem
} catch (ExecutionException e) {
log.warn(e.getMessage());
}
}
}
}
Details
- Affected HiveMQ MQTT Client version(s): 1.2.2
- Used JVM version: openjdk version "11.0.6" 2020-01-14
- Used OS (name and version): MacOS 10.15.6
- Used MQTT version: 3
- Used MQTT broker (name and version): HiveMQ 3.4.6 (local docker container).
I am a bit confused with the behaviour of publishes(), I don't expect that I have to execute it each time i establish a connection, in my understanding whenever I register a consumer for mqtt client instance it should be working unless I destroy the instance of the client.