[rumqttc] Reconnection to MQTT broker and pending messages TTL

Open dsferruzza opened this issue 4 years ago • 4 comments

Hi!

I'm trying to use rumqttc in the context of an Actix Web app (and it seems very nice, thanks for that!). I made a PoC where rumqttc is used from an Actix actor, which holds the client's state and can receive messages from HTTP handlers (for example, to publish MQTT messages). Note: I'm using rumqttc 0.2.0, as 0.3.0 needs tokio 0.3 (I'm hoping that both rumqttc and actix-web will rely on tokio 1.0 in the near future).

When I shut down my MQTT broker (or the network link between my app and my broker) while publishing messages, rumqttc does a good job of keeping messages, trying to reconnect to the broker, and sending the pending messages once it reconnects. This is quite cool, but I feel like I need a bit more in my context:

  1. I can intercept ConnAck messages, which mean my app just reconnected to the broker, but is there any way to determine that it disconnected? Maybe by switching some state when no PingResp is received before a timeout? I guess it could be useful to be able to provide closures in MqttOptions that would be called when this kind of event happens. Or at least a warn log message from rumqttc?
  2. In my use case, I really don't want messages older than ~30 seconds to be published when the app reconnects to the broker. I don't think this can be handled by putting timestamps inside messages, because that would require time synchronization between every device that publishes these messages or subscribes to them. Is there any way to specify a TTL when publishing messages, or to periodically "garbage collect" pending messages? (Maybe this is an anti-pattern, but I cannot find a way to do this.)
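
To make the second point concrete: the age check could rely only on the publisher's own monotonic clock, so no synchronization between devices would be needed. The following is a minimal sketch under that assumption. `Outbox`, `push`, `prune` and `drain_fresh` are hypothetical application-side names, not part of rumqttc, and this only covers messages the app still holds itself, not publishes already queued inside rumqttc.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Hypothetical application-side outbox (NOT a rumqttc type): every message
/// is stamped with the local monotonic clock at the moment it is queued.
pub struct Outbox {
    ttl: Duration,
    // (queued_at, topic, payload)
    queue: VecDeque<(Instant, String, Vec<u8>)>,
}

impl Outbox {
    pub fn new(ttl: Duration) -> Self {
        Outbox { ttl, queue: VecDeque::new() }
    }

    /// Queue a message, recording `now` as its enqueue time.
    pub fn push(&mut self, now: Instant, topic: &str, payload: &[u8]) {
        self.queue.push_back((now, topic.to_string(), payload.to_vec()));
    }

    /// Drop every message older than the TTL; returns how many were dropped.
    pub fn prune(&mut self, now: Instant) -> usize {
        let before = self.queue.len();
        let ttl = self.ttl;
        self.queue
            .retain(|(queued_at, _, _)| now.saturating_duration_since(*queued_at) <= ttl);
        before - self.queue.len()
    }

    /// Prune, then return the surviving messages in order, emptying the outbox.
    pub fn drain_fresh(&mut self, now: Instant) -> Vec<(String, Vec<u8>)> {
        self.prune(now);
        self.queue.drain(..).map(|(_, topic, payload)| (topic, payload)).collect()
    }
}
```

With something like this, the app could call `drain_fresh(Instant::now())` when it sees a ConnAck and publish only what comes back.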

dsferruzza, Dec 29 '20 20:12

The cleanest way I can think of here is to allow user attributes when saving outgoing publishes. After reconnection, the user can choose to clear pending messages from the state before retransmission.

tekjar, Jan 05 '21 12:01

> 1. I can intercept ConnAck messages, which mean my app just reconnected to the broker, but is there any way to determine that it disconnected? Maybe by switching some state when no PingResp is received before a timeout? I guess it could be useful to be able to provide closures in MqttOptions that would be called when this kind of event happens. Or at least a warn log message from rumqttc?

If the warn log is missing, we can add it. Callbacks with closures are complicated in Rust, IMO. We can raise a Disconnect-like event when the keep-alive timer fails.

tekjar, Jan 05 '21 12:01

> The cleanest way I can think of here is to allow user attributes when saving outgoing publishes. After reconnection, the user can choose to clear pending messages from the state before retransmission.

How would the user choose which messages to keep and which to clear? Wouldn't it be easier, and cover more use cases, to provide a way to iterate over the pending queue, access each message's (meta)data, and remove messages based on any criteria?

> If the warn log is missing, we can add it. Callbacks with closures are complicated in Rust, IMO. We can raise a Disconnect-like event when the keep-alive timer fails.

  1. A warn log would be cool!
  2. Connected/Disconnected events would be a nice addition, because it would be easy to make apps react to them.
  3. Furthermore, maybe Client could retain the current connection state, so that it could be known at any time without the user having to accumulate it (useful, for example, if the app wants to periodically send messages only while the client is connected).

Does this make sense?
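
Until something like the third point exists in the library, the state could be accumulated on the application side. The sketch below assumes that the task driving the connection observes a ConnAck on (re)connection and an error from polling when the link drops. `LinkEvent` and `ConnectionFlag` are hypothetical names used for illustration, not rumqttc types.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical, simplified view of what the polling task observes
/// (NOT rumqttc's own event type).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum LinkEvent {
    /// The broker accepted the connection.
    ConnAck,
    /// Polling the connection returned an error (assumed to mean "link down").
    PollError,
    /// Anything else; does not affect the connection state.
    Other,
}

/// Cheaply cloneable flag shared between the polling task and the rest of the app.
#[derive(Clone, Default)]
pub struct ConnectionFlag(Arc<AtomicBool>);

impl ConnectionFlag {
    /// Starts in the "disconnected" state.
    pub fn new() -> Self {
        Self::default()
    }

    /// Record an event. Returns `Some(new_state)` only when the state actually
    /// changed, so the caller can log or react to the transition.
    pub fn observe(&self, event: LinkEvent) -> Option<bool> {
        let new_state = match event {
            LinkEvent::ConnAck => true,
            LinkEvent::PollError => false,
            LinkEvent::Other => return None,
        };
        let old_state = self.0.swap(new_state, Ordering::SeqCst);
        if old_state != new_state {
            Some(new_state)
        } else {
            None
        }
    }

    /// Current state, readable from any clone at any time.
    pub fn is_connected(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}
```

A periodic sender could then hold a clone of the flag and skip its work whenever `is_connected()` returns `false`.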

dsferruzza, Jan 07 '21 22:01

Sorry, I missed the notification on this issue. You have access to MqttState in [EventLoop](https://docs.rs/rumqttc/0.5.0/rumqttc/struct.EventLoop.html). You can drop the messages you don't want before reconnection.

tekjar, Mar 18 '21 12:03