hivemq-mqtt-client
MQTT 5 server without client takeover support and reconnect
I am trying to simulate an ungraceful disconnect and client takeover (in this case, the broker does not support takeover). How can I re-establish the connection? I am not sure whether this is a bug in the HiveMQ client, but I would like to describe what I am testing and would appreciate any recommendations.
To Reproduce
Steps
- Start the application
- Run it for a few minutes and publish a few messages to the MQTT 5 broker
- Close the laptop for a few minutes and let it go to sleep
- Open the laptop; the application resumes and tries to publish a new message to the MQTT 5 broker
- The following error is thrown:
com.hivemq.client.mqtt.exceptions.MqttClientStateException: MQTT client is not connected.
Reproducer code
@ApplicationScoped
@JBossLog
public class PriceGenerator {
    private final Random random = new Random();
    @Inject
    Mqtt5BlockingClient mqtt5BlockingClient;
    @ConfigProperty(name = "mqtt.topic")
    String topic;
    @Scheduled(every = "45s")
    public void generateMessage() {
        // troubleshooting the problem
        if (!mqtt5BlockingClient.getState().isConnected()) {
            log.info("Mqtt in disconnect state. Re-establishing connection... ");
            try {
                destroy();
            } catch (Exception e) {
                e.printStackTrace(); // MqttClientStateException: MQTT client is not connected.
                log.error("disconnect failure");
            }
            try {
                mqtt5BlockingClient.connect();
            } catch (Exception e) {
                e.printStackTrace(); // MqttClientStateException: MQTT client is already connected or connecting.
                log.error("connect failure");
            }
        }
        mqtt5BlockingClient
                .publishWith()
                .topic(topic)
                .payload(String.valueOf(random.nextInt(100)).getBytes(UTF_8))
                .qos(AT_LEAST_ONCE)
                .messageExpiryInterval(120)
                .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
                .send();
    }
    @PostConstruct
    public void init() {
        log.info("Going to establish a subscription with a Mqtt broker...");
        mqtt5BlockingClient.toAsync()
                .subscribeWith()
                .topicFilter(topic)
                .qos(AT_LEAST_ONCE)
                .callback(publish -> {
                    final Optional<ByteBuffer> correlationData = publish.getCorrelationData();
                    log.infov("Received message: {0} -> {1}", publish.getTopic(), UTF_8.decode(publish.getPayload().get()));
                })
                .send();
    }
    @PreDestroy
    public void destroy() {
        mqtt5BlockingClient.disconnect();
        log.info("Disconnected the Mqtt client...");
    }
}
@Singleton
class AppConfig {
    @ConfigProperty(name = "mqtt.host")
    String host;
    @ConfigProperty(name = "mqtt.port")
    int port;
    @Singleton
    @Produces
    public Mqtt5BlockingClient mqtt5BlockingClient() {
        final Mqtt5BlockingClient client = MqttClient.builder()
                .useMqttVersion5()
                .serverHost(host)
                .serverPort(port)
                .identifier(UUID.randomUUID())
                .automaticReconnectWithDefaultConfig()
                .buildBlocking();
        client.connectWith()
                .simpleAuth()
                    .username("test")
                    .password(UTF_8.encode("test"))
                    .applySimpleAuth()
                .sessionExpiryInterval(3600 * 24)
                .cleanStart(true)
                .keepAlive(30)
                .send();
        return client;
    }
}
Details
- Affected HiveMQ MQTT Client version(s): 1.2.2
- Used JVM version: Java 11
- Used OS (name and version): macOS (Intel)
- Used MQTT version: MQTT 5
- Used MQTT broker (name and version): Solace 9.x
Hi @missourian55
If you use automaticReconnect, there is no reason to call if (!isConnected) { disconnect; connect; } yourself.
Currently, the client does not support manually disconnecting and reconnecting while automatic reconnect is enabled (https://github.com/hivemq/hivemq-mqtt-client/issues/302).
So everything should work: if you remove the if (!isConnected) { disconnect; connect; } code, you should not get any exceptions logged.
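One way to picture the suggestion above is a publish path that only checks the connection state and never calls connect() or disconnect() itself. The following is a minimal sketch, not part of the HiveMQ client: GuardedPublisher is a hypothetical helper, and its two lambdas stand in for calls such as mqtt5BlockingClient.getState().isConnected() and the publishWith()...send() chain from the reproducer.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

/** Hypothetical helper: publishes only while connected and leaves reconnecting to the client. */
final class GuardedPublisher {
    private final BooleanSupplier connected;  // assumption: e.g. () -> client.getState().isConnected()
    private final Consumer<String> publisher; // assumption: e.g. payload -> client.publishWith()...send()
    private int skipped;

    GuardedPublisher(BooleanSupplier connected, Consumer<String> publisher) {
        this.connected = connected;
        this.publisher = publisher;
    }

    /** Returns true if the payload was handed to the publisher, false if it was skipped. */
    boolean publish(String payload) {
        if (!connected.getAsBoolean()) {
            // No manual disconnect()/connect() here: automatic reconnect is expected to recover.
            skipped++;
            return false;
        }
        publisher.accept(payload);
        return true;
    }

    int skippedCount() {
        return skipped;
    }

    public static void main(String[] args) {
        AtomicBoolean online = new AtomicBoolean(false); // stand-in for the real client state
        GuardedPublisher p = new GuardedPublisher(online::get, msg -> System.out.println("sent " + msg));
        p.publish("42");  // skipped while offline
        online.set(true); // simulates the client having reconnected on its own
        p.publish("43");  // handed to the publisher
        System.out.println("skipped: " + p.skippedCount());
    }
}
```

Whether skipped messages should be dropped, as here, or buffered for later depends on the application; the sketch drops them only to stay short.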
@SgtSilvio Thank you! I added the if (!isConnected) { disconnect; connect; } block after noticing that the client was not reconnecting automatically. Does the MQTT server need any special support for the client's automaticReconnect? In my case, the MQTT server is Solace.
@missourian55 Sorry for the delayed response. I don't know how Solace handles reconnects, client takeovers, and so on. To see whether the server rejects reconnect attempts, please add a ConnectedListener and a DisconnectedListener that log all connect attempts and failures:
MqttClient.builder()
    ...
    .automaticReconnectWithDefaultConfig()
    .addConnectedListener(context -> log.info("connected"))
    .addDisconnectedListener(context -> log.info("disconnected by {}, cause {}", context.getSource(), context.getCause()))
    ...
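When reading the listener output, it can help to have a rough idea of how far apart reconnect attempts may be spaced. The sketch below is not the client's implementation. It assumes a capped exponential backoff with an initial delay of 1 s and a maximum delay of 120 s, which are believed to be the defaults behind automaticReconnectWithDefaultConfig() but should be verified against the client version in use. Any random jitter is left out, and ReconnectBackoff is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: capped exponential backoff, not the HiveMQ client's actual code. */
final class ReconnectBackoff {

    /** Returns the delay in seconds before each of the first {@code attempts} reconnect attempts. */
    static List<Long> delaysSeconds(long initialDelay, long maxDelay, int attempts) {
        List<Long> delays = new ArrayList<>();
        long delay = initialDelay;
        for (int i = 0; i < attempts; i++) {
            delays.add(delay);
            // Double the delay after each failed attempt, but never exceed the cap.
            delay = Math.min(delay * 2, maxDelay);
        }
        return delays;
    }

    public static void main(String[] args) {
        // 1 s and 120 s are assumed defaults, not values taken from this thread.
        System.out.println(delaysSeconds(1, 120, 9));
    }
}
```

Under these assumptions the first nine delays are 1, 2, 4, 8, 16, 32, 64, 120 and 120 seconds. After a long sleep the next attempt could therefore be up to two minutes away, and a publish sent before then can still fail with "MQTT client is not connected".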
Hi @missourian55 - it's been a while since this issue was updated. I'm going to close it for now, but if anything remains, feel free to re-open it or file another issue. Thanks for using the HiveMQ client!