paho.mqtt.java
Shared subscription not working correctly
- [x] Bug exists in Release Version 1.2.5 (Master Branch)
- [ ] Bug exists in MQTTv3 Client on Snapshot Version 1.2.6-SNAPSHOT (Develop Branch)
- [ ] Bug exists in MQTTv5 Client on Snapshot Version 1.2.6-SNAPSHOT (Develop Branch)
When a message is received on a shared subscription, the registered IMqttMessageListener.messageArrived isn't called. Instead, the registered MqttCallback.messageArrived is called. A possible workaround is to subscribe both as a shared and as a non-shared subscription.
Reproduce:
- subscribe to topic $share/group1/any/topic
- send a message to topic any/topic
The bug also exists in V1.2.4. Thanks for the solution. Using MqttCallback instead of implementing MqttCallback works! @escoand 👍
This is also the case when using wildcards in topics. When deliverMessage is called in CommsCallback, it validates the topic and doesn't allow wildcards in MqttTopic.isMatched. In the end it throws IllegalArgumentException("The topic name MUST NOT contain any wildcard characters (#+)").
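The distinction behind that exception, where wildcards are legal in a subscription filter but not in a published topic name, can be sketched as follows. This is an illustrative, self-contained matcher and not Paho's MqttTopic.isMatched; the class and method names are hypothetical, and the special handling of topics beginning with `$` is deliberately left out.

```java
import java.util.Objects;

/** Hypothetical helper (not part of Paho): matches a topic name against a topic filter. */
final class TopicFilterMatcher {

    private TopicFilterMatcher() {}

    /**
     * @param filter    subscription filter, may contain '+' and a trailing '#'
     * @param topicName concrete topic of a received message, no wildcards allowed
     */
    static boolean matches(String filter, String topicName) {
        Objects.requireNonNull(filter, "filter");
        Objects.requireNonNull(topicName, "topicName");

        // Only the published topic name is forbidden from containing wildcards.
        if (topicName.indexOf('#') >= 0 || topicName.indexOf('+') >= 0) {
            throw new IllegalArgumentException(
                    "topic name must not contain wildcards: " + topicName);
        }

        // Limit -1 keeps empty trailing levels such as the one in "a/".
        String[] filterLevels = filter.split("/", -1);
        String[] nameLevels = topicName.split("/", -1);

        for (int i = 0; i < filterLevels.length; i++) {
            String level = filterLevels[i];
            if (level.equals("#")) {
                // Multi-level wildcard: valid only as the last level, and it
                // also matches the parent level itself ("a/#" matches "a").
                return i == filterLevels.length - 1;
            }
            if (i >= nameLevels.length) {
                return false; // filter is deeper than the topic name
            }
            if (!level.equals("+") && !level.equals(nameLevels[i])) {
                return false; // literal level differs
            }
        }
        // No '#' seen: both sides must have the same number of levels.
        return filterLevels.length == nameLevels.length;
    }
}
```

With this sketch, `TopicFilterMatcher.matches("some_topic/#", "some_topic/a")` returns true, while passing a wildcard as the second argument throws. A dispatcher that swaps the two arguments, or validates the filter as if it were a topic name, would fail in the way described above.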
Regarding "in V1.2.4, bug also exist": did you mean to say "Use MqttCallback instead of implementing IMqttMessageListener"?
I can also confirm that in version 1.2.5 this code will not work:

    mqttClient.subscribe("$share/group/some_topic/#", 1, (topic, message) -> System.out.println(topic));

Instead, it's required to use callbacks:

    mqttClient.subscribe("$share/group/some_topic/#", 1);
    mqttClient.setCallback(new MqttCallback() {
        @Override
        public void connectionLost(final Throwable cause) {
            // not needed for this example
        }

        @Override
        public void messageArrived(final String topic, final MqttMessage message) throws Exception {
            // messages from the shared subscription end up here
            System.out.println(topic);
        }

        @Override
        public void deliveryComplete(final IMqttDeliveryToken token) {
            // not needed for this example
        }
    });
I am still hitting this issue. Paho is the only supported MQTT client with Spring Integration and this makes it a lot less useful than it could be.
It looks like either
- org.eclipse.paho.mqttv5.client.internal.CommsCallback#setMessageListener must be changed so that the callback is mapped against the topic filter without the shared subscription prefix $share/<share-name>/ (if there is one), or
- org.eclipse.paho.mqttv5.common.util.MqttTopicValidator#isMatched must be updated to be able to match when there is a shared subscription prefix.
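A minimal sketch of the first option, assuming the listener is stored under the filter with the `$share/<share-name>/` prefix removed. The class and method names here are hypothetical and do not exist in Paho; this only illustrates the prefix handling and is not a patch for CommsCallback.

```java
/** Hypothetical helper (not part of Paho) for shared subscription filters. */
final class SharedSubscriptions {

    private static final String SHARE_PREFIX = "$share/";

    private SharedSubscriptions() {}

    /**
     * Returns the filter that incoming topic names should be matched against.
     * "$share/group1/any/topic" becomes "any/topic"; anything that is not a
     * well-formed shared subscription filter is returned unchanged.
     */
    static String stripSharePrefix(String topicFilter) {
        if (!topicFilter.startsWith(SHARE_PREFIX)) {
            return topicFilter; // ordinary, non-shared subscription
        }
        // Position of the '/' that ends the share name.
        int shareNameEnd = topicFilter.indexOf('/', SHARE_PREFIX.length());
        boolean emptyShareName = shareNameEnd <= SHARE_PREFIX.length();
        boolean emptyFilter = shareNameEnd == topicFilter.length() - 1;
        if (emptyShareName || emptyFilter) {
            return topicFilter; // malformed, leave it for the caller to reject
        }
        return topicFilter.substring(shareNameEnd + 1);
    }
}
```

If listeners were registered under `stripSharePrefix(filter)`, a message published to any/topic would be compared against any/topic instead of $share/group1/any/topic. The second option would apply the same stripping inside the matching step instead of at registration time.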