rumqtt icon indicating copy to clipboard operation
rumqtt copied to clipboard

Persistent storage of pending messages

Open mladedav opened this issue 2 years ago • 0 comments

I would like to send QoS 1 messages with the guarantee intact even if the sending device unexpectedly restarts. This would need persistent storage but the messages there need to be discoverable by a packet ID so that they can be deleted when acknowledged.

I tried making this work outside of this library but I run into races when I try to save a packet ID to a common structure for consumption by the thread responsible for polling the client, but PubAck is received before the packet ID is known.

Would there be interest for making the backing storage for messages? Currently, it's a vector of Publish packets and two more vectors of u16 for incoming and pub_rel states. All operations needed by the backing structure I could find are:

  • Iteration over pending messages
  • Random access by packet ID

To be able to implement this reliably without support inside this library, I have come up with these solutions:

  • After publish wait for a Publish packet and add it to a concurrent collection with ID in storage. When poll returns PubAck, delete the given message. This does not work because there is no way to synchronize (without completely blocking the client) so that the concurrent collection is ready in time for the PubAck packet.
  • If I knew the packet ID of the Publish packet before it is sent this would have been solved. This would rely on the planned returned packet IDs, but this could also result in broken ordering of sent Publish packet IDs which the library somewhat relies on.
  • Choosing packet IDs directly. However, that would currently not work since the IDs are internally used to index a vec of outstanding messages.
  • #291 would also work for my issue

mladedav avatar Apr 05 '22 14:04 mladedav