vertx-mqtt icon indicating copy to clipboard operation
vertx-mqtt copied to clipboard

Vertical Worker MQTT Server cannot reject a connection

Open qeychon opened this issue 6 years ago • 0 comments

Hi all,

I use the vertx-mqtt Version: 3.6.2 I have a vertical worker which contains the MQTT-Server:

public class StrangeMQTTServer extends AbstractVerticle {

    private MqttServer mqttServer;

    @Override
    public void start(Future<Void> startFuture) throws Exception {
        this.mqttServer = MqttServer.create(vertx);
        this.mqttServer.endpointHandler(endpoint -> {
            // endpoint.accept() <- works with deploy worker configuration    endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
        });
        this.mqttServer.listen(event -> {
            if (event.succeeded()) Logger.info("MQTT Server is listening on port {} ...", this.mqttServer.actualPort());
            else if (event.failed()) Logger.error(event.cause(), "MQTT Server failure ...");
            startFuture.complete();
        });
    }

    @Override
    public void stop(Future<Void> stopFuture) throws Exception {
        this.mqttServer.close(event -> stopFuture.complete());
    }
}

Now I want to connect to the server with a simple MQTT-Client:

    @Before
    public void setUp(TestContext context) throws Exception {
        Async async = context.async();
        this.strangeMQTTServer = new StrangeMQTTServer();
        DeploymentOptions isWorker = new DeploymentOptions()
                .setWorker(true)
                .setMaxWorkerExecuteTime(2000)
                .setMaxWorkerExecuteTimeUnit(TimeUnit.MILLISECONDS);
        // Does not work -> vertx.deployVerticle(this.strangeMQTTServer, isWorker, event1 -> {
        vertx.deployVerticle(this.strangeMQTTServer, event1 -> {
            if (event1.succeeded()) {
                async.complete();
            } else {
                context.fail(event1.cause());
            }
        });

    }

    @After
    public void tearDown(TestContext context) throws Exception {
        Async async = context.async();
        vertx.undeploy(this.strangeMQTTServer.deploymentID(), event1 -> {
            if (event1.succeeded()) {
                async.complete();
            } else {
                context.fail(event1.cause());
            }
        });
    }

    @Test
    public void connackNotOk(TestContext context) {
        Async async = context.async();
        client.connect(MqttClientOptions.DEFAULT_PORT, MqttClientOptions.DEFAULT_HOST, c -> {
            context.assertTrue(c.failed());
            context.assertTrue(c.cause() instanceof MqttConnectionException);
            MqttConnectionException connEx = (MqttConnectionException) c.cause();
            context.assertEquals(connEx.code(), MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
            context.assertFalse(client.isConnected());
            async.complete();
        });
    }

If the class is not a worker, then everything works as expected, but if it is a worker, then no handler will be triggered. Which I also do not understand, why can the worker accept the connection but not reject a connection?

qeychon avatar Jan 13 '19 09:01 qeychon