nats-streaming-server
Duplication detection of messages
De-duplication of messages allows clients to have multiple publishers submit the same message: if one publisher fails, another can continue adding messages to the queue without introducing duplicates.
Azure Service Bus currently implements this with a MessageID header provided with each message. Based on a time span configured on the queue or topic, it looks back to check whether a message with the same MessageID has already been submitted. If one is found within that time span, the new message is simply discarded.
The client library internally creates a unique ID per message. This is used by the server to send an ack back to the client, and used by the client to match the ack back to a publish call.
Unfortunately, this ID is internal, so any Publish() call of the "same message" (from the application's perspective) results in a new ID. We would need to introduce a new field in the protocol message, or allow the unique ID to be replaced by a user-given ID.
Regardless, this would require a client library change to pass a message ID to the publish call. The time-span idea is interesting: it avoids the server having to keep a map of all message IDs for each channel, which would help reduce memory usage.
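A sketch of what that client library change could look like. The field and function names here (MsgID, publishWithID) are hypothetical, not part of the actual NATS Streaming protocol or stan client API; the point is that the internal GUID keeps driving ack matching while a separate user-supplied ID is what the server would dedupe on.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// PubMsg sketches the protocol message discussed above. Guid is the
// internal unique ID used today to match acks; MsgID is the hypothetical
// new field a user could set for deduplication.
type PubMsg struct {
	Subject string
	Data    []byte
	Guid    string // internal, regenerated on every Publish() call
	MsgID   string // hypothetical user-supplied dedupe ID
}

// newGuid stands in for the unique ID the client library generates internally.
func newGuid() string {
	b := make([]byte, 11)
	rand.Read(b)
	return hex.EncodeToString(b)
}

// publishWithID threads a user-given ID through alongside the internal one.
func publishWithID(subject string, data []byte, msgID string) PubMsg {
	return PubMsg{Subject: subject, Data: data, Guid: newGuid(), MsgID: msgID}
}

func main() {
	m1 := publishWithID("orders", []byte("order-42"), "order-42")
	m2 := publishWithID("orders", []byte("order-42"), "order-42")
	// Same application-level message: Guids differ (acks still match up),
	// but MsgIDs agree, so the server could discard m2 as a duplicate.
	fmt.Println(m1.Guid != m2.Guid, m1.MsgID == m2.MsgID) // true true
}
```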
However, for file-based stores, where a restart of the server still maintains the state, this message ID would need to be recovered. As of now, only message sequences and the positions of records on disk are recovered, not the actual message contents. The ID would therefore need to be added to the index files, which would break compatibility.
I would welcome your feedback. In the meantime, I will leave this issue open.
For some additional perspective, this is how Amazon SQS implements exactly-once processing with its FIFO queues. They dedupe using one of two strategies: content-based deduplication, which computes a SHA-256 hash of the message contents, or a user-provided dedupe ID on each message. They limit dedupe to a 5-minute sliding window to keep it efficient.
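The content-based strategy is easy to sketch in Go: when the publisher doesn't supply an ID, derive one as the SHA-256 of the message body, so byte-identical messages collapse to the same ID inside the dedupe window. The function name is made up for illustration.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// contentDedupeID mirrors the SQS content-based strategy: the dedupe ID
// is the hex-encoded SHA-256 hash of the message body, so two messages
// with identical bytes always produce the same ID.
func contentDedupeID(body []byte) string {
	sum := sha256.Sum256(body)
	return hex.EncodeToString(sum[:])
}

func main() {
	a := contentDedupeID([]byte("order-42"))
	b := contentDedupeID([]byte("order-42"))
	c := contentDedupeID([]byte("order-43"))
	fmt.Println(a == b, a == c) // true false
}
```

The trade-off versus a user-provided ID is that content hashing treats any byte difference (e.g. a timestamp field) as a distinct message, while an explicit ID lets the application decide what "same message" means.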
@tylertreat That would not solve the problem that on recovery, the server would have to "load" either a NUID or something else the dedupe code can use at runtime. Which, as I said, would still require changes that are backward incompatible (that's ok, but it needs to be noted).
Yes, the dedupe filter would need to be persisted. What changes would be backward incompatible though? Seems like it's just extending the Store interface?
Existing data stores. If we put this value in the index file, then the structure changes and older versions of NATS Streaming would fail to read it. If we make it part of the MsgProto that is saved in the data files, then the server would have to recover the data, which is costly.
Even then, we would need a tool to migrate, say, 0.5.0 data stores to 0.6.0 so that the index files contain the new field. After that, servers prior to 0.6.0 would not be able to read the store.
Two pieces of work were recently published that discuss two different methods of exactly-once delivery in Kafka. It could be an interesting topic of discussion.
[1] https://segment.com/blog/exactly-once-delivery/
[2] https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-IdempotentProducerGuarantees
It would be great to get something like an at-most-once guarantee over a certain time window.
This would be a great addition. RabbitMQ implements deduplication through an external plugin with tunable TTL and persistence: https://github.com/noxdafox/rabbitmq-message-deduplication
Having something like that implemented would be perfect for several use cases.