rabbitmq-server
rabbitmq-server copied to clipboard
Quorum queues v4
Quorum queues v4
#8261
Tasks:
- [x] Fair share high/low priorities using https://github.com/kjnilsson/hiloq/)
- [ ] ~Consumer timeouts implemented in the queue (rather than in the channel as currently done).~
- [x] SAC consumers honour consumer priorities
- [ ] Credited consumer refactoring to meet AMQP requirements.
- [ ] Use checkpoints feature. https://github.com/rabbitmq/rabbitmq-server/pull/10487
- [x] Consumer cancel option that immediately removes consumer and returns all pending messages.
- [x] More compact commands of the most common commands such as
enqueue
,settle
andcredit
- [ ] Track number of requeues in addition to delivery-count as this would allow us to calculate a correct delivery-count header for AMQP consumers.
Consumer timeouts design
rabbit_fifo
will record the checkout time for each message that is assigned to a consumer. This timestamp
will be used to detect messages that have been kept longer than the consumer timeout configured.
We do not want to use the RA timeout
effect as to do so we'd need to either do expensive and frequent
calculations over the full set of checked out messages or keep lots of timers (one per message).
Instead we'd schedule an aux event every minute which will do a scan over the checked set and if any
consumer has messages with expired timeouts and if so commit a new command eval_consumer_timeouts
to do this work and return messages. This means that will be evaluated some time after the expiry but no more than ~60s
Consumers that let any of their message locks expire should not be assigned any further messages until they send some kind of command (settlement, lock renewal etc) to show that they are live and responding. They should be treated as "suspected" until it is known that they can reply.
This mean we can probably get rid of the (undesirable but necessary with mnesia) behaviour where when the queue received a DOWN
notification with the reason noconnection
it would immediately return all messages. With mnesia this was reasonably correct. If there were cluster disconnected (even shortly) typically the rabbit
application would restart itself in mysterious ways with the ultimate result that channels were terminated. With khepri
this will no longer be the case and the cluster should be able to function normally even if there are short term cluster disconnections.
So going forward when a QQ receives a noconnection
for a consumer process it will only mark it as disconnected (so that new messages are not assigned until it comes back) and let the consumer timeout handle the message return in due course. This means it should be able to handle the case of short term disconnections / reconnections in the cluster without messages being returned unnecessarily.
If the consumer is already in cancelled
state (cancelled but with pending messages) then all pending messages will be returned and the consumer will be removed. This is the safest option there are potentially faulty clients in the wild that will never ack pending messages after a cancellation.
This also means that locks should be relatively short (max 5 mins but ideally lower).
Single Active Consumer consumers that let their messages time out will have all pending messages returned, as well as being replaced. This is to ensure ordering invariants with SAC.
Protocol impl:
AMQP can provide a management extension command to renew locks for a messages. AMQP legacy can configure an auto renew function (that is done by the channel process / queue type) where it will auto renew the lock n number of times on behalf of the client. This is because the legacy protocol (and other protocols such as MQTT / STOMP) don't have any options for implementing lock renewal.
For AMQP legacy we can default to renew locks to the total of the current consumer_timeout
configuration.
Q: Can we do lock renewal without going through Raft log?
When a messages reaches timeout the queue will notify the consumer process with a new Ra event
{message_timeout, consumer_tag(), [MsgIds]}
- how this is handled may depend on the protocol
implementation. AMQP can emit the released or modified outcome. Other protocols don't have the same
mechanism so for AMQP legacy it is probably best to terminate the channel or initiate a broker side consumer cancellation.
Consumer timeouts postponed.