rabbitmq-server icon indicating copy to clipboard operation
rabbitmq-server copied to clipboard

Retained messages are only delivered to clients for which a queue currently exists

Open chriswue opened this issue 7 years ago • 6 comments

Hi,

if I publish a message under a topic with the RETAIN flag set to true and QoS 1 and then subscribe with a new client for the first time (again QoS 1) the retained message doesn't get delivered. However it works if the client has connected before and a durable queue has been created.

Simple example:

Publisher:

import paho.mqtt.client as mqtt

client = mqtt.Client(client_id="pub", clean_session=False)
client.connect("localhost", 1883, 60)
client.publish(topic='test', qos=1, payload="This is a test")
client.loop()

Subscriber:

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    print("Sub connected with result code " + str(rc))
    client.subscribe("#", qos=1)

def on_message(client, userdata, msg):
    print("Message for topic {} received: {}".format(msg.topic, str(msg.payload)))

client = mqtt.Client(client_id="testsub", clean_session=False)
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost", 1883, 60)

client.loop_forever()
  • Run the publisher: it will publish the message and then quit.
  • Now run the subscriber for the first time: It will not get the retained message delivered
  • Kill the subscriber
  • Run the publisher again: it will publish the message and then quit.
  • Run the scriber again (with the same client id): it will now get the retained message delivered

It looks like the RETAIN flag is implemented by delivering the message to all existing persistent queues but there is no persistence beyond that (e.g. for new clients or clients which where disconnected for more than a day - since the default expiry time for persistent queues is 1 day)

To me this sounds like a spec-violation bug:

When a new subscription is established, the last retained message, if any, on each matching topic name MUST be sent to the subscriber [MQTT-3.3.1-6]

chriswue avatar Feb 21 '17 02:02 chriswue

The message is sent but if there is no queue because it has expired, it is not routed anywhere. Protocol spec authors assume that servers can keep retained messages around forever but there has to be a realistic limit in a practical systems. In RabbitMQ MQTT plugin it is 24 hours by default. You can change it it via configuration.

The plugin could re-declare a queue when publishing will messages, it's worth investigating if it has enough context to do that.

michaelklishin avatar Feb 21 '17 07:02 michaelklishin

OK, I misread the description so it's retained messages and not will that's in question.

Then your understanding is close to being correct. Queue names are based on client IDs and not topic names. For QoS 1 subscriptions, durable queues are used. If those expire or do not exist for any other reason, then messages will be published but won't be routed anywhere.

This is further complicated by the clean session flag: the plugin uses auto-deleted queues for connections that set it to true, see rabbitmq/rabbitmq-mqtt#37.

I will try to clarify it in the docs but with the current plugin architecture where MQTT is layered on top of abstractions and entities from another protocol, there is only so much that be done. So to really address scenarios such as this, mMoving to a more protocol-agnostic implementation, which we hope to do one day.

michaelklishin avatar Feb 21 '17 08:02 michaelklishin

Ok, I suspected as much since the issue relating to the implementation of the RETAIN flag mentioned that AMQP has no concept of it.

I guess a way to do this to use the existing persistence backend to also persist retained messages for topics (like creating separate topic based queues). When a new subscription comes in you'd have to check all the persisted topic messages to check if any of the topics match the subscription. I guess this doesn't entirely fit the move to a more protocol agnostic implementation but more like crowbarring in the functionality

chriswue avatar Feb 22 '17 00:02 chriswue

Team

We are having issue with clustering/mirroring in rabbitmq with mqtt versions: 3.6.15 Erlang 19.3

So we where having data discrepancies while doing pub/sub in 2 node cluster. Publish with retaining flag One time we are getting another data while doing subscribe and next time we are doing subscribe we are getting different data on different client id. Doing it from MQTTfx.

Even we enable the mirror also: rabbitmqctl set_policy ha-all "" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

abhi-dwivedi avatar Dec 10 '18 14:12 abhi-dwivedi

@abhi-dwivedi this is not a support forum. What you are looking at is covered in rabbitmq/rabbitmq-mqtt#91, rabbitmq/rabbitmq-mqtt#127 is also relevant.

michaelklishin avatar Dec 10 '18 14:12 michaelklishin

Does this mean that there is no way that NEW (unknown) devices will ever receive the retention message an a certain toppic?

MartyMcFlyInTheSky avatar Feb 28 '22 07:02 MartyMcFlyInTheSky

This issue is old and misleading. I'm closing it in favour of https://github.com/rabbitmq/rabbitmq-server/issues/8824.

ansd avatar Jul 10 '23 13:07 ansd

@ansd So the functionality of delivering retained messages to clients that were never subscribed before now actually works?

chriswue avatar Jul 12 '23 02:07 chriswue

@chriswue this issue was moved to #8824.

michaelklishin avatar Jul 12 '23 05:07 michaelklishin

@chriswue yes, delivering retained messages to clients that were never subscribed before works with the following limitations:

  1. If the topic filter contains wildcards (e.g. # as in your example above), no retained messages are sent: https://github.com/rabbitmq/rabbitmq-server/issues/8824
  2. Retained messages are stored only node local. That is, if a client publishes a retained messages to node A, and another client subsequently subscribes to node B, that subscribing client won't receive any retained messages: https://github.com/rabbitmq/rabbitmq-server/issues/8096

So, the functionality of retained messages in RabbitMQ is currently very limited.

An example that works: Client publishes a message to node A with topic topic/1 and thereafter another client connected to node A subscribes with topic filter topic/1. (Note in this example everything is local to node A, and the client does not use wildcards in its topic filter).

ansd avatar Jul 12 '23 07:07 ansd