MQTTnet icon indicating copy to clipboard operation
MQTTnet copied to clipboard

Store pending messages on broker's side

Open elektromntr opened this issue 3 years ago • 10 comments

Describe the feature request

I need to store messages pending in server's queues, that are not yet delivered to subscribers. And when server crashes, I would like to restore these queues to be sure that every message reaches each subscriber.

Which project is your feature request related to?

  • Server

Describe the solution you'd like

I need to store messages pending in server's queues, that are not yet delivered to subscribers. And when server crashes, I would like to restore these queues to be sure that every message reaches each subscriber. For example, when client (subscriber) is disconnected for some time, its queue on server side is populated with new pending messages. And when server crashes at this point, subscriber will lose those messages. Even when the server restarts and subscriber's reconnect will take place, messages are lost.

Additional context

Place of storing should be decided on my own. It may be file, database or whatever.

@saba-sabrin I think we just store the messages in memory, so if the broker is restarted you will still loose messages.

@chkr1011 should we extend IMqttServerStorage or provide a similar interface like IMqttSessionStorage to store pending messages of a session?

Originally posted by @JanEggers in https://github.com/chkr1011/MQTTnet/issues/498#issuecomment-448552957

elektromntr avatar Aug 02 '21 08:08 elektromntr

This feature is still in progress but unfortunately it is hard to implement. Especially when performance should by high and feature set rich. The problem is when should the messages being stored? Every time a message is produced? Then the broker will get slow when a lot of traffic is happening. But when storing the messages on a clean exit of the server you will lose all messages on an app crash etc.

chkr1011 avatar Sep 05 '21 10:09 chkr1011

I wanted to comment this is an important feature. I understand the performance implications. Other brokers offer session persistence on stop/start scenarios...I'm not sure how they work on crash protection. Since there will be a performance impact for a crash protection setup, then it could be designed as an optional feature to turn on/off so it could be implemented by the user depending on their use case. Right now I'm using mqtt for a platform that won't have extremely high throughput, but session persistence is very important.

Thanks for your work on this project.

tcbarton avatar Oct 17 '21 17:10 tcbarton

I'd like to look into implementing the interfaces required to have an extensible solution similar to what is done with the retained message manager. There are already interfaces IMqttServerPersistedSessionsStorage and IMqttServerPersistedSession that seem to aim at handling persisted sessions but so far they don't seem to used. @chkr1011, any pointers regarding intent or direction would be appreciated. I'm also happy to add any additional interfaces and hook them up to server options and server code, unless there is already work in progress by anyone?

logicaloud avatar Oct 20 '21 19:10 logicaloud

The classes you mentioned are indeed intended for storing this data. For now the biggest design question is when the storage should be invoked? For every message? With all messages in a batch etc.? Performance is a big issue here. If we store the messages as soon as a new message is published the broker will get very slow. So the interfaces should give some flexibility to the user like access to all data in a atomic operation and also handlers for a single pending message. Then users can also decide to save things ONLY when the broker is shutting down properly (losing data at a crash might be acceptable). But I am also in the middle of a big rewrite now so you might have a lot of merge issues when starting with this right now. You can have a look at the branch Reason_Code_In_Subscription_Interceptor.

But for starters you can focus on "filling" the session upon creation with the existing data. When this is working we can focus on saving the data at a proper time.

Recommendation: Try Test Driven Development so that we have enough Unit Tests in the first place.

chkr1011 avatar Oct 23 '21 09:10 chkr1011

@chkr1011, thank you. Yes, the interfaces/extension points must give the implementer the opportunity to follow the MQTT specification to the letter. That means interface methods for storage must be called for every message. The difficult design decisions are then for the implementer to consider, for example, whether messages are stored to the underlying storage every time, or whether they are buffered for a while. A start/stop method for storage on shutdown would also be helpful.

I'd like to press ahead with this even if it means more merge work later. It sounds like the work on Reason_Code_In_Subscription_Interceptor will take a while? I'll check how a set of interfaces and call points would hang together in master and (hopefully) have some basis for discussion in a week or so.

logicaloud avatar Oct 23 '21 20:10 logicaloud

There are quite a few touch points in code where the "Persistent Session Manager" interface needs to be called but it all looks good so far. Because I have been working on the interface only (not the storage behind the interface), I think I'll open another issue or pull request "at the right time". I suspect that a good time for a pull request will be when the Reason_Code_In_Subscription_Interceptor branch is merged into master.

logicaloud avatar Nov 03 '21 03:11 logicaloud

@logicaloud This is a feature that I would very much like to utilize. What is the current status?

mark-trumed avatar Oct 25 '22 22:10 mark-trumed

@mark-trumed The MqttPersistedSessionManager is currently implemented in fork https://github.com/logicaloud/MQTTnet/tree/persisted-sessions-v4 but somewhat tangled up with some other added MQTT5 features like session and message expiry and improvements for retained message handling. My thinking was to untangle the features then create pull requests separately, closer to the end of the year when I have more time. I'm not sure how this would fit in with current ongoing developments. @chkr1011, any comments are welcome.

logicaloud avatar Oct 25 '22 23:10 logicaloud

@logicaloud @chkr1011 Persistence is mandatory for any application where data integrity is nonnegotiable (no matter the performance impact). Is there any update on the progress?

Since I need this feature I have to try to implement a crude solution myself. My plan was to persist both the subscriptions and the unfinished messages and resubsribe all clients on startup and resend all messages after that. Do I also need to persist the Sessions? And if so, how can I get the sessions and re-inject them on startup? Furthermore, how do I know if a message was delivered to all clients and is dequeued by the server? My understanding is that the server stores all messages (with QoS > 0) in some collection and removes them from the collection once all subscribed clients received the message. Is there an event for that? I don't think ClientAcknowledgedPublishPacketAsync is what I'm looking for, however I could be wrong since there is no documentation.

Yekt avatar Aug 09 '23 09:08 Yekt

I'm a bit in limbo with this at the moment and I think the ball is in @chkr1011's court. There is a discussion going on over here: https://github.com/dotnet/MQTTnet/discussions/1764.

As mentioned there, the persistent messages and persistent sessions integration should probably use interfaces instead of events. Once that is agreed on, then I would like to update the branch referred to in the discussion to change from an event based approach to using interfaces, then tidy up, review, and create some pull requests. The end result would provide suitable extension points within the MQTTnet library to hook up storage providers like Redis or SQL databases (to be implemented in a separate library). The integrated default implementation would use in-memory retained messages and persistent sessions, so the built-in features would be limited but directly usable.

I'm not sure about whether there is a notification about all messages being delivered; if not then that maybe that is something that could be considered going forward.

logicaloud avatar Aug 09 '23 20:08 logicaloud