hivemq-mqtt-client icon indicating copy to clipboard operation
hivemq-mqtt-client copied to clipboard

MQTT 5 server without client takeover support and reconnect

Open missourian55 opened this issue 4 years ago • 3 comments

I am trying to simulate un-graceful disconnect and client takeover (in this case, the broker doesn't support it). Like to know how to re-establish the connection? I am not sure if this is a bug with HiveMQ client, but would like to disclose what I am testing and appreciate any recommendations.

To Reproduce

Steps

  1. Start the application
  2. Run it for few minutes. Publish a few messages to MQTT 5 broker
  3. Close the laptop for few minutes and let it go to sleep
  4. Open the laptop; application resumes and trying to post a new message to MQTT 5 broker.
  5. Getting below error
com.hivemq.client.mqtt.exceptions.MqttClientStateException: MQTT client is not connected.

Reproducer code


@ApplicationScoped
@JBossLog
public class PriceGenerator {

    private final Random random = new Random();

    @Inject
    Mqtt5BlockingClient mqtt5BlockingClient;

    @ConfigProperty(name = "mqtt.topic")
    String topic;

    @Scheduled(every = "45s")
    public void generateMessage() {
        
        // troubleshooting the problem
        if(!mqtt5BlockingClient.getState().isConnected()) {
            log.info("Mqtt in disconnect state. Re-establishing connection... ");
            try {
                destroy();
            } catch (Exception e) {
                e.printStackTrace(); //MqttClientStateException: MQTT client is not connected.
                log.error("disconnect failure");
            }
            try {
                mqtt5BlockingClient.connect();
            } catch (Exception e) {
                e.printStackTrace(); // MqttClientStateException: MQTT client is already connected or connecting.
                log.error("connect failure");
            }
        }

        mqtt5BlockingClient
            .publishWith()
            .topic(topic)
            .payload(String.valueOf(random.nextInt(100)).getBytes(UTF_8))
            .qos(AT_LEAST_ONCE)
            .messageExpiryInterval(120)
            .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
            .send();
    }

    @PostConstruct
    public void init() {
        log.info("Going to establish a subscription with a Mqtt broker...");
        mqtt5BlockingClient.toAsync()
            .subscribeWith()
            .topicFilter(topic)
            .qos(AT_LEAST_ONCE)
            .callback(publish -> {
                final Optional<ByteBuffer> correlationData = publish.getCorrelationData();
                log.infov("Received message: {0}  ->  {1}", publish.getTopic(), UTF_8.decode(publish.getPayload().get()));
            })
            .send();

    }

    @PreDestroy
    public void destroy() {
        mqtt5BlockingClient.disconnect();
        log.info("Disconnected the Mqtt client...");
    }

}

@Singleton
class AppConfig {

    @ConfigProperty(name = "mqtt.host")
    String host;

    @ConfigProperty(name = "mqtt.port")
    int port;

    @Singleton
    @Produces
    public Mqtt5BlockingClient mqtt5BlockingClient() {

        final Mqtt5BlockingClient client = MqttClient.builder()
            .useMqttVersion5()
            .serverHost(host)
            .serverPort(port)
            .identifier(UUID.randomUUID())
            .automaticReconnectWithDefaultConfig()
            .buildBlocking();

        client.connectWith()
            .simpleAuth()
            .username("test")
            .password(UTF_8.encode("test"))
            .applySimpleAuth()
            .sessionExpiryInterval(3600 * 24)
            .cleanStart(true) 
            .keepAlive(30)
            .send();

        return client;

    }

}

Details

  • Affected HiveMQ MQTT Client version(s): 1.2.2
  • Used JVM version: Java 11
  • Used OS (name and version): mac OS Intel
  • Used MQTT version: MQTT 5
  • Used MQTT broker (name and version): Solace 9.x

missourian55 avatar Aug 04 '21 02:08 missourian55

Hi @missourian55 If you use automaticReconnect, then there is no reason to call if (!isConnected) { disconnect; connect; } yourself. Currently, the client does not support disconnecting and reconnecting yourself when it will automatically reconnect (https://github.com/hivemq/hivemq-mqtt-client/issues/302). So, everything should work and if you remove the if (!isConnected) { disconnect; connect; } code, then you should not get any exceptions logged.

SgtSilvio avatar Aug 09 '21 06:08 SgtSilvio

@SgtSilvio Thank you! I added this if (!isConnected) { disconnect; connect; } after noticing that client is not automatically reconnecting. Does MQTT server have any special support for client automaticReconnect? In my case MQTT server is Solace

missourian55 avatar Aug 12 '21 01:08 missourian55

@missourian55 sorry for the delayed response. I don't know how Solace handles reconnects/client takeovers and so on. To see if the server rejects reconnect attempts, please add a ConnectedListener and DisconnectedListener which log all connect attempts and failures:

MqttClient.builder()
            ...
            .automaticReconnectWithDefaultConfig()
            .addConnectedListener(context -> log.info("connected"))
            .addDisconnectedListener(context -> log.info("disconnected by {}, cause {}", context.getSource(), context.getCause()))
            ...

SgtSilvio avatar Sep 03 '21 15:09 SgtSilvio

Hi @missourian55 - it's been while since this issue has been updated. I'm going to close it for now but if anything remains, feel free to re-open it again or file another issue. Thanks for using the HiveMQ client!

pglombardo avatar Mar 07 '23 14:03 pglombardo