vertx-mqtt icon indicating copy to clipboard operation
vertx-mqtt copied to clipboard

subscriber cant receive topic message

Open chenziyi2018 opened this issue 4 years ago • 7 comments

Questions

  • subscriber cant receive topic message

Version

4.0.0

Context

  • client code

` @Slf4j public class MqttClientVerticle extends AbstractVerticle {

private MqttClient mqttClient;

\@Override
public void start() throws Exception {
    log.info("good we are in now");
    MqttClientOptions options = new MqttClientOptions()
            .setKeepAliveInterval(5)
            .setClientId("client_1111")
            .setAutoKeepAlive(true);
    mqttClient = MqttClient.create(vertx, options);
    mqttClient.publishHandler(handle -> {
        log.info("receive topic:[{}],payload:[{}]", handle.topicName(), handle.payload());
    });

    mqttClient.connect(1883, "localhost", handler -> {
        mqttClient.subscribe("/topic", 0);
    });
}
@Override
public void stop() throws Exception {
    if (mqttClient != null) {
        log.info("disconnect");
        mqttClient.disconnect();
    }
}

}`

  • server code

` @Slf4j public class MqtttServer extends AbstractVerticle { private static final String TOPIC_LIGHTS = "lights";

@Override
public void start() throws Exception {
    MqttServerOptions options = new MqttServerOptions().setPort(1883).setHost("0.0.0.0");
    MqttServer mqttServer = MqttServer.create(vertx, options);
    mqttServer.endpointHandler(endpoint -> {
        System.out.println("connected client " + endpoint.clientIdentifier());
        endpoint.accept(false);
        endpoint.pingHandler(h -> {
            log.info("receive client [{}] ping message", endpoint.clientIdentifier());
        });
        handleSubscription(endpoint);
        handleUnsubscription(endpoint);
        publishHandler(endpoint);
        handleClientDisconnect(endpoint);
    })
            .listen(ar -> {
                if (ar.succeeded()) {
                    log.info("MQTT server is listening on port " + ar.result().actualPort());
                } else {
                    log.info("Error on starting the server");
                    ar.cause().printStackTrace();
                }
            });
}


private static void handleSubscription(MqttEndpoint endpoint) {
    endpoint.subscribeHandler(subscribe -> {

        List<MqttQoS> grantedQosLevels = new ArrayList<>();
        for (MqttTopicSubscription s : subscribe.topicSubscriptions()) {
            grantedQosLevels.add(s.qualityOfService());
        }
        String topicNames = subscribe.topicSubscriptions().stream().map(MqttTopicSubscription::topicName).collect(Collectors.joining(","));
        log.info("{} subscribe :{}", endpoint.clientIdentifier(), topicNames);
        endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels);
        for (MqttTopicSubscription topicSubscription : subscribe.topicSubscriptions()) {
            endpoint.publish(topicSubscription.topicName(), Buffer.buffer("hello"),MqttQoS.EXACTLY_ONCE,false,false);
        }
        endpoint.publishAcknowledgeHandler(messageId -> {

            System.out.println("Received ack for message = " + messageId);

        }).publishReceivedHandler(endpoint::publishRelease).publishCompletionHandler(messageId -> {

            System.out.println("Received ack for message = " + messageId);
        });
    });
}

private static void handleUnsubscription(MqttEndpoint endpoint) {
    endpoint.unsubscribeHandler(unsubscribe -> {
        String topics = String.join(",", unsubscribe.topics());
        log.info("{} unsubscribe :{}", endpoint.clientIdentifier(), topics);
        endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
    });
}

private static void publishHandler(MqttEndpoint endpoint) {
    endpoint.publishHandler(message -> {
        log.info("client [{}] publish message :{}, to topic:[{}]", endpoint.clientIdentifier(), message.payload(), message.topicName());
        if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
            endpoint.publishAcknowledge(message.messageId());
        } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
            endpoint.publishRelease(message.messageId());
        }
    }).publishReleaseHandler(endpoint::publishComplete);
}

private static void handleQoS(MqttPublishMessage message, MqttEndpoint endpoint) {
    if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
        String topicName = message.topicName();

        if (TOPIC_LIGHTS.equals(topicName)) {
            LightsController.handler(message);
        }
        endpoint.publishAcknowledge(message.messageId());

    } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
        endpoint.publishRelease(message.messageId());
    }
}

private static void handleClientDisconnect(MqttEndpoint endpoint) {
    endpoint.disconnectHandler(h -> {
        log.info("{} disconnect", endpoint.clientIdentifier());
    });
}

}`

  • controller code

` @Slf4j public class VertxServer extends AbstractVerticle {

@Override
public void start() throws Exception {
    HttpServer server = vertx.createHttpServer();
    Router router = Router.router(vertx);
    router.route("/static/*").handler(StaticHandler.create("templates"));
    router.get("/hello").handler(ctx -> {

        MqttClientOptions options = new MqttClientOptions().setClientId("client_sender_111");
        MqttClient mqttClient = MqttClient.create(vertx, options);
        mqttClient.connect(1883, "localhost", h -> {
            if (h.succeeded()) {
                mqttClient.publish("/topic", Buffer.buffer("hello, how are you"), MqttQoS.AT_MOST_ONCE, false, false, s -> {
                    log.info("send success? {}", s.succeeded());
                    mqttClient.disconnect();
                });
            }
        });

        // This handler will be called for every request
        HttpServerResponse response = ctx.response();
        response.putHeader("content-type", "text/plain");
        // Write to the response and end it
        response.end("send success");
    });

    server.requestHandler(router).listen(9091, h -> {
        log.info("server start:{}", h.succeeded());
    });
}

}`

description

when the client init on project start, I can receive the msg publish by the mqtt server subscribeHandler "hello", but when I use the web router "/hello" to publish msg to the topic "/topic" the subscriber client "client_1111" cant recieve the msg ...

help me please ~ T_T ~

chenziyi2018 avatar Jan 16 '21 07:01 chenziyi2018

sorry its not bug, I used a wrong tag

chenziyi2018 avatar Jan 16 '21 07:01 chenziyi2018

is it a bug or a question ?

vietj avatar Jan 18 '21 08:01 vietj

is it a bug or a question ?

its a question. I write a demo like this, bug it not work : subscriber cant receive the published message.

i want to use the vertx in my project. but development documents are too less, and the samples are too simple. can you give me some suggests?

chenziyi2018 avatar Jan 20 '21 03:01 chenziyi2018

I have the same problem,

endpoint.publishHandler { message ->
       // this is ok;
        println("publishHandler topic: ${message.topicName()}  payload: ${message.payload()}")
        if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
          endpoint.publishAcknowledge(message.messageId())
        } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
          endpoint.publishReceived(message.messageId())
        }
      }.publishReleaseHandler { messageId -> endpoint.publishComplete(messageId) }

but other clients subscribe topic can't get the message; do you need to manually forward the subscription to other clients

zhengchalei avatar May 14 '21 17:05 zhengchalei

how to get message from subscribe ? i can't find a method to get message from

sunqb avatar May 19 '21 04:05 sunqb

how to get message from subscribe ? i can't find a method to get message from

method name is not good,i think 。 this is not a bug,i use 'publishHandler' to get message from subscribe

sunqb avatar May 19 '21 04:05 sunqb

yeah, vertx-mqtt is not a fully featured MQTT broker. You need to complete pub/sub message passage by yourself.

pigbayspy avatar Jul 27 '21 03:07 pigbayspy