rabbitmq-server
Publisher de-duplication for quorum queues [WIP]
Quorum queues need to support publisher de-duplication so that, when server issues occur and a publisher does not receive a confirm within some timeframe, the publisher can retry without risking duplicate messages appearing in the queue(s) the message was routed to.
There are two common approaches:
- De-duplicate based on some globally unique message-id.
- De-duplicate based on an incrementing sequence number.
Approach 1 relies on the queue/broker maintaining a set of all message-ids seen and checking this set before enqueueing the message. To ensure the de-duplication set doesn't grow huge, it is typically constrained by a size limit or time window. Of course, any removal of elements from the set allows a duplicate to still appear in the queue if the second send attempt arrives after the truncation window. Azure Service Bus, for example, takes this approach.
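A minimal Python sketch of approach 1 with a size limit, to make the truncation problem concrete. The class name `BoundedIdDeduplicator` and the `max_ids` parameter are made up for illustration and do not correspond to any RabbitMQ or Azure API.

```python
from collections import OrderedDict


class BoundedIdDeduplicator:
    """Remembers at most `max_ids` message-ids, evicting the oldest first."""

    def __init__(self, max_ids):
        self.max_ids = max_ids
        self.seen = OrderedDict()  # insertion-ordered set of message-ids

    def accept(self, message_id):
        """Return True if the message should be enqueued, False if it is a known duplicate."""
        if message_id in self.seen:
            return False
        self.seen[message_id] = None
        if len(self.seen) > self.max_ids:
            self.seen.popitem(last=False)  # drop the oldest id to respect the size limit
        return True


dedup = BoundedIdDeduplicator(max_ids=2)
# "a" is sent, retried at once (rejected), then retried again after "b" and "c"
# have pushed it out of the set, so the late retry is accepted as a duplicate.
results = [dedup.accept(m) for m in ["a", "a", "b", "c", "a"]]
```

With `max_ids=2` the final `"a"` is accepted, which is exactly the duplicate-after-truncation case described above.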
Approach 2 is the same approach we already use for RabbitMQ streams. Any message with a sequence number lower than or equal to the last one seen by a given queue will not be enqueued. This also requires some kind of publisher-id, as the sequence will not be globally unique, and the queue will need to maintain a map of publisher-id => last-seen-sequence. Again, some constraint needs to be put on the number of publisher-ids maintained by the queue to avoid unbounded growth.
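A minimal Python sketch of approach 2, assuming integer sequence numbers that each publisher increments. The name `SequenceDeduplicator` is illustrative only, and the bound on the number of tracked publisher-ids is deliberately left out.

```python
class SequenceDeduplicator:
    """Tracks publisher-id => last-seen-sequence for a single queue."""

    def __init__(self):
        self.last_seen = {}

    def accept(self, publisher_id, sequence):
        """Return True if the message should be enqueued, False if it is a duplicate."""
        last = self.last_seen.get(publisher_id)
        if last is not None and sequence <= last:
            return False  # lower or equal to the last seen sequence: do not enqueue
        self.last_seen[publisher_id] = sequence
        return True


seq_dedup = SequenceDeduplicator()
outcomes = [
    seq_dedup.accept("p1", 1),  # first message from p1
    seq_dedup.accept("p1", 2),
    seq_dedup.accept("p1", 2),  # retry of 2
    seq_dedup.accept("p1", 1),  # late retry of 1
    seq_dedup.accept("p2", 1),  # sequences are per publisher, so p2 starts fresh
]
```

Unlike the message-id set, the state per publisher is a single integer, so a late retry is still rejected no matter how many messages have been published in between.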
This plugin makes a good attempt at applying 1. at the exchange level, but it does not support Quorum Queues and there are inherent issues with de-duplicating at the exchange rather than the queue, for example when a message is routed to more than one queue.
As we already use 2. for streams I suggest we also implement the same approach for Quorum Queues over any protocol but for now we will target AMQP 0.9.1.
Unfortunately there aren't any good AMQP properties we can use for this. The message-id property in AMQP 0.9.1 is a shortstr and would not be suitable for carrying the sequence number, as this should be a long. The app-id property could carry the "publisher-id", i.e. the identifier that maps the message to a sequence. There still isn't a suitable property in AMQP 0.9.1 to carry the sequence itself, however, and we'd have to rely on a custom header in this case.
In AMQP 1.0 we could possibly use the group-id / group-sequence pair for this feature. There is also the option of using a durable link where the link name would map to the publisher id. This may also allow publishers to negotiate the appropriate starting point.
IMPLEMENTATION:
In the quorum queue state machine we already have a durable concept of "enqueuers" which already support de-duplication. However, these enqueuers are limited to the lifetime of the enqueuing pid (channel) and thus are not suitable for de-duplication that needs to persist beyond the lifetime of the channel. It is clear we need another "type" of enqueuer layer.
Implementation 1
The simplest and most naive implementation would be to keep a map of publisher-id => sequence-number inside the quorum queue state machine record. Every message that arrives at the queue that has the appropriate headers will be checked against this map and dropped or enqueued as appropriate and the map will be updated. This is simple and will handle most use-cases. There are, however, a few downsides with this approach:
- A publisher that repeatedly resends messages with lower sequence numbers will just grow the log, in a similar way to how repeated re-queues do.
- Multiple applications may publish messages with the same publisher-id resulting in undefined and lossy/surprising behaviour.
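The second downside can be seen in a small Python sketch of Implementation 1. It assumes the publisher-id travels in app-id and the sequence in a custom header. The header name `x-publisher-sequence` and the names `QueueState` and `apply_enqueue` are hypothetical, not part of RabbitMQ.

```python
from dataclasses import dataclass, field

SEQ_HEADER = "x-publisher-sequence"  # hypothetical custom header name


@dataclass
class QueueState:
    messages: list = field(default_factory=list)    # enqueued message bodies
    publishers: dict = field(default_factory=dict)  # publisher-id => sequence-number


def apply_enqueue(state, app_id, headers, body):
    """Enqueue `body` unless its (publisher-id, sequence) pair marks it as a duplicate."""
    seq = headers.get(SEQ_HEADER)
    if app_id is None or seq is None:
        # No de-duplication headers: behave like a normal enqueue.
        state.messages.append(body)
        return "enqueued"
    last = state.publishers.get(app_id)
    if last is not None and seq <= last:
        return "dropped"
    state.publishers[app_id] = seq
    state.messages.append(body)
    return "enqueued"


state = QueueState()
log = [
    apply_enqueue(state, "orders", {SEQ_HEADER: 1}, "a1"),  # application A
    apply_enqueue(state, "orders", {SEQ_HEADER: 2}, "a2"),  # application A
    apply_enqueue(state, "orders", {SEQ_HEADER: 1}, "b1"),  # application B reusing "orders"
]
```

Application B's first message is silently dropped because it shares a publisher-id with application A, which is the lossy behaviour that Implementation 1 cannot prevent.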
Implementation 2
This version keeps the behaviour of Implementation 1 but adds a few additional constraints.
If we only allow a publishing channel to send messages for a single publisher-id, we can assign the publisher-id to the transient enqueuer in the QQ state machine through a registration command and disallow any other registrations for that publisher-id until the currently registered pid either terminates or explicitly deregisters. This handles the case where different publishing apps use the same publisher-id. We could allow a channel to register against more than one publisher-id, or we could restrict it to a single one and terminate the channel if another is seen.
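The registration rule can be sketched in Python as follows. `PublisherRegistry` and its method names are hypothetical, pids are modelled as plain strings, and this variant allows one pid to hold several publisher-ids.

```python
class PublisherRegistry:
    """Maps each publisher-id to the single pid currently allowed to publish for it."""

    def __init__(self):
        self.owners = {}  # publisher-id => registered pid

    def register(self, publisher_id, pid):
        """Return True if `pid` now owns `publisher_id`, False if another pid holds it."""
        owner = self.owners.get(publisher_id)
        if owner is not None and owner != pid:
            return False
        self.owners[publisher_id] = pid
        return True

    def deregister(self, publisher_id, pid):
        """Explicit deregistration; ignored unless `pid` is the current owner."""
        if self.owners.get(publisher_id) == pid:
            del self.owners[publisher_id]

    def pid_down(self, pid):
        """Release every publisher-id held by a terminated pid."""
        for publisher_id in [p for p, o in self.owners.items() if o == pid]:
            del self.owners[publisher_id]


registry = PublisherRegistry()
first = registry.register("orders", "chan-1")    # chan-1 takes ownership
second = registry.register("orders", "chan-2")   # rejected while chan-1 is alive
registry.pid_down("chan-1")                      # chan-1 terminates
third = registry.register("orders", "chan-2")    # now allowed
```

Restricting a channel to a single publisher-id would only need an extra check in `register` that `pid` does not already own a different id.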
To avoid faulty applications causing undue log growth, we can use the "aux" machine feature to check the enqueue command before adding it to the Raft log. The aux machine can keep the last publisher-id/sequence it saw and reject further requests during the interval between a command being added to the log and it being applied to the state machine. This does incur a slight risk, however, as there is always a chance that a written Raft command will never be applied (if a new leader takes over and overwrites it), so we may want to delay the notification back to the channel until the state machine confirms application.
For example, suppose that for a given publisher-id the state machine has seen sequence 5, and 6 is in the log but not yet applied. The aux machine knows about 6. If it now receives another enqueue command with sequence 6, it cannot immediately reject the second attempt, as it isn't guaranteed that the command already in the log will be applied. In this case the aux machine needs to wait until the state machine applies the enqueue of the first sequence 6 attempt before it confirms the second back to the channel.
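The scenario above can be sketched in Python. This is only an illustration of the bookkeeping, not the Ra aux machine API: `AuxDedup`, `on_enqueue` and `on_applied` are made-up names, sequences are assumed to be non-negative integers, and the case where a logged command is overwritten by a new leader is not handled.

```python
class AuxDedup:
    """Pre-log filter that holds back duplicates of commands not yet applied."""

    def __init__(self):
        self.applied = {}  # publisher-id => last sequence applied by the state machine
        self.in_log = {}   # publisher-id => highest sequence appended to the log
        self.waiting = {}  # publisher-id => [(sequence, channel)] held until applied

    def on_enqueue(self, publisher_id, sequence, channel):
        if sequence <= self.applied.get(publisher_id, -1):
            return "confirm"  # already applied: confirm without appending to the log
        if sequence <= self.in_log.get(publisher_id, -1):
            # In the log but not yet applied: neither append nor confirm yet.
            self.waiting.setdefault(publisher_id, []).append((sequence, channel))
            return "defer"
        self.in_log[publisher_id] = sequence
        return "append"

    def on_applied(self, publisher_id, sequence):
        """Called once the state machine applies an enqueue; returns channels to confirm."""
        self.applied[publisher_id] = max(sequence, self.applied.get(publisher_id, -1))
        held = self.waiting.get(publisher_id, [])
        ready = [ch for seq, ch in held if seq <= sequence]
        self.waiting[publisher_id] = [(s, c) for s, c in held if s > sequence]
        return ready


aux = AuxDedup()
aux.applied["p"] = 5                           # state machine has seen sequence 5
first = aux.on_enqueue("p", 6, "chan-1")       # new sequence: goes to the log
second = aux.on_enqueue("p", 6, "chan-2")      # duplicate of an unapplied command
released = aux.on_applied("p", 6)              # first attempt is applied
third = aux.on_enqueue("p", 6, "chan-3")       # duplicate of an applied command
```

The second attempt at sequence 6 is only confirmed once `on_applied` reports the first, while a duplicate arriving after application is confirmed immediately and never reaches the log.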