rabbitmq-server
rabbitmq-server copied to clipboard
Reorganize data in the Khepri store
Why
The previous layout followed the flat structure we have in Mnesia:
- In Mnesia, we have tables named after each purpose (exchanges, queues, runtime parameters and so on).
- In Khepri, we had about the same but the table names were replaced by a tree node in the tree. We ended up with one tree node per purpose at the root of the tree.
Khepri implements a tree. We could benefit from this and organize data to reflect their relationship in RabbitMQ.
How
Here is the new hierarchy implemented by this commit:
rabbitmq
|-- users
| `-- $username
|-- vhosts
| `-- $vhost
| |-- user_permissions
| | `-- $username
| |-- exchanges
| | `-- $exchange
| | |-- bindings
| | | |-- queue
| | | | `-- $queue
| | | `-- exchange
| | | `-- $exchange
| | |-- consistent_hash_ring_state
| | |-- jms_topic
| | |-- recent_history
| | |-- serial
| | `-- user_permissions
| | `-- $username
| |-- queues
| | `-- $queue
| `-- runtime_params
| `-- $param_name
|-- runtime_params
| `-- $param_name
|-- mirrored_supervisors
| `-- $group
| `-- $id
`-- node_maintenance
`-- $node
We first define a root path in rabbit/include/khepri.hrl as [rabbitmq]. This could be anything, including an empty path.
All paths are constructed either from this root path definition (users and vhosts paths do that), or from a parent resource's path (exchanges and queues paths are based on a vhost path).
@michaelklishin: I removed the "4.0" prefix you added to the title because we will probably backport that to 3.13.x once it is stable.
The layout is now in place.
The next challenge is the handling of deletions, especially the bindings. Currently the code does something like this:
- In a transaction:
- A queue is deleted from the database
- Bindings that have that queue as their destination are deleted too
- If there are source exchanges with the auto-delete flag set and have no more bindings, they are deleted too
- Outside of the transaction, deletions are "processed" to emit notifications for the queue, bindings and auto-deleted exchanges.
About the same occurs when a source exchange is deleted.
With this new organization, bindings are implicitly deleted with their source exchange.
Can we cover these situations with keep_while conditions in Khepri?
- One on bindings so that they are deleted when their destination (queue or exchange) is deleted?
- One on exchanges so that they are deleted when they no longer have bindings underneath?
The processing of the deletion should remain about the same as it already runs after records were deleted from the database. This would also allow us to drop the transaction and use a single delete.
A related idea: what about using #if_exists{} conditions when creating resources? For example, we could add " if vhost exists" in the path of an exchange. This way, the insert would be rejected if the vhost was deleted concurrently. This won't replace the keep_while conditions.
The tricky part is doing this while maintaining behavior with Mnesia.
After studying the code more deeply, I believe the current situation is fine:
- Exchanges and bindings that are supposed to be deleted during the deletion of another queue or exchange are already taker care of in a Khepri transaction, and the rest of the logic is handled in
process_deletions()functions that do nod read the database (they get the old records as arguments). - Exchange type callback modules are called from these
process_deletions()functions, so their specific records were already deleted. Fortunately, the callbacks were also simply deleting them. - The vhost code already calls explicitly the deletion function of everything that live underneath before the vhost record is deleted from the database.
Therefore, I think we can defer the improvements to the deletion code to the the time we drop Mnesia.
What do you think @dcorbacho? Am I missing something?
@dumbbell Agree, we're fine with that.
Here is an example of the content of Khepri in an unclustered RabbitMQ with a single PerfTest client running with default options:
●
├── '__khepri_mnesia_migration'
│ ╰── m2k_table_copy
│ ╰── <<"rabbitmq_metadata">>
│ Data: {migration,finished,
│ [rabbit_vhost,rabbit_user,rabbit_user_permission,
│ rabbit_topic_permission,rabbit_runtime_parameters,
│ rabbit_queue,rabbit_exchange,rabbit_exchange_serial,
│ rabbit_route,rabbit_node_maintenance_states,
│ mirrored_sup_childspec,rabbit_durable_queue,
│ rabbit_durable_exchange,rabbit_durable_route,
│ rabbit_semi_durable_route,rabbit_reverse_route,
│ rabbit_index_route]}
│
╰── rabbitmq
├── node_maintenance
│ ╰── rabbit@giotto
│ Data: {node_maintenance_state,rabbit@giotto,regular,#{}}
│
├── runtime_params
│ ╰── internal_cluster_id
│ Data: {runtime_parameters,internal_cluster_id,
│ <<"rabbitmq-cluster-id-9tjXGNZE97EyiRBoH6sHTA">>}
│
├── users
│ ╰── <<"guest">>
│ Data: {internal_user,<<"guest">>,
│ <<195,20,77,64,29,28,55,234,23,125,234,249,78,76,209,126,
│ 72,20,135,236,235,100,222,244,185,61,113,44,7,25,67,198,
│ 179,92,83,95>>,
│ [administrator],
│ rabbit_password_hashing_sha256,#{}}
│
╰── vhosts
╰── <<"/">>
│ Data: {vhost,<<"/">>,[],
│ #{description => <<"Default virtual host">>,tags => []}}
│
├── exchanges
│ ├── <<>>
│ │ Data: {exchange,{resource,<<"/">>,exchange,<<>>},
│ │ direct,true,false,false,[],undefined,undefined,undefined,
│ │ {[],[]},
│ │ #{user => <<"rmq-internal">>}}
│ │
│ ├── <<"amq.direct">>
│ │ Data: {exchange,{resource,<<"/">>,exchange,<<"amq.direct">>},
│ │ direct,true,false,false,[],undefined,undefined,undefined,
│ │ {[],[]},
│ │ #{user => <<"rmq-internal">>}}
│ │
│ ├── <<"amq.fanout">>
│ │ Data: {exchange,{resource,<<"/">>,exchange,<<"amq.fanout">>},
│ │ fanout,true,false,false,[],undefined,undefined,undefined,
│ │ {[],[]},
│ │ #{user => <<"rmq-internal">>}}
│ │
│ ├── <<"amq.headers">>
│ │ Data: {exchange,{resource,<<"/">>,exchange,<<"amq.headers">>},
│ │ headers,true,false,false,[],undefined,undefined,undefined,
│ │ {[],[]},
│ │ #{user => <<"rmq-internal">>}}
│ │
│ ├── <<"amq.match">>
│ │ Data: {exchange,{resource,<<"/">>,exchange,<<"amq.match">>},
│ │ headers,true,false,false,[],undefined,undefined,undefined,
│ │ {[],[]},
│ │ #{user => <<"rmq-internal">>}}
│ │
│ ├── <<"amq.rabbitmq.trace">>
│ │ Data: {exchange,{resource,<<"/">>,exchange,<<"amq.rabbitmq.trace">>},
│ │ topic,true,false,true,[],undefined,undefined,undefined,
│ │ {[],[]},
│ │ #{user => <<"rmq-internal">>}}
│ │
│ ├── <<"amq.topic">>
│ │ Data: {exchange,{resource,<<"/">>,exchange,<<"amq.topic">>},
│ │ topic,true,false,false,[],undefined,undefined,undefined,
│ │ {[],[]},
│ │ #{user => <<"rmq-internal">>}}
│ │
│ ╰── <<"direct">>
│ │ Data: {exchange,{resource,<<"/">>,exchange,<<"direct">>},
│ │ direct,true,false,false,[],undefined,undefined,undefined,
│ │ {[],[]},
│ │ #{user => <<"guest">>}}
│ │
│ ╰── bindings
│ ╰── queue
│ ╰── <<"amq.gen-9chRARA1KM5g_0NqZ8OCQQ">>
│ ╰── <<"3c299664-94e7-443a-b642-f7dde58b759e">>
│ Data: #{{binding,{resource,<<"/">>,exchange,<<"direct">>},
│ <<"3c299664-94e7-443a-b642-f7dde58b759e">>,
│ {resource,<<"/">>,queue,<<"amq.gen-9chRARA1KM5g_0NqZ8OCQQ">>},
│ []} =>
│ []}
│
├── queues
│ ╰── <<"amq.gen-9chRARA1KM5g_0NqZ8OCQQ">>
│ Data: {amqqueue,{resource,<<"/">>,queue,<<"amq.gen-9chRARA1KM5g_0NqZ8OCQQ">>},
│ false,true,none,[],<0.736.0>,[],[],[],undefined,undefined,[],
│ [],live,0,[],<<"/">>,
│ #{user => <<"guest">>},
│ rabbit_classic_queue,#{}}
│
╰── user_permissions
╰── <<"guest">>
Data: {user_permission,{user_vhost,<<"guest">>,<<"/">>},
{permission,<<".*">>,<<".*">>,<<".*">>}}