Shared subscription not working correctly
- [x] Bug exists in Release Version 1.2.5 (Master Branch)
- [ ] Bug exists in MQTTv3 Client on Snapshot Version 1.2.6-SNAPSHOT (Develop Branch)
- [ ] Bug exists in MQTTv5 Client on Snapshot Version 1.2.6-SNAPSHOT (Develop Branch)
When a message is received on a shared subscription, the registered IMqttMessageListener.messageArrived() isn't called. Instead, the registered MqttCallback.messageArrived() is called. A possible workaround is to subscribe to the topic both as a shared and as a non-shared subscription.
Reproduce:
- subscribe to topic $share/group1/any/topic
- send message to topic any/topic
The bug also exists in V1.2.4. Thanks for your solution. Using MqttCallback instead of implementing MqttCallback works! @escoand 👍
This is also the case when using wildcards in topics. When deliverMessage is called in CommsCallback, it validates the topic and doesn't allow wildcards in MqttTopic.isMatched. In the end it throws IllegalArgumentException("The topic name MUST NOT contain any wildcard characters (#+)").
Did you mean to say "Use MqttCallback instead of implementing IMqttMessageListener"?
I can also confirm that in version 1.2.5 this code will not work:

    mqttClient.subscribe("$shared/group/some_topic/#", 1, (topic, message) -> System.out.println(topic));

Instead, it's required to use callbacks:

    mqttClient.subscribe("$shared/group/some_topic/#", 1);
    mqttClient.setCallback(new MqttCallback() {
        @Override
        public void connectionLost(final Throwable cause) {
        }

        @Override
        public void messageArrived(final String topic, final MqttMessage message) throws Exception {
            System.out.println(topic);
        }

        @Override
        public void deliveryComplete(final IMqttDeliveryToken token) {
        }
    });
I am still hitting this issue. Paho is the only supported MQTT client with Spring Integration and this makes it a lot less useful than it could be.
It looks like either
- org.eclipse.paho.mqttv5.client.internal.CommsCallback#setMessageListener must be changed so that the callback is mapped against the topic filter without the shared subscription prefix $share/<share-name>/ (if there is one), or
- org.eclipse.paho.mqttv5.common.util.MqttTopicValidator#isMatched must be updated to be able to match when there is a shared subscription prefix
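
To make the two options concrete, here is a rough, standalone sketch of the idea. It is not Paho code: the class `SharedSubscriptionFilter` and its methods `stripSharePrefix`, `matches`, and `matchesShared` are hypothetical names invented for illustration, and the matcher is a simplified take on MQTT topic matching (it does not validate that `#` is the last level). It only shows why a filter such as `$share/group1/any/topic` never matches the delivered topic `any/topic` unless the prefix is removed first.

```java
// Hypothetical helper, not part of the Paho API.
final class SharedSubscriptionFilter {
    private static final String SHARE_PREFIX = "$share/";

    /**
     * Returns the topic filter without a leading "$share/<share-name>/".
     * Filters that are not well-formed shared filters are returned unchanged.
     */
    static String stripSharePrefix(String filter) {
        if (!filter.startsWith(SHARE_PREFIX)) {
            return filter;
        }
        int end = filter.indexOf('/', SHARE_PREFIX.length());
        // Require a non-empty share name and a non-empty filter after it.
        if (end <= SHARE_PREFIX.length() || end == filter.length() - 1) {
            return filter;
        }
        return filter.substring(end + 1);
    }

    /** Simplified MQTT matching of a topic name against a topic filter. */
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        // A leading wildcard must not match topics that start with '$'.
        if (topic.startsWith("$") && (f[0].equals("#") || f[0].equals("+"))) {
            return false;
        }
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true; // matches the remaining levels, including none
            }
            if (i >= t.length) {
                return false;
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false;
            }
        }
        return f.length == t.length;
    }

    /** Matching that tolerates a shared subscription prefix on the filter. */
    static boolean matchesShared(String filter, String topic) {
        return matches(stripSharePrefix(filter), topic);
    }

    public static void main(String[] args) {
        String filter = "$share/group1/any/topic";
        String topic = "any/topic";
        System.out.println("naive match:        " + matches(filter, topic));
        System.out.println("prefix-aware match: " + matchesShared(filter, topic));
    }
}
```

Either fix would amount to applying something like `stripSharePrefix` at one of two points: when the listener is registered (so the stored key is already `any/topic`), or when an incoming topic is compared against the stored filter.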