hivemq-mqtt-client
.sessionExpiryInterval(int) prevents graceful disconnect and shutdown
Expected behavior
Calling client.disconnect() should allow for graceful program termination. The provided example creates an async client, connects, and subscribes to a topic. In normal operation we would expect the program to exit once main() returns.
Assuming the client requires an explicit disconnect, a tester thread was added to make a delayed call to client.disconnect(). The tester thread waits briefly and then calls client.disconnect(), which should end the subscription and let the program exit gracefully.
Actual behavior
The test does not exit gracefully; it keeps running for the length of time passed to sessionExpiryInterval(int). If that is set to 20 seconds, the subscribe thread stays alive for 20 seconds after disconnect() is called. Our use case requires a session to live for a few hours, which makes this a real problem. We could force termination with System.exit(), but that is bad practice.
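For illustration only, here is a plain-JDK analogy of the symptom; it is not HiveMQ code, and we do not know whether the client works this way internally. The sketch shows that a non-daemon worker thread can outlive a shutdown request for as long as a delayed task is still pending. The class name LingeringTimerDemo and its method are hypothetical.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Analogy only: this is NOT the HiveMQ client's implementation.
class LingeringTimerDemo
{
    /**
     * Schedules a no-op task delaySeconds in the future, requests shutdown,
     * and reports whether the pool terminated within waitMillis.
     */
    static boolean terminatesPromptly( boolean dropDelayedTasks, long delaySeconds, long waitMillis )
    {
        final ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor( 1 );
        // When set to false, shutdown() discards delayed tasks that have not started yet.
        pool.setExecuteExistingDelayedTasksAfterShutdownPolicy( !dropDelayedTasks );
        pool.schedule( () -> { }, delaySeconds, TimeUnit.SECONDS );
        pool.shutdown();
        try
        {
            return pool.awaitTermination( waitMillis, TimeUnit.MILLISECONDS );
        }
        catch( final InterruptedException e )
        {
            Thread.currentThread().interrupt();
            return false;
        }
        finally
        {
            // clean up so this demo itself does not linger
            pool.shutdownNow();
        }
    }

    public static void main( String[] args )
    {
        // pending delayed task is kept: the worker thread outlives shutdown()
        System.out.println( "keep delayed task: " + terminatesPromptly( false, 20, 500 ) ); // false
        // pending delayed task is dropped: the pool terminates right away
        System.out.println( "drop delayed task: " + terminatesPromptly( true, 20, 500 ) );  // true
    }
}
```

If something similar happens inside the client, it would explain why the wait matches the configured interval, but that is an assumption on our side.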
To Reproduce
Run the test code; it will not exit gracefully after the tester thread calls client.disconnect(). Instead it waits out the time set in sessionExpiryInterval(int). Our assumption is that the client should terminate cleanly when disconnect() is called. We also assume that sessionExpiryInterval tells the broker to keep the session alive for the given number of seconds, and that it should not affect the lifecycle of the local client.
We would even assume that as soon as the client goes out of scope it disconnects, and no threads hold the program alive.
As this is our first exposure to the HiveMQ client, please let us know if there is another operation that we must perform that will both set the broker's session lifetime and yet provide a timely exit when disconnect() is called.
Workarounds
- Call System.exit() but that's bad practice.
- Set client.sessionExpiryInterval( 0 ), but this defeats the purpose of asking the broker to retain the session
- Call cancel on the subscriber's CompletableFuture, i.e. subCF.cancel( true )
The last one is clearly a hack, as calling cancel() on a CF that has already signaled complete is shameful. This leaves calling System.exit(), but we like graceful shutdowns in our production software.
Comments and Observations
The active thread appears to be waiting on an nio operation with timeout. Should the disconnect() interrupt this thread?
But then why would the session expiry interval affect the lifetime of the local client? If it is meant to create a grace period during shutdown, a pause for a subscription to complete, then it should be a separate interval setting.
This is the async variant of the client, so connect and subscribe statements are not expected to keep the client alive, or impact the client lifecycle. Should a call to disconnect() even be required?
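To narrow down which thread is holding the JVM open, a small diagnostic like the following could be called from the tester thread after disconnect(). It uses only the JDK and makes no assumption about HiveMQ internals; the class name LiveThreadReport is hypothetical. Only non-daemon threads are listed, since those are the ones that prevent the JVM from exiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: lists the threads that can keep the JVM alive.
class LiveThreadReport
{
    /** Returns the names of all live non-daemon threads. */
    static List<String> nonDaemonThreadNames()
    {
        final List<String> names = new ArrayList<>();
        for( final Thread t : Thread.getAllStackTraces().keySet() )
        {
            if( t.isAlive() && !t.isDaemon() )
            {
                names.add( t.getName() );
            }
        }
        return names;
    }

    /** Prints name, state and stack trace of every live non-daemon thread. */
    static void dump()
    {
        for( final Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet() )
        {
            final Thread t = e.getKey();
            if( !t.isAlive() || t.isDaemon() )
            {
                continue;
            }
            System.out.println( t.getName() + " [" + t.getState() + "]" );
            for( final StackTraceElement frame : e.getValue() )
            {
                System.out.println( "    at " + frame );
            }
        }
    }

    public static void main( String[] args )
    {
        dump();
    }
}
```

In the reproducer, LiveThreadReport.dump() would go right after the line that prints client.getState(); any thread still listed there besides the tester thread itself is a candidate for what blocks the exit.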
Reproducer code
import java.util.concurrent.CompletableFuture;

import org.jetbrains.annotations.NotNull;

import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;

public class HiveSubAsyncFail
{
    public static void main( String[] args )
        throws Exception
    {
        final Mqtt5AsyncClient client = Mqtt5Client.builder()
            .identifier( "asyncsub" )
            .serverHost( "localhost" )
            // so we can observe the internal connect/disconnect events
            .addConnectedListener( x -> System.out.println( ":connected" ) )
            .addDisconnectedListener( x -> System.out.println( ":disconnected" ) )
            .buildAsync();
        client.connectWith()
            // signal broker we want to accumulate messages in our absence
            .cleanStart( false )
            // set session expiry to 20 seconds
            // It does signal the broker to keep our session for 20 seconds,
            // but PREVENTS DISCONNECT. It holds up graceful shutdown for 20 seconds.
            // We want the server to keep the session for an hour, so would it hold up shutdown for an hour?
            .sessionExpiryInterval( 20 )
            .send();
        final @NotNull CompletableFuture<Mqtt5SubAck> subCF = client.subscribeWith()
            .topicFilter( "Examples" )
            .callback( p -> System.out.println( new String( p.getPayloadAsBytes() ) ) )
            .send();
        subCF.whenComplete( ( ack, th ) ->
        {
            // parentheses are required: without them "+" binds tighter than "==" and "?:"
            System.out.println( "subscribe complete" + ( th == null ? "." : " with error " + th.getMessage() ) );
        } );
        // starting separate TESTER THREAD to let main thread lapse
        new Thread( () ->
        {
            try
            {
                // pause 5 seconds to test successful subscription
                Thread.sleep( 5000 );
                // then signal disconnect
                System.out.println( "signal disconnect" );
                client.disconnect();
                // wait a second for disconnect to take effect
                Thread.sleep( 1000 );
                System.out.println( "conn state " + client.getState() );
                // A WORKAROUND HACK...
                // Calling cancel on the original subscribe future kills
                // the thread that prevents a graceful shutdown.
                // Without this call, the client won't shut down until sessionExpiryInterval() expires.
                // subCF.cancel( true );
                // we could also call System.exit, but that's just bad practice
                // System.exit( 99 );
            }
            catch( final InterruptedException e )
            {
                e.printStackTrace();
            }
        } ).start();
        // main thread should quietly finish and this test terminate after tester thread fires disconnect()
        // the only thing keeping the test alive is poorly behaved threads that didn't get the disconnect signal
    }
}
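A side note on the reproducer: it waits with Thread.sleep( 1000 ) after client.disconnect(). Since the async client's disconnect() returns a CompletableFuture, the tester thread could wait on that future with a bound instead of sleeping for a fixed time. A minimal sketch using only the JDK follows; the helper name BoundedWait.completedWithin is hypothetical, and plain CompletableFuture instances stand in for the client's future. It only tells us whether the disconnect itself completed, not whether the client's threads have stopped.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: bounded wait on a future such as the one returned by disconnect().
class BoundedWait
{
    /** Returns true only if the future completed normally within timeoutMillis. */
    static boolean completedWithin( CompletableFuture<?> future, long timeoutMillis )
    {
        try
        {
            future.get( timeoutMillis, TimeUnit.MILLISECONDS );
            return true;
        }
        catch( final TimeoutException | ExecutionException | CancellationException e )
        {
            // not done in time, failed, or cancelled
            return false;
        }
        catch( final InterruptedException e )
        {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main( String[] args )
    {
        // stand-ins for the future returned by client.disconnect()
        final CompletableFuture<Void> done = CompletableFuture.completedFuture( null );
        final CompletableFuture<Void> pending = new CompletableFuture<>();
        System.out.println( completedWithin( done, 100 ) );    // true
        System.out.println( completedWithin( pending, 100 ) ); // false
    }
}
```

In the tester thread this would read roughly as BoundedWait.completedWithin( client.disconnect(), 1000 ) in place of the sleep.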
Details
- Affected HiveMQ MQTT Client version(s): 1.2.2
- Used JVM version: Java 11
- Used OS (name and version): debian 9.13
- Used MQTT version: 5
- Used MQTT broker (name and version): HiveMQ CE docker:latest / 2021.2