Exclusive queues can be deleted without the consumers being notified
Describe the bug
Given a 3-node cluster with Khepri enabled, if a node is partitioned (this certainly happens when the Khepri leader is partitioned; I'm not sure whether it can happen when a follower is partitioned), exclusive queues can be deleted without their corresponding connections being notified or terminated (so from the client's perspective, it just looks like there are no messages).
Originally reported in: https://github.com/rabbitmq/rabbitmq-server/discussions/12829#discussion-7563564
Reproduction steps
- 3-node cluster with Khepri enabled, a publisher and consumer(s) using exclusive queues on a node with the Khepri leader
- The node with the Khepri leader, all connections and queues is partitioned
- Clients don't notice anything, since all connections and queues are available on the local node
- A minute later (net tick timeout), the nodes realize they can't talk to one of the nodes. They all trigger the deletion of exclusive queues on the partitioned node. However, the node with the queues doesn't delete them, because the queue processes are alive. The other two nodes delete the queues in Khepri, but it has no practical effect, because they can't communicate with the node where the queues are running.
- Once the partition heals, the partitioned node realizes it should be a Khepri follower now, synchronizes the metadata changes (including the deletions) and deletes the queues, but doesn't notify the consumers
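For reference, the "minute later" detection window in the steps above comes from the Erlang distribution's net tick time, which defaults to 60 seconds. A sketch of how it can be tuned via the classic Erlang-style advanced config (the value shown is purely illustrative, not a recommendation):

```erlang
%% advanced.config (sketch): lower the kernel net_ticktime so that an
%% unreachable node is detected faster than with the 60 s default.
[
  {kernel, [
    {net_ticktime, 30}  %% seconds; Erlang/OTP default is 60
  ]}
].
```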
Expected behavior
We discussed removing the special case for exclusive queues altogether. Currently, they are considered transient internally, but with Khepri there's really no concept of a transient declaration, since all declarations are persisted in Khepri's log. Getting rid of the whole "transient" concept once and for all should simplify the code and documentation, and in general make RabbitMQ easier to use and understand.
Either way, the queues should not be deleted or the consumers should be notified.
Additional context
All logs from a reproduction: rmq-server-0.log rmq-server-1.log rmq-server-2.log
- a 3-node cluster deployed to Kubernetes
- rmq-server-2 is the Khepri leader
- perf-test -H amqp://rmq-server-2.rmq-nodes -E -t fanout -e amq.fanout -y 10 -r 1
- all connections and exclusive queues are on rmq-server-2; as expected, perf-test publishes 1 message per second and consumes 10 messages per second (fanout to 10 exclusive queues)
- Mon Dec 16 12:41:20 CET 2024: rmq-server-2 is partitioned
- Mon Dec 16 12:42:24 CET 2024: rmq-server-2 is no longer partitioned
- perf-test keeps running without any errors, still publishes 1 msg/s but doesn't consume any messages anymore
The logs contain some additional debug messages which show that all nodes attempted to delete the exclusive queues on a down node, but:
- server-2 didn't, because the queue processes were alive on that node
- server-1 deleted the queues (10 transient queues from node '[email protected]' deleted in 0.066262s) and sent notifications, but this was not effective, since server-2 didn't get the memo
- at 12:42:26, server-2 rejoins the cluster, becomes a follower and actually deletes the queues, but doesn't notify consumers
Thank you @mkuratczyk for creating the bug ticket.
Since this issue is important for us, we would try to contribute a fix for it ourselves.
Just sending the consumer cancel for any auto-delete queue when the partition resolves would be sufficient and probably the only thing we could fix.
I will have a look at the code and come back with a proposal/ideas or maybe some questions.
Thanks again!
Hi @mkuratczyk ,
An update on this issue:
I tried out some things and, in the end, implemented and tested the idea below to solve the problem for us:
- Implement a basic but generic way to register an external callback to be notified when a key (representing some metadata) is deleted or updated/inserted in the Khepri store (implemented in Khepri; see the changes on the https://github.com/Rmarian/khepri/tree/add-persistence-event-callback-support branch)
- Have RabbitMQ register such a callback for queues only for each Vhost (See changes in https://github.com/Rmarian/rabbitmq-server/tree/%2312949)
- Upon receipt of a delete-queue event from Khepri, RabbitMQ stops the local queue process if it is still alive
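The last step above could be sketched roughly as follows (Erlang-style pseudocode: the callback shape and the helper functions `queue_name_from_path/1`, `lookup_local/1` and `stop_local_process/1` are hypothetical illustrations of the draft, not actual Khepri or RabbitMQ APIs):

```erlang
%% Sketch: callback registered per vhost for the queues subtree of the
%% Khepri store. On a delete event for a queue key, stop the matching
%% local "zombie" queue process so that consumers get a cancel.
handle_khepri_event({delete, Path}, _Props) ->
    QName = queue_name_from_path(Path),        %% hypothetical helper
    case lookup_local(QName) of                %% hypothetical helper
        {ok, Q} ->
            %% The record is gone from the store but the process is
            %% still alive on this node: stopping it triggers the
            %% normal consumer-cancel notifications.
            stop_local_process(Q);             %% hypothetical helper
        {error, not_found} ->
            ok
    end;
handle_khepri_event(_Other, _Props) ->
    ok.
```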
This has worked for us to receive the consumer cancel after a partition resolves. Also, from what I tested, I didn't see any obvious issues.
Anyway, the code is only a first draft and still needs to be polished and further tested...
What I would like now is some early feedback from you on whether this approach is sound and would be acceptable to merge into the main repositories.
If not, we need other ideas :)
Thanks, Radu.
Hi!
Thank you for taking the time to dig into this and provide an implementation, I really appreciate it!
The feature you are adding to Khepri already exists, perhaps it’s incorrectly documented or it didn’t match your expectations or use case? See Stored procedures and triggers.
Someone suggested a simpler API than stored procedures for triggers a few years ago. I filed rabbitmq/khepri#57 to remember it, but never got around to implementing it so far. This idea might be a better fit for this use case.
Now on the RabbitMQ side, I don’t know what the right approach should be yet, I need to think about this. I mean, I’m not sure if the queue should be deleted in the first place. What happens with Mnesia by the way?
@Rmarian thank you for taking the time to contribute. Have you performed your test with, say, 200K queues? Because in our experience what works well with 5 queues won't necessarily work as well with hundreds of thousands.
Khepri overload would be a very serious operational problem for a cluster.
Hi @dumbbell ,
The feature you are adding to Khepri already exists, perhaps it’s incorrectly documented or it didn’t match your expectations or use case?
- You are right. I didn't read the stored procedure and triggers section carefully because it seemed to me like they provide something different than I needed.
Now on the RabbitMQ side, I don’t know what the right approach should be yet, I need to think about this. I mean, I’m not sure if the queue should be deleted in the first place. What happens with Mnesia by the way?
- The same thing happens: exclusive queues belonging to a node that is isolated from the rest of the cluster are deleted by the majority. But our consumers were not affected by this, because we were always using the pause_minority strategy: the minority node would shut down its connections temporarily, and our clients would then get the chance to reconnect to another node, re-declare their state (queue, consumer) and move on with message processing.
- With Khepri, this behavior has changed, minority nodes keep their connections open and when the partition resolves, Khepri silently deletes any local exclusive queue (it has to since that is what the consensus dictates!) without sending any notification to its client consumers (because the queue processes are still running).
- What we are trying to do with these changes is to re-sync the state of the node with the Khepri state after the partition recovers, so that "zombie" queue processes are stopped and clients are notified about this
@Rmarian thank you for taking the time to contribute. Have you performed your test with, say, 200K queues? Because in our experience what works well with 5 queues won't necessarily work as well with hundreds of thousands.
Khepri overload would be a very serious operational problem for a cluster.
Not yet... but it is on my to-do list.
Thinking about what such a test would look like:
- Have 250K exclusive queues declared on a single node
- Isolate that node from the rest of the cluster
- Wait until the majority deletes all these queues
- Re-join the node to the cluster
- All the while, have a job that continuously monitors Khepri availability on that node by declaring metadata and reporting response times
- Maybe I should monitor message routing times as well?
- CPU and memory spikes, if any, I guess
Now on the RabbitMQ side, I don’t know what the right approach should be yet, I need to think about this. I mean, I’m not sure if the queue should be deleted in the first place. What happens with Mnesia by the way?
- The same thing happens: exclusive queues belonging to a node that is isolated from the rest of the cluster are deleted by the majority. But our consumers were not affected by this, because we were always using the pause_minority strategy: the minority node would shut down its connections temporarily, and our clients would then get the chance to reconnect to another node, re-declare their state (queue, consumer) and move on with message processing.
I see, so the "notification through disconnection" is kind of a byproduct of the partition recovery strategy. Thus, the code to explicitly notify consumers in this case might be missing in the first place.
@dumbbell the code to notify consumers if a queue is stopped is already present here. If I understand @Rmarian correctly, the problem is that the queue record is deleted in Khepri while the actual queue process is still running even after the partition resolves ("zombie" process). Hence, the client doesn't get notified.
Thank you @ansd!
Hi @dumbbell ,
I tried to use a trigger with a stored procedure instead, as you suggested, but I was unable to make it work the way I want.
Three things are missing:
- I need the stored procedure to be executed locally by every node when the trigger is hit. Currently, Khepri triggers only support execution on the leader.
- The stored procedure params (Props) do not provide enough context information. The full matched Path is needed, as well as the value of the node being updated/deleted
- I also need a trigger for when a snapshot is installed by the follower (we need to check and re-sync all queues on the node at this time)
Now, looking at the API, it seems it would not be too difficult to extend it in a non breaking manner.
For example, we could pass a {Fun, local} tuple to khepri:put() instead of just the function, and store an extra flag in the #p_sproc{} record. Similarly, we could extend the khepri_evf filter's on_action list to include a new snapshot_install element in addition to delete/update.
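To make the proposal concrete, this is roughly what the extended API could look like (a sketch of the proposed, non-breaking extension: the {Fun, local} tuple form and the snapshot_install action do not exist in Khepri today, and the paths and identifiers are made up for illustration):

```erlang
%% Sketch of the proposed extension, not working code.
%% 1. Store the sproc with a locality flag: {Fun, local} instead of Fun,
%%    asking every node (not just the leader) to run it when triggered.
ok = khepri:put(StoreId, SprocPath, {fun on_queue_change/1, local}),

%% 2. Extend the event filter's on_actions list with snapshot_install,
%%    so followers can re-sync local state after installing a snapshot.
EvtFilter = khepri_evf:tree(
              QueuesPath,
              #{on_actions => [delete, update, snapshot_install]}),
ok = khepri:register_trigger(StoreId, TriggerId, EvtFilter, SprocPath).
```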
But I don't know if such changes would make sense on their own in Khepri... What do you think?
Thank you! I came to the same conclusion.
I will think about the API for Khepri 0.18.x and come back to you.
Please refrain from "any updates?" and similar comments or this issue will be locked.
When there is progress, issues are closed or at least mentioned in commits, pull requests and so on. We do not have a consensus on how Khepri should change to accommodate such a feature. And we won't before RabbitMQ 4.1.0 ships and then some time passes.
https://github.com/rabbitmq/rabbitmq-server/pull/14573 is one proposed solution.
@Rmarian: The fix for this issue is ready and available in rabbitmq/rabbitmq-server#14573. Could you please give it a try and tell me if it is fixed for you too?