rabbitmq-server
Shovel and Federation (mirrored supervisor) state record migration state during an upgrade to Khepri
Describe the bug
In the shovel and federation plugins, the child id format used by the mirrored supervisor was changed (in 3.12.8 for shovels, in 3.13.0 for federation). When shovels and federated exchanges are created on an earlier version and a multi-node RabbitMQ cluster is then upgraded to 3.13.0 in a rolling fashion, the workers keep the old child id format. When Mnesia is later migrated to Khepri, the migration converts the child ids to the new format, but not in every place where they are used.
The child id key is not changed in the state of the delegate mirrored supervisor process. For example, mirrored_supervisor:which_children/1 still returns just an #exchange{} record for federation:
2> [element(1, C) || C <- mirrored_supervisor:which_children(rabbit_federation_exchange_link_sup_sup)].
[{exchange,{resource,<<"/">>,exchange,<<"fed_ex1">>},
fanout,true,false,false,[],none,
[{vhost,<<"/">>},
{name,<<"fed_pol1">>},
{pattern,<<"fed_.*">>},
{'apply-to',<<"all">>},
{definition,[{<<"federation-upstream">>,<<"fed_up1">>}]},
{priority,0}],
undefined,none,
#{user => <<"guest">>}},
and vhost-shovelname tuple for shovels:
3> [element(1, C) || C <- mirrored_supervisor:which_children(rabbit_shovel_dyn_worker_sup_sup)].
[{<<"/">>,<<"sh1">>}]
In Khepri, the key field of the #mirrored_sup_childspec{} record has the new format, but the childspec field still has the old format:
6> rabbit_khepri:get_many("/:rabbit_db_msup/:mirrored_supervisor_childspec/:rabbit_federation_exchange_link_sup_sup/:exchange/**").
{ok,#{[rabbit_db_msup,mirrored_supervisor_childspec,
rabbit_federation_exchange_link_sup_sup,exchange,<<"/">>] =>
undefined,
[rabbit_db_msup,mirrored_supervisor_childspec,
rabbit_federation_exchange_link_sup_sup,exchange,<<"/">>,
<<"fed_ex1">>] =>
{mirrored_sup_childspec,{rabbit_federation_exchange_link_sup_sup,{[exchange,
<<"/">>,<<"fed_ex1">>], <---- new format
{exchange,{resource,<<"/">>,exchange,<<"fed_ex1">>},
fanout,true,false,false,[],none,
[{vhost,<<"/">>},
{name,<<"fed_pol1">>},
{pattern,<<"fed_.*">>},
{'apply-to',<<"all">>},
{definition,[{<<"federati"...>>,<<"fed_"...>>}]},
{priority,0}],
undefined,none,
#{user => <<"guest">>}}}},
<19214.622.0>,
{{exchange,{resource,<<"/">>,exchange,<<"fed_ex1">>}, <---- old format
fanout,true,false,false,[],none,
[{vhost,<<"/">>},
{name,<<"fed_pol1">>},
{pattern,<<"fed_.*">>},
{'apply-to',<<"all">>},
{definition,[{<<"federation-u"...>>,<<"fed_up1">>}]},
{priority,0}],
undefined,none,
#{user => <<"guest">>}},
{rabbit_federation_link_sup,start_link,
[{exchange,{resource,<<"/">>,exchange,<<"fed_ex1">>},
fanout,true,false,false,[],undefined,
[{vhost,<<"/">>},
{name,<<"fed_pol1">>},
{pattern,<<"fed_.*">>},
{'apply-to',<<"all">>},
{definition,[...]},
{priority,...}],
undefined,
{[],
[rabbit_event_exchange_decorator,
rabbit_federation_exchange]},
#{user => <<"guest">>}}]},
transient,infinity,supervisor,
[rabbit_federation_link_sup]}},
...
11> rabbit_khepri:get_many("/:rabbit_db_msup/:mirrored_supervisor_childspec/:rabbit_shovel_dyn_worker_sup_sup/**")
{ok,#{[rabbit_db_msup,mirrored_supervisor_childspec,
rabbit_shovel_dyn_worker_sup_sup,<<"/">>] =>
undefined,
[rabbit_db_msup,mirrored_supervisor_childspec,
rabbit_shovel_dyn_worker_sup_sup,<<"/">>,<<"sh1">>] =>
{mirrored_sup_childspec,{rabbit_shovel_dyn_worker_sup_sup,{[<<"/">>,
<<"sh1">>], <---- new format
{<<"/">>,<<"sh1">>}}},
<19214.1052.0>,
{{<<"/">>,<<"sh1">>}, <---- old format
{rabbit_shovel_dyn_worker_sup,start_link,
[{<<"/">>,<<"sh1">>},
[{<<"dest-uri">>,
[{encrypted,<<"kLryCy0IfCpKy3T83R5gGSJuuu5ObAszSYJO"...>>}]},
{<<"src-uri">>,
[{encrypted,<<"4B1M2vc+cnkeuaKnwHWnXfZxZTahAqLg"...>>}]},
{<<"ack-mode">>,<<"on-confirm">>},
{<<"dest-add-forward-headers">>,false},
{<<"dest-protocol">>,<<"amqp091">>},
{<<"dest-queue">>,<<"qq2">>},
{<<"src-delete-after">>,<<"never">>},
{<<"src-protocol">>,<<"amqp091">>},
{<<"src-queue">>,<<"qq1">>}]]},
transient,300000,worker,
[rabbit_shovel_dyn_worker_sup]}}}}
As a result, when a shovel or federated exchange created on the older version is deleted, its associated worker processes are not stopped.
In these cases mirrored_supervisor:terminate_child returns not_found because
- the new format is found in Khepri but is not known to the delegate supervisor
- the old format is not even tried if Khepri is enabled (and it would crash the Khepri query if it were)
- the shovel plugin's cleanup_specs also cannot delete these orphaned workers (as visible from tracing)
<0.1055.0> mirrored_supervisor:delete_child(rabbit_shovel_dyn_worker_sup_sup, {<<"/">>,<<"sh1">>}) {rabbit_shovel_dyn_worker_sup_sup,'-cleanup_specs/0-fun-1-',2}
<0.1055.0> mirrored_supervisor:find_mirror(rabbit_shovel_dyn_worker_sup_sup, {<<"/">>,<<"sh1">>})
<0.1055.0> mirrored_supervisor:find_mirror/2 error function_clause
<0.1055.0> mirrored_supervisor:delete_child/2 error function_clause
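The lookup failure described above can be illustrated with a minimal, self-contained Erlang sketch. This is not the mirrored_supervisor implementation: the module name child_id_sketch and its functions are hypothetical, and only the two shovel id shapes are taken from the shell output and traces in this report.

```erlang
-module(child_id_sketch).
-export([id_format/1, to_new_shovel_id/1, find_child/2]).

%% Hypothetical helper: classify a child id by its shape.
%% New format (as seen in the traces): {KhepriPath, OldId}, where
%% KhepriPath is a list such as [<<"/">>, <<"sh1">>].
%% Anything else (a {VHost, Name} tuple or an #exchange{} record)
%% is treated as the old format.
id_format({Path, _OldId}) when is_list(Path) -> new;
id_format(_Other) -> old.

%% Hypothetical helper: wrap an old shovel id in the new shape.
%% An id that is already in the new shape is returned unchanged.
to_new_shovel_id({VHost, Name} = Old)
  when is_binary(VHost), is_binary(Name) ->
    {[VHost, Name], Old};
to_new_shovel_id({Path, _} = New) when is_list(Path) ->
    New.

%% Model of a supervisor looking up a child by exact id match in a
%% list of {Id, Pid} pairs. An old id and its new counterpart are
%% different terms, so searching for one never finds the other.
find_child(Id, Children) ->
    case lists:keyfind(Id, 1, Children) of
        false -> {error, not_found};
        Child -> {ok, Child}
    end.
```

With a child registered under the old id {<<"/">>,<<"sh1">>}, calling find_child/2 with the result of to_new_shovel_id/1 returns {error,not_found}, which mirrors the terminate_child result seen in the traces.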
Additionally, when a node goes down and the mirrored supervisor starts the workers on another node, it takes the childspec from Khepri, which still contains the old id format. The old child id is therefore preserved in the new supervisor's state.
The impact is limited, though: if such a worker is restarted while still on Mnesia (on a version which supports both the old and new formats, i.e. 3.12.9+ for shovels, 3.13.0+ for federation), this effectively performs a "full" conversion and no remnants of the old id remain.
One solution could be to restart all workers before the migration starts (from the init_copy_to_khepri callback) instead of converting the records one by one during the migration.
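The difference between the partial conversion seen in the Khepri records and a "full" conversion can be sketched as follows. The module name msup_record_sketch and both functions are hypothetical; the record shape is assumed from the shell output in this report, and whether the start arguments inside the childspec also need rewriting is not covered here.

```erlang
-module(msup_record_sketch).
-export([is_consistent/1, convert_fully/2]).

%% Assumed record shape, as printed by the shell in this report:
%%   {mirrored_sup_childspec, {Group, Id}, MirroringPid, Childspec}
%% where Childspec is a tuple-style child spec whose first element
%% is the child id.

%% True when the record key and the embedded childspec carry the
%% same child id. The records shown in this report fail this check:
%% the key has the new id, the childspec still has the old one.
is_consistent({mirrored_sup_childspec, {_Group, Id}, _Pid, Childspec}) ->
    element(1, Childspec) =:= Id.

%% Rewrite the id in both places so that no old id remains in the
%% key or in the first element of the childspec.
convert_fully({mirrored_sup_childspec, {Group, _OldKeyId}, Pid, Childspec},
              NewId) ->
    {mirrored_sup_childspec, {Group, NewId}, Pid,
     setelement(1, Childspec, NewId)}.
```

A check like is_consistent/1 could be run over the stored records after a migration to spot entries that still carry an old id in the childspec.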
Reproduction steps
- Create a 3-node cluster with RabbitMQ 3.12.7
- Create a shovel and 2 federated exchanges by importing these definitions: definitions_shovel_and_federation_2024-01-26.json
- Do a rolling upgrade to 3.13.0-rc.4 + federation patch (PR #10416 https://github.com/rabbitmq/rabbitmq-server/pull/10416/commits/311cc925e3e1a425e2c0f9c0661663a19ba1f052). During the upgrade, for each node I did "stop node N" + "sleep 5 seconds" + "start node N on new version" to make sure the mirrored supervisor starts the replacement workers before the shovel/federation plugin would start new workers based on parameters (in the latter case they would of course use the new id format)
- At this point the ids are in the old format
2> [element(1, C) || C <- mirrored_supervisor:which_children(rabbit_federation_exchange_link_sup_sup)].
[{exchange,{resource,<<"/">>,exchange,<<"fed_ex1">>},
fanout,true,false,false,[],none,
[{vhost,<<"/">>},
{name,<<"fed_pol1">>},
{pattern,<<"fed_.*">>},
{'apply-to',<<"all">>},
{definition,[{<<"federation-upstream">>,<<"fed_up1">>}]},
{priority,0}],
undefined,none,
#{user => <<"guest">>}},
{exchange,{resource,<<"/">>,exchange,<<"fed_ex2">>},
fanout,true,false,false,[],none,
[{vhost,<<"/">>},
{name,<<"fed_pol1">>},
{pattern,<<"fed_.*">>},
{'apply-to',<<"all">>},
{definition,[{<<"federation-upstream">>,<<"fed_up1">>}]},
{priority,0}],
undefined,none,
#{user => <<"guest">>}}]
3> [element(1, C) || C <- mirrored_supervisor:which_children(rabbit_shovel_dyn_worker_sup_sup)].
[{<<"/">>,<<"sh1">>}]
4> [element(2, E) || E <- ets:tab2list(mirrored_sup_childspec)].
[{rabbit_federation_exchange_link_sup_sup,{exchange,{resource,<<"/">>,
exchange,<<"fed_ex1">>},
fanout,true,false,false,[],none,
[{vhost,<<"/">>},
{name,<<"fed_pol1">>},
{pattern,<<"fed_.*">>},
{'apply-to',<<"all">>},
{definition,[{<<"federation-upstream">>,<<"fed_up1">>}]},
{priority,0}],
undefined,none,
#{user => <<"guest">>}}},
{rabbit_federation_exchange_link_sup_sup,{exchange,{resource,<<"/">>,
exchange,<<"fed_ex2">>},
fanout,true,false,false,[],none,
[{vhost,<<"/">>},
{name,<<"fed_pol1">>},
{pattern,<<"fed_.*">>},
{'apply-to',<<"all">>},
{definition,[{<<"federation-upstream">>,<<"fed_up1">>}]},
{priority,0}],
undefined,none,
#{user => <<"guest">>}}},
{rabbit_shovel_dyn_worker_sup_sup,{<<"/">>,<<"sh1">>}}]
- Migrate to Khepri
% rabbitmqctl enable_feature_flag khepri_db --node rabbit-1
- Observe the id format in the supervisors' state and in Khepri:
4> [element(1, C) || C <- mirrored_supervisor:which_children(rabbit_federation_exchange_link_sup_sup)].
[{exchange,{resource,<<"/">>,exchange,<<"fed_ex1">>},
fanout,true,false,false,[],none,
[{vhost,<<"/">>},
{name,<<"fed_pol1">>},
{pattern,<<"fed_.*">>},
{'apply-to',<<"all">>},
{definition,[{<<"federation-upstream">>,<<"fed_up1">>}]},
{priority,0}],
undefined,none,
#{user => <<"guest">>}},
{exchange,{resource,<<"/">>,exchange,<<"fed_ex2">>},
fanout,true,false,false,[],none,
[{vhost,<<"/">>},
{name,<<"fed_pol1">>},
{pattern,<<"fed_.*">>},
{'apply-to',<<"all">>},
{definition,[{<<"federation-upstream">>,<<"fed_up1">>}]},
{priority,0}],
undefined,none,
#{user => <<"guest">>}}]
5> [element(1, C) || C <- mirrored_supervisor:which_children(rabbit_shovel_dyn_worker_sup_sup)].
[{<<"/">>,<<"sh1">>}]
6> rabbit_khepri:get_many("/:rabbit_db_msup/:mirrored_supervisor_childspec/:rabbit_federation_exchange_link_sup_sup/:exchange/**").
{ok,#{[rabbit_db_msup,mirrored_supervisor_childspec,
rabbit_federation_exchange_link_sup_sup,exchange,<<"/">>] =>
undefined,
[rabbit_db_msup,mirrored_supervisor_childspec,
rabbit_federation_exchange_link_sup_sup,exchange,<<"/">>,
<<"fed_ex1">>] =>
{mirrored_sup_childspec,{rabbit_federation_exchange_link_sup_sup,{[exchange,
<<"/">>,<<"fed_ex1">>],
{exchange,{resource,<<"/">>,exchange,<<"fed_ex1">>},
fanout,true,false,false,[],none,
[{vhost,<<"/">>},
{name,<<"fed_pol1">>},
{pattern,<<"fed_.*">>},
{'apply-to',<<"all">>},
{definition,[{<<"federati"...>>,<<"fed_"...>>}]},
{priority,0}],
undefined,none,
#{user => <<"guest">>}}}},
<19214.622.0>,
{{exchange,{resource,<<"/">>,exchange,<<"fed_ex1">>},
fanout,true,false,false,[],none,
[{vhost,<<"/">>},
{name,<<"fed_pol1">>},
{pattern,<<"fed_.*">>},
{'apply-to',<<"all">>},
{definition,[{<<"federation-u"...>>,<<"fed_up1">>}]},
{priority,0}],
undefined,none,
#{user => <<"guest">>}},
{rabbit_federation_link_sup,start_link,
[{exchange,{resource,<<"/">>,exchange,<<"fed_ex1">>},
fanout,true,false,false,[],undefined,
[{vhost,<<"/">>},
{name,<<"fed_pol1">>},
{pattern,<<"fed_.*">>},
{'apply-to',<<"all">>},
{definition,[...]},
{priority,...}],
undefined,
{[],
[rabbit_event_exchange_decorator,
rabbit_federation_exchange]},
#{user => <<"guest">>}}]},
transient,infinity,supervisor,
[rabbit_federation_link_sup]}},
[rabbit_db_msup,mirrored_supervisor_childspec,
rabbit_federation_exchange_link_sup_sup,exchange,<<"/">>,
<<"fed_ex2">>] =>
{mirrored_sup_childspec,{rabbit_federation_exchange_link_sup_sup,{[exchange,
<<"/">>,<<"fed_ex2">>],
{exchange,{resource,<<"/">>,exchange,<<"fed_ex2">>},
fanout,true,false,false,[],none,
[{vhost,<<"/">>},
{name,<<"fed_pol1">>},
{pattern,<<"fed_.*">>},
{'apply-to',<<"all">>},
{definition,[{<<"federati"...>>,<<"fed_"...>>}]},
{priority,0}],
undefined,none,
#{user => <<"guest">>}}}},
<19214.622.0>,
{{exchange,{resource,<<"/">>,exchange,<<"fed_ex2">>},
fanout,true,false,false,[],none,
[{vhost,<<"/">>},
{name,<<"fed_pol1">>},
{pattern,<<"fed_.*">>},
{'apply-to',<<"all">>},
{definition,[{<<"federation-u"...>>,<<"fed_up1">>}]},
{priority,0}],
undefined,none,
#{user => <<"guest">>}},
{rabbit_federation_link_sup,start_link,
[{exchange,{resource,<<"/">>,exchange,<<"fed_ex2">>},
fanout,true,false,false,[],undefined,
[{vhost,<<"/">>},
{name,<<"fed_pol1">>},
{pattern,<<"fed_.*">>},
{'apply-to',<<"all">>},
{definition,[...]},
{priority,...}],
undefined,
{[],
[rabbit_event_exchange_decorator,
rabbit_federation_exchange]},
#{user => <<"guest">>}}]},
transient,infinity,supervisor,
[rabbit_federation_link_sup]}}}}
11> rabbit_khepri:get_many("/:rabbit_db_msup/:mirrored_supervisor_childspec/:rabbit_shovel_dyn_worker_sup_sup/**")
{ok,#{[rabbit_db_msup,mirrored_supervisor_childspec,
rabbit_shovel_dyn_worker_sup_sup,<<"/">>] =>
undefined,
[rabbit_db_msup,mirrored_supervisor_childspec,
rabbit_shovel_dyn_worker_sup_sup,<<"/">>,<<"sh1">>] =>
{mirrored_sup_childspec,{rabbit_shovel_dyn_worker_sup_sup,{[<<"/">>,
<<"sh1">>],
{<<"/">>,<<"sh1">>}}},
<19214.1052.0>,
{{<<"/">>,<<"sh1">>},
{rabbit_shovel_dyn_worker_sup,start_link,
[{<<"/">>,<<"sh1">>},
[{<<"dest-uri">>,
[{encrypted,<<"kLryCy0IfCpKy3T83R5gGSJuuu5ObAszSYJO"...>>}]},
{<<"src-uri">>,
[{encrypted,<<"4B1M2vc+cnkeuaKnwHWnXfZxZTahAqLg"...>>}]},
{<<"ack-mode">>,<<"on-confirm">>},
{<<"dest-add-forward-headers">>,false},
{<<"dest-protocol">>,<<"amqp091">>},
{<<"dest-queue">>,<<"qq2">>},
{<<"src-delete-after">>,<<"never">>},
{<<"src-protocol">>,<<"amqp091">>},
{<<"src-queue">>,<<"qq1">>}]]},
transient,300000,worker,
[rabbit_shovel_dyn_worker_sup]}}}}
- Delete shovel "sh1" (meanwhile tracing in remote_shell of all 3 nodes)
20> recon_trace:calls([{mirrored_supervisor, find_mirror, fun(_) -> exception_trace() end},{mirrored_supervisor,call,fun([Pid, _]) when is_pid(Pid) -> exception_trace() end}], 50, [{scope,local}]).
2
2:8:25.784687 <0.8155.0> mirrored_supervisor:find_mirror(rabbit_shovel_dyn_worker_sup_sup, {[<<"/">>,<<"sh1">>],{<<"/">>,<<"sh1">>}})
2:8:25.785165 <0.8155.0> mirrored_supervisor:find_mirror/2 --> {ok,<0.1056.0>}
2:8:25.785444 <0.8155.0> mirrored_supervisor:call(<0.1056.0>, {msg,terminate_child,[{[<<"/">>,<<"sh1">>],{<<"/">>,<<"sh1">>}}]})
2:8:25.785751 <0.8155.0> mirrored_supervisor:call/2 --> {error,not_found}
2:8:41.591080 <0.1055.0> mirrored_supervisor:find_mirror(rabbit_shovel_dyn_worker_sup_sup, {<<"/">>,<<"sh1">>})
2:8:41.591514 <0.1055.0> mirrored_supervisor:find_mirror/2 error function_clause
25> recon_trace:calls({mirrored_supervisor, delete_child, fun([rabbit_shovel_dyn_worker_sup_sup, _]) -> exception_trace(), message(caller()) end}, 10).
1
2:18:41.611051 <0.1055.0> mirrored_supervisor:delete_child(rabbit_shovel_dyn_worker_sup_sup, {<<"/">>,<<"sh1">>}) {rabbit_shovel_dyn_worker_sup_sup,'-cleanup_specs/0-fun-1-',2}
2:18:41.611916 <0.1055.0> mirrored_supervisor:delete_child/2 error function_clause
- Delete exchange "fed_ex2" (meanwhile tracing in remote_shell of all 3 nodes)
4> recon_trace:calls([{mirrored_supervisor, find_mirror, fun(_) -> exception_trace() end},{mirrored_supervisor,call,fun([Pid, _]) when is_pid(Pid) -> exception_trace() end}], 50, [{scope,local}]).
2
2:21:37.137719 <0.3173.0> mirrored_supervisor:find_mirror(rabbit_federation_exchange_link_sup_sup, {[exchange,<<"/">>,<<"fed_ex2">>],
{exchange,{resource,<<"/">>,exchange,<<"fed_ex2">>},
fanout,true,false,false,[],none,
[{vhost,<<"/">>},
{name,<<"fed_pol1">>},
{pattern,<<"fed_.*">>},
{'apply-to',<<"all">>},
{definition,[{<<"federation-upstream">>,<<"fed_up1">>}]},
{priority,0}],
undefined,none,
#{user => <<"guest">>}}})
2:21:37.138194 <0.3173.0> mirrored_supervisor:find_mirror/2 --> {ok,<27902.636.0>}
2:21:37.138528 <0.3173.0> mirrored_supervisor:call(<27902.636.0>, {msg,terminate_child,
[{[exchange,<<"/">>,<<"fed_ex2">>],
{exchange,{resource,<<"/">>,exchange,<<"fed_ex2">>},
fanout,true,false,false,[],none,
[{vhost,<<"/">>},
{name,<<"fed_pol1">>},
{pattern,<<"fed_.*">>},
{'apply-to',<<"all">>},
{definition,[{<<"federation-upstream">>,<<"fed_up1">>}]},
{priority,0}],
undefined,none,
#{user => <<"guest">>}}}]})
2:21:37.138865 <0.3173.0> mirrored_supervisor:call/2 --> {error,not_found}
Expected behavior
After migrating to Khepri, every component and process of the mirrored supervisors should use the new id format. When a shovel or federated exchange created on or before 3.12.7 is deleted on 3.13.0 with Khepri enabled, all associated processes should be cleaned up as well.
Additional context
No response
There is no way for us to address this completely for 3.13.0. That is fine: Khepri is an experimental feature flag, and Shovel and Federation are optional plugins that far from everyone uses.
We will try to get https://github.com/rabbitmq/rabbitmq-server/pull/10416 (https://github.com/rabbitmq/rabbitmq-server/issues/10306) in.
This is addressed by https://github.com/rabbitmq/rabbitmq-server/pull/10472