vertx-mqtt
vertx-mqtt copied to clipboard
subscriber cant receive topic message
Questions
- subscriber cant receive topic message
Version
4.0.0
Context
- client code
` @Slf4j public class MqttClientVerticle extends AbstractVerticle {
private MqttClient mqttClient;
\@Override
public void start() throws Exception {
log.info("good we are in now");
MqttClientOptions options = new MqttClientOptions()
.setKeepAliveInterval(5)
.setClientId("client_1111")
.setAutoKeepAlive(true);
mqttClient = MqttClient.create(vertx, options);
mqttClient.publishHandler(handle -> {
log.info("receive topic:[{}],payload:[{}]", handle.topicName(), handle.payload());
});
mqttClient.connect(1883, "localhost", handler -> {
mqttClient.subscribe("/topic", 0);
});
}
@Override
public void stop() throws Exception {
if (mqttClient != null) {
log.info("disconnect");
mqttClient.disconnect();
}
}
}`
- server code
` @Slf4j public class MqtttServer extends AbstractVerticle { private static final String TOPIC_LIGHTS = "lights";
@Override
public void start() throws Exception {
MqttServerOptions options = new MqttServerOptions().setPort(1883).setHost("0.0.0.0");
MqttServer mqttServer = MqttServer.create(vertx, options);
mqttServer.endpointHandler(endpoint -> {
System.out.println("connected client " + endpoint.clientIdentifier());
endpoint.accept(false);
endpoint.pingHandler(h -> {
log.info("receive client [{}] ping message", endpoint.clientIdentifier());
});
handleSubscription(endpoint);
handleUnsubscription(endpoint);
publishHandler(endpoint);
handleClientDisconnect(endpoint);
})
.listen(ar -> {
if (ar.succeeded()) {
log.info("MQTT server is listening on port " + ar.result().actualPort());
} else {
log.info("Error on starting the server");
ar.cause().printStackTrace();
}
});
}
private static void handleSubscription(MqttEndpoint endpoint) {
endpoint.subscribeHandler(subscribe -> {
List<MqttQoS> grantedQosLevels = new ArrayList<>();
for (MqttTopicSubscription s : subscribe.topicSubscriptions()) {
grantedQosLevels.add(s.qualityOfService());
}
String topicNames = subscribe.topicSubscriptions().stream().map(MqttTopicSubscription::topicName).collect(Collectors.joining(","));
log.info("{} subscribe :{}", endpoint.clientIdentifier(), topicNames);
endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels);
for (MqttTopicSubscription topicSubscription : subscribe.topicSubscriptions()) {
endpoint.publish(topicSubscription.topicName(), Buffer.buffer("hello"),MqttQoS.EXACTLY_ONCE,false,false);
}
endpoint.publishAcknowledgeHandler(messageId -> {
System.out.println("Received ack for message = " + messageId);
}).publishReceivedHandler(endpoint::publishRelease).publishCompletionHandler(messageId -> {
System.out.println("Received ack for message = " + messageId);
});
});
}
private static void handleUnsubscription(MqttEndpoint endpoint) {
endpoint.unsubscribeHandler(unsubscribe -> {
String topics = String.join(",", unsubscribe.topics());
log.info("{} unsubscribe :{}", endpoint.clientIdentifier(), topics);
endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
});
}
private static void publishHandler(MqttEndpoint endpoint) {
endpoint.publishHandler(message -> {
log.info("client [{}] publish message :{}, to topic:[{}]", endpoint.clientIdentifier(), message.payload(), message.topicName());
if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
endpoint.publishAcknowledge(message.messageId());
} else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
endpoint.publishRelease(message.messageId());
}
}).publishReleaseHandler(endpoint::publishComplete);
}
private static void handleQoS(MqttPublishMessage message, MqttEndpoint endpoint) {
if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
String topicName = message.topicName();
if (TOPIC_LIGHTS.equals(topicName)) {
LightsController.handler(message);
}
endpoint.publishAcknowledge(message.messageId());
} else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
endpoint.publishRelease(message.messageId());
}
}
private static void handleClientDisconnect(MqttEndpoint endpoint) {
endpoint.disconnectHandler(h -> {
log.info("{} disconnect", endpoint.clientIdentifier());
});
}
}`
- controller code
` @Slf4j public class VertxServer extends AbstractVerticle {
@Override
public void start() throws Exception {
HttpServer server = vertx.createHttpServer();
Router router = Router.router(vertx);
router.route("/static/*").handler(StaticHandler.create("templates"));
router.get("/hello").handler(ctx -> {
MqttClientOptions options = new MqttClientOptions().setClientId("client_sender_111");
MqttClient mqttClient = MqttClient.create(vertx, options);
mqttClient.connect(1883, "localhost", h -> {
if (h.succeeded()) {
mqttClient.publish("/topic", Buffer.buffer("hello, how are you"), MqttQoS.AT_MOST_ONCE, false, false, s -> {
log.info("send success? {}", s.succeeded());
mqttClient.disconnect();
});
}
});
// This handler will be called for every request
HttpServerResponse response = ctx.response();
response.putHeader("content-type", "text/plain");
// Write to the response and end it
response.end("send success");
});
server.requestHandler(router).listen(9091, h -> {
log.info("server start:{}", h.succeeded());
});
}
}`
description
when the client init on project start, I can receive the msg publish by the mqtt server subscribeHandler "hello", but when I use the web router "/hello" to publish msg to the topic "/topic" the subscriber client "client_1111" cant recieve the msg ...
help me please ~ T_T ~
sorry its not bug, I used a wrong tag
is it a bug or a question ?
is it a bug or a question ?
its a question. I write a demo like this, bug it not work : subscriber cant receive the published message.
i want to use the vertx in my project. but development documents are too less, and the samples are too simple. can you give me some suggests?
I have the same problem,
endpoint.publishHandler { message ->
// this is ok;
println("publishHandler topic: ${message.topicName()} payload: ${message.payload()}")
if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
endpoint.publishAcknowledge(message.messageId())
} else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
endpoint.publishReceived(message.messageId())
}
}.publishReleaseHandler { messageId -> endpoint.publishComplete(messageId) }
but other clients subscribe topic can't get the message; do you need to manually forward the subscription to other clients
how to get message from subscribe ? i can't find a method to get message from
how to get message from subscribe ? i can't find a method to get message from
method name is not good,i think 。 this is not a bug,i use 'publishHandler' to get message from subscribe
yeah, vertx-mqtt is not a fully featured MQTT broker. You need to complete pub/sub message passage by yourself.