
Topic filter does not match shared subscription, listeners are not notified on actual topic

Open Adam-Grzenda opened this issue 3 years ago • 2 comments

Please fill out the form below before submitting, thank you!

  • [X] Bug exists in Release Version 1.2.5 (Master Branch)
  • [ ] Bug exists in MQTTv3 Client on Snapshot Version 1.2.6-SNAPSHOT (Develop Branch)
  • [X] Bug exists in MQTTv5 Client on Snapshot Version 1.2.6-SNAPSHOT (Develop Branch)

TLDR: MqttTopicValidator does not match shared subscription topic filters against actual message topics, e.g. the filter $share/GROUP-NAME/some-topic is not matched when a message arrives on some-topic.

https://github.com/eclipse/paho.mqtt.java/blob/f4e0db802a4433645ef011e711646a09ec9fae89/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/common/util/MqttTopicValidator.java#L151

Long description: I've encountered an issue with the MQTTv5 shared subscriptions feature. To get broker-side load balancing between running application instances, I have to subscribe to a topic with the prefix $share/GROUP-NAME/.

That way I can ensure that my messages are distributed between instances according to my broker settings. Unfortunately, even though messages are delivered to my Paho client (which I can see by running a generic callback for all messages), the listener registered while subscribing is not invoked.

This happens because the $share/GROUP-NAME/ prefix is not part of the topic of incoming messages: I subscribed to $share/GROUP-NAME/some-topic, but incoming messages arrive on plain some-topic, so the listener's topic filter never matches.

As far as I understand, this is part of the standard: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901250
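To illustrate the point about the standard: a shared subscription filter has the form $share/{ShareName}/{filter}, and only the trailing {filter} part is compared against message topics. A minimal sketch of extracting that part is shown here. SharedSubscriptionFilter and underlyingFilter are hypothetical names used for illustration only; they are not part of Paho.

```java
/** Hypothetical helper (not part of Paho) for "$share/{ShareName}/{filter}" topic filters. */
final class SharedSubscriptionFilter {
    private static final String SHARE_PREFIX = "$share/";

    private SharedSubscriptionFilter() {}

    /**
     * Returns the filter that incoming topics should be matched against.
     * Ordinary filters are returned unchanged; for shared subscriptions the
     * "$share/{ShareName}/" prefix is dropped.
     */
    static String underlyingFilter(String subscribedFilter) {
        if (!subscribedFilter.startsWith(SHARE_PREFIX)) {
            return subscribedFilter; // not a shared subscription
        }
        // The share name ends at the first '/' after "$share/".
        int shareNameEnd = subscribedFilter.indexOf('/', SHARE_PREFIX.length());
        boolean emptyShareName = shareNameEnd == SHARE_PREFIX.length();
        boolean missingFilter = shareNameEnd < 0 || shareNameEnd == subscribedFilter.length() - 1;
        if (emptyShareName || missingFilter) {
            throw new IllegalArgumentException(
                    "Malformed shared subscription filter: " + subscribedFilter);
        }
        return subscribedFilter.substring(shareNameEnd + 1);
    }
}
```

With this sketch, "$share/GROUP-NAME/someTopic/#" yields "someTopic/#", which is the filter that a topic such as someTopic/AAA-BBB-CCC would have to be matched against.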

For now I've been forced to use a generic callback that invokes my listener manually, but this feels like a hack.
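For anyone needing the same workaround, the manual dispatch could look roughly like the sketch below. It is deliberately free of Paho types so it stands alone: SharedSubscriptionDispatcher, register, dispatch and matches are hypothetical names, handlers are registered under the filter without the $share/GROUP-NAME/ prefix (e.g. someTopic/#), and the wildcard matching is a simplified version that ignores the special rules for topics starting with $.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

/** Hypothetical workaround helper (not part of Paho): routes messages to handlers by topic filter. */
final class SharedSubscriptionDispatcher {
    // Key: topic filter WITHOUT the "$share/{ShareName}/" prefix, e.g. "someTopic/#".
    private final Map<String, BiConsumer<String, byte[]>> handlers = new ConcurrentHashMap<>();

    void register(String plainFilter, BiConsumer<String, byte[]> handler) {
        handlers.put(plainFilter, handler);
    }

    /** Invokes every handler whose filter matches the topic; returns how many were invoked. */
    int dispatch(String topic, byte[] payload) {
        int invoked = 0;
        for (Map.Entry<String, BiConsumer<String, byte[]>> entry : handlers.entrySet()) {
            if (matches(entry.getKey(), topic)) {
                entry.getValue().accept(topic, payload);
                invoked++;
            }
        }
        return invoked;
    }

    /** Simplified MQTT filter matching: '+' matches one level, '#' matches the remainder. */
    static boolean matches(String filter, String topic) {
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);
        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                return true; // also matches the parent level, e.g. "a/#" matches "a"
            }
            if (i >= topicLevels.length) {
                return false; // filter has more levels than the topic
            }
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false;
            }
        }
        return filterLevels.length == topicLevels.length;
    }
}
```

The generic callback's messageArrived(String topic, MqttMessage message) would then call dispatcher.dispatch(topic, message.getPayload()), while the actual subscription is still made with the full $share/GROUP-NAME/someTopic/# filter.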

Minimal code to reproduce

Expected standard out: Message arrived in listener, topic: someTopic/AAA-BBB-CCC, message:

Actual standard out: Message arrived in callback, topic: someTopic/AAA-BBB-CCC, message:


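import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

public class MyApplication {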
    
    public static void main(String[] args) throws MqttException, InterruptedException {
        MqttClient client = new MqttClient("tcp://localhost:1883", "myClientId", new MemoryPersistence());
        client.setCallback(new MqttCallbackImpl());
        client.connect();

        MqttSubscription subscription = new MqttSubscription("$share/GROUP-NAME/someTopic/#", 2);
        client.subscribe(new MqttSubscription[]{subscription}, new IMqttMessageListener[]{new MqttListener()})
                .waitForCompletion();
        
        while (true) {
            client.publish("someTopic/AAA-BBB-CCC", new MqttMessage());
            Thread.sleep(1000);
        }
    }
    
    public static class MqttListener implements IMqttMessageListener {
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            System.out.println("Message arrived in listener, topic: " + topic + " message: " + message);
        }
    }
    
    public static class MqttCallbackImpl implements MqttCallback {
        @Override 
        public void messageArrived(String topic, MqttMessage message) {
            System.out.println("Message arrived in callback, topic: " + topic + " message: " + message);
        }
        @Override public void deliveryComplete(IMqttToken token) {}
        @Override public void connectComplete(boolean reconnect, String serverURI) {}
        @Override public void authPacketArrived(int reasonCode, MqttProperties properties) {}
        @Override public void disconnected(MqttDisconnectResponse disconnectResponse) {}
        @Override public void mqttErrorOccurred(MqttException exception) {}
    }

} 

Adam-Grzenda, Nov 10 '22 21:11

I have run into the same issue. I suspect the problem lies in the deliverMessage method of CommsCallback: the MqttTopicValidator.isMatched call there does not take the "$share" prefix of shared subscription topics into account.

SpikeFJ, Jan 08 '24 06:01

Related: https://github.com/eclipse/paho.mqtt.java/pull/911

nwest1, Feb 14 '24 16:02