vertx-mqtt
Worker verticle MQTT server cannot reject a connection
Hi all,
I am using vertx-mqtt version 3.6.2. I have a worker verticle that contains the MQTT server:
public class StrangeMQTTServer extends AbstractVerticle {
    private MqttServer mqttServer;

    @Override
    public void start(Future<Void> startFuture) throws Exception {
        this.mqttServer = MqttServer.create(vertx);
        this.mqttServer.endpointHandler(endpoint -> {
            // endpoint.accept() <- works with deploy worker configuration
            endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
        });
        this.mqttServer.listen(event -> {
            if (event.succeeded()) Logger.info("MQTT Server is listening on port {} ...", this.mqttServer.actualPort());
            else if (event.failed()) Logger.error(event.cause(), "MQTT Server failure ...");
            startFuture.complete();
        });
    }

    @Override
    public void stop(Future<Void> stopFuture) throws Exception {
        this.mqttServer.close(event -> stopFuture.complete());
    }
}
Now I want to connect to the server with a simple MQTT-Client:
@Before
public void setUp(TestContext context) throws Exception {
    Async async = context.async();
    this.strangeMQTTServer = new StrangeMQTTServer();
    DeploymentOptions isWorker = new DeploymentOptions()
        .setWorker(true)
        .setMaxWorkerExecuteTime(2000)
        .setMaxWorkerExecuteTimeUnit(TimeUnit.MILLISECONDS);
    // Does not work -> vertx.deployVerticle(this.strangeMQTTServer, isWorker, event1 -> {
    vertx.deployVerticle(this.strangeMQTTServer, event1 -> {
        if (event1.succeeded()) {
            async.complete();
        } else {
            context.fail(event1.cause());
        }
    });
}

@After
public void tearDown(TestContext context) throws Exception {
    Async async = context.async();
    vertx.undeploy(this.strangeMQTTServer.deploymentID(), event1 -> {
        if (event1.succeeded()) {
            async.complete();
        } else {
            context.fail(event1.cause());
        }
    });
}

@Test
public void connackNotOk(TestContext context) {
    Async async = context.async();
    client.connect(MqttClientOptions.DEFAULT_PORT, MqttClientOptions.DEFAULT_HOST, c -> {
        context.assertTrue(c.failed());
        context.assertTrue(c.cause() instanceof MqttConnectionException);
        MqttConnectionException connEx = (MqttConnectionException) c.cause();
        context.assertEquals(connEx.code(), MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
        context.assertFalse(client.isConnected());
        async.complete();
    });
}
If the verticle is not deployed as a worker, everything works as expected. If it is deployed as a worker, no handler is triggered. What I also do not understand: why can the worker accept a connection but not reject one?
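To narrow down whether the worker-deployed server sends a refusal at all, it may help to look at the raw bytes the client receives (for example in a packet capture). The sketch below is not part of the test above. It assumes the connection uses MQTT 3.1.1, where a CONNACK is a fixed four-byte packet and return code 4 means "bad user name or password". The class name `ConnackDecoder` and its methods are hypothetical helpers that use only the JDK.

```java
public class ConnackDecoder {

    /**
     * Extracts the connect return code from a raw MQTT 3.1.1 CONNACK packet.
     * Layout: 0x20 (packet type 2, flags 0), remaining length 2,
     * acknowledge flags (bit 0 = session present), return code.
     */
    public static int returnCode(byte[] packet) {
        if (packet == null || packet.length != 4) {
            throw new IllegalArgumentException("an MQTT 3.1.1 CONNACK is exactly 4 bytes");
        }
        if ((packet[0] & 0xFF) != 0x20) {
            throw new IllegalArgumentException("first byte is not the CONNACK fixed header (0x20)");
        }
        if ((packet[1] & 0xFF) != 2) {
            throw new IllegalArgumentException("CONNACK remaining length must be 2");
        }
        return packet[3] & 0xFF;
    }

    /** Maps an MQTT 3.1.1 connect return code to a readable description. */
    public static String describe(int code) {
        switch (code) {
            case 0: return "connection accepted";
            case 1: return "refused: unacceptable protocol version";
            case 2: return "refused: identifier rejected";
            case 3: return "refused: server unavailable";
            case 4: return "refused: bad user name or password";
            case 5: return "refused: not authorized";
            default: return "reserved return code " + code;
        }
    }

    public static void main(String[] args) {
        // The bytes a client would be expected to receive for the reject() call in the server above.
        byte[] refused = {0x20, 0x02, 0x00, 0x04};
        System.out.println(describe(returnCode(refused)));
    }
}
```

If these four bytes never show up on the wire when the verticle is deployed as a worker, the CONNACK is presumably not being written before the connection is dropped or left hanging, which would match the client handler never firing.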