Topic filter does not match shared subscription; listeners are not notified on the actual topic
- [X] Bug exists in Release Version 1.2.5 (Master Branch)
- [ ] Bug exists in MQTTv3 Client on Snapshot Version 1.2.6-SNAPSHOT (Develop Branch)
- [X] Bug exists in MQTTv5 Client on Snapshot Version 1.2.6-SNAPSHOT (Develop Branch)
TLDR:
MqttTopicValidator does not match shared subscription topic filters against the topics of incoming messages.
For example, the filter $share/GROUP-NAME/some-topic is not matched when a message arrives on some-topic.
https://github.com/eclipse/paho.mqtt.java/blob/f4e0db802a4433645ef011e711646a09ec9fae89/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/common/util/MqttTopicValidator.java#L151
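To illustrate the mismatch, here is a minimal sketch of shared-subscription-aware matching. It is not Paho's code: the class `SharedTopicMatch` and its methods are hypothetical names, and `matchesPlain` is a simplified matcher that only handles `+` and a trailing `#` (it ignores other rules from the specification, such as the special handling of topics that start with `$`). The only point is that removing the `$share/<group>/` prefix before matching makes the filter match the delivered topic.

```java
import java.util.regex.Pattern;

/** Sketch (hypothetical helper, not part of Paho): shared-subscription-aware topic matching. */
final class SharedTopicMatch {

    // "$share/<group>/" where the group name is non-empty and contains no '/', '+' or '#'.
    private static final Pattern SHARE_PREFIX = Pattern.compile("^\\$share/[^/+#]+/");

    /** Returns the filter without its "$share/<group>/" prefix, or unchanged if it has none. */
    static String stripSharePrefix(String filter) {
        return SHARE_PREFIX.matcher(filter).replaceFirst("");
    }

    /** Simplified level-by-level match supporting '+' and a trailing '#'. */
    static boolean matchesPlain(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return i == f.length - 1; // '#' is only valid as the last level
            }
            if (i >= t.length) {
                return false;             // topic has fewer levels than the filter
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false;             // literal level differs
            }
        }
        return f.length == t.length;      // topic must not have extra levels
    }

    /** Matches against the filter with any shared-subscription prefix removed. */
    static boolean matchesShared(String filter, String topic) {
        return matchesPlain(stripSharePrefix(filter), topic);
    }

    public static void main(String[] args) {
        String filter = "$share/GROUP-NAME/someTopic/#";
        String topic = "someTopic/AAA-BBB-CCC";
        System.out.println(matchesPlain(filter, topic));  // false: "$share" != "someTopic"
        System.out.println(matchesShared(filter, topic)); // true
    }
}
```

Non-shared filters pass through `stripSharePrefix` unchanged, so `matchesShared` behaves like `matchesPlain` for them.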
Long description:
I've encountered an issue with shared subscriptions, an MQTTv5 feature. To get broker-side load balancing between running application instances, I have to subscribe to a topic filter with the prefix $share/GROUP-NAME/
That way my messages are distributed between instances according to my broker settings. Unfortunately, even though messages are delivered to my Paho client (which I can see by registering a generic callback for all messages), the listener registered when subscribing is never invoked.
This is because delivered messages do not carry the $share/GROUP-NAME/ prefix: I subscribed to $share/GROUP-NAME/some-topic, but incoming messages arrive on just some-topic.
As far as I understand, this is part of the standard: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901250
For now I am forced to use a generic callback that manually invokes my listener, but this feels like a hack.
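For anyone needing the same workaround, a rough sketch of such a dispatcher follows. Everything in it is hypothetical: `SharedSubscriptionDispatcher`, `register`, and `dispatch` are made-up names, the payload is a plain `String` to keep the example free of Paho types, and the matcher passed in `main` is a stand-in that only understands a trailing `/#`. In a real client one would call `dispatch` from the generic callback's `messageArrived` and plug in a full topic matcher.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;

/** Hypothetical helper: routes messages from a generic callback to per-subscription handlers. */
final class SharedSubscriptionDispatcher {

    // Handlers keyed by the filter with any "$share/<group>/" prefix removed.
    private final Map<String, BiConsumer<String, String>> handlers = new LinkedHashMap<>();

    // (filter, topic) -> true if the topic matches the filter; supplied by the caller.
    private final BiPredicate<String, String> matcher;

    SharedSubscriptionDispatcher(BiPredicate<String, String> matcher) {
        this.matcher = matcher;
    }

    /** Registers a handler for the filter exactly as it was passed to subscribe(). */
    void register(String subscribedFilter, BiConsumer<String, String> handler) {
        String effectiveFilter = subscribedFilter.replaceFirst("^\\$share/[^/]+/", "");
        handlers.put(effectiveFilter, handler);
    }

    /** Invokes every matching handler and returns how many were invoked. */
    int dispatch(String topic, String payload) {
        int invoked = 0;
        for (Map.Entry<String, BiConsumer<String, String>> e : handlers.entrySet()) {
            if (matcher.test(e.getKey(), topic)) {
                e.getValue().accept(topic, payload);
                invoked++;
            }
        }
        return invoked;
    }

    public static void main(String[] args) {
        // Stand-in matcher (assumption): exact match, or prefix match for a trailing "/#".
        BiPredicate<String, String> demoMatcher = (filter, topic) ->
                filter.endsWith("/#")
                        ? topic.startsWith(filter.substring(0, filter.length() - 1))
                        : filter.equals(topic);

        SharedSubscriptionDispatcher dispatcher = new SharedSubscriptionDispatcher(demoMatcher);
        dispatcher.register("$share/GROUP-NAME/someTopic/#",
                (topic, payload) -> System.out.println("listener got " + topic + ": " + payload));

        dispatcher.dispatch("someTopic/AAA-BBB-CCC", "hello");
        // prints "listener got someTopic/AAA-BBB-CCC: hello"
    }
}
```

Keeping the matcher as a constructor argument means the dispatcher does not need its own copy of the wildcard rules.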
Minimal code to reproduce
Expected standard out: Message arrived in listener, topic: someTopic/AAA-BBB-CCC, message:
Actual standard out: Message arrived in callback, topic: someTopic/AAA-BBB-CCC, message:
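import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

public class MyApplication {
    public static void main(String[] args) throws MqttException, InterruptedException {
        MqttClient client = new MqttClient("tcp://localhost:1883", "myClientId", new MemoryPersistence());
        // Generic callback: receives every message delivered to the client.
        client.setCallback(new MqttCallbackImpl());
        client.connect();

        // Shared subscription with a per-subscription listener.
        MqttSubscription subscription = new MqttSubscription("$share/GROUP-NAME/someTopic/#", 2);
        client.subscribe(new MqttSubscription[]{subscription}, new IMqttMessageListener[]{new MqttListener()})
                .waitForCompletion();

        // Publish to the plain topic, without the $share prefix.
        while (true) {
            client.publish("someTopic/AAA-BBB-CCC", new MqttMessage());
            Thread.sleep(1000);
        }
    }

    // Expected to be invoked for messages matching the shared subscription.
    public static class MqttListener implements IMqttMessageListener {
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            System.out.println("Message arrived in listener, topic: " + topic + ", message: " + message);
        }
    }

    public static class MqttCallbackImpl implements MqttCallback {
        @Override
        public void messageArrived(String topic, MqttMessage message) {
            System.out.println("Message arrived in callback, topic: " + topic + ", message: " + message);
        }
        @Override public void deliveryComplete(IMqttToken token) {}
        @Override public void connectComplete(boolean reconnect, String serverURI) {}
        @Override public void authPacketArrived(int reasonCode, MqttProperties properties) {}
        @Override public void disconnected(MqttDisconnectResponse disconnectResponse) {}
        @Override public void mqttErrorOccurred(MqttException exception) {}
    }
}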
I have encountered the same issue. I suspect the problem lies in the deliverMessage method of CommsCallback, where MqttTopicValidator.isMatched does not take the "$share" prefix of shared subscription topic filters into account.
Related: https://github.com/eclipse/paho.mqtt.java/pull/911