hivemq-mqtt-client icon indicating copy to clipboard operation
hivemq-mqtt-client copied to clipboard

Callback is not triggered after async client re-connection.

Open frankyfish opened this issue 4 years ago • 0 comments

Expected behavior

After performing re-connect and re-subscription to MQTT broker callback is fired each time a new message arrives to the topic the client is subscribed to.

Actual behavior

Whenever connection to MQTT broker is lost and then re-stablished, subscription to the same topic is made (using same client instance), callback is not triggered until asyncClient.publishes() is executed once again for it.

To Reproduce

Steps

  1. Create async client and connect to the broker
  2. Register global consumer for MqttGlobalPublishFilter.SUBSCRIBED
  3. Subscribe
  4. Disconnect the client from step 1 (either via broker tools or connection interruption)
  5. Perform reconnection with the client from step 1
  6. Subscribe to same topic as in step 3
  7. Publish something on the topic to which the client is subscribed to.

Reproducer code

Consider that an MQTT Broker is running on the same machine, locally. For reproducing the problem I've used HiveMQ3 running in Docker. Whenever the code below had been connected and subscribed to the broker I then disconnected it via HiveMQ's web ui to trigger re-connection & re-subscription logic of the test.

    @Test
    public void reconnectSameCallback() throws ExecutionException, InterruptedException {
        final @NotNull Mqtt3BlockingClient blockingClient
            = MqttClient.builder()
                        .useMqttVersion3()
                        .identifier("test-clientId")
                        .addDisconnectedListener((MqttClientDisconnectedContext context) -> {
                            log.warn("Disconnected! ", context.getCause());
                        })
                        .serverHost("127.0.0.1")
                        .serverPort(1883)
                        .buildBlocking();
        Mqtt3AsyncClient asyncClient = blockingClient.toAsync();

        final Mqtt3Subscribe subscription =
            Mqtt3Subscribe.builder()
                          .addSubscription()
                          .topicFilter("test/1")
                          .qos(MqttQos.AT_LEAST_ONCE)
                          .applySubscription()
                          .build();
        Consumer<Mqtt3Publish> subCallback = (Mqtt3Publish publish) -> {
            System.out.println("Sub COMMON callback: " + publish.getTopic() + " " + publish.getPayload());
        };
        asyncClient.connect().get(); // waiting

        // Single callback registration as suggested here:
        //  https://github.com/hivemq/hivemq-mqtt-client/issues/454
        asyncClient.publishes(MqttGlobalPublishFilter.SUBSCRIBED, subCallback);
        asyncClient.subscribe(subscription);
        
        //simulating running app with re-connection executed by some custom code
        while (true) {
            boolean connected = asyncClient.getConfig().getConnectionConfig().isPresent();
            if (!connected) {
                log.warn("Reconnecting...");
                try {
                    asyncClient.connect().get();// waiting
                    log.info("Re-Connected!");
                    asyncClient.subscribe(subscription);
                    log.info("Re-Sub!");
                    //asyncClient.publishes(MqttGlobalPublishFilter.SUBSCRIBED, subCallback); // this will fix the problem
                } catch (ExecutionException e) {
                    log.warn(e.getMessage());
                }
            }
        }
    }

Details

  • Affected HiveMQ MQTT Client version(s): 1.2.2
  • Used JVM version: openjdk version "11.0.6" 2020-01-14
  • Used OS (name and version): MacOS 10.15.6
  • Used MQTT version: 3
  • Used MQTT broker (name and version): HiveMQ 3.4.6 (local docker container).

I am a bit confused with the behaviour of publishes(), I don't expect that I have to execute it each time i establish a connection, in my understanding whenever I register a consumer for mqtt client instance it should be working unless I destroy the instance of the client.

frankyfish avatar Aug 13 '21 09:08 frankyfish