rabbitmq-server
rabbitmq-server copied to clipboard
Erlang client may error out due to basic.cancel race condition
https://www.rabbitmq.com/consumer-cancel.html
It is not an error for the client to issue a basic.cancel for a consumer which has been unexpectedly cancelled (e.g. due to queue deletion). By definition, there is a race possible between a client issuing a basic.cancel, and the broker sending out the asynchronous notification. In such cases, the broker does not error when it receives the basic.cancel and replies with a basic.cancel-ok as normal.
When this happens the Erlang client exits with an unexpected_delivery_and_no_default_consumer error. The scenario is as follows:
- Server sends a basic.cancel, channel removes consumer from its state (`amqp_selective_consumer` specifically)
- Channel sends the basic.cancel message to user but user doesn't see it
- User tells channel to basic.cancel, channel sends basic.cancel to server
- Server sends back a basic.cancel_ok, channel doesn't know about the consumer anymore, can't forward message and exits with an error
For reference this is the CQv2 property suite manual test case that can reproduce the issue:
%% Manual replay of a fixed command sequence from the CQv2 property suite,
%% given in the issue as a test case that can reproduce the basic.cancel race.
%%
%% Every step runs a cmd_* function for real, asserts the model's
%% postcondition/3 against the observed result, and then advances the model
%% with next_state/3. The function returns `true` when every step passes; a
%% failing postcondition aborts with a badmatch.
%%
%% The original listing also carried disabled (commented-out) steps: before
%% the cancel a set_version, channel_open, restart_vhost_clean,
%% set_mode_version, channel_close, a second set_version and a channel_publish,
%% and after the cancel a trailing batch (basic_get, restart_vhost_clean,
%% wait_for_confirms, publish, consume, wait_for_confirms, basic_get_msg).
%% None of them executed, so they are omitted here.
do_manual(Config) ->
    %% Check one already-executed command against the model and return the
    %% model state that follows it.
    Step = fun(Model, Cmd, Args, Result) ->
        Call = {call, undefined, Cmd, Args},
        true = postcondition(Model, Call, Result),
        next_state(Model, Result, Call)
    end,

    %% Initial model: the queue is set up first and its result stored in `amq`.
    Blank = #cq{name=prop_classic_queue_v2, mode=lazy, version=1,
                config=minimal_config(Config)},
    Amq = cmd_setup_queue(Blank),
    M0 = Blank#cq{amq=Amq},

    %% Ch1 is the result of the first cmd_channel_open; it is handed to the
    %% confirm_mode, consume, receive_and_ack and final cancel commands.
    Ch1 = cmd_channel_open(M0),
    M1 = Step(M0, cmd_channel_open, [M0], Ch1),

    ConfirmRes = cmd_channel_confirm_mode(Ch1),
    M2 = Step(M1, cmd_channel_confirm_mode, [Ch1], ConfirmRes),

    ConsumeRes = cmd_channel_consume(M2, Ch1),
    M3 = Step(M2, cmd_channel_consume, [M2, Ch1], ConsumeRes),

    DirectPublishRes = cmd_publish_msg(M3, 10, true, false, 73),
    M4 = Step(M3, cmd_publish_msg, [M3, 10, true, false, 73], DirectPublishRes),

    %% Ch2 is the result of the second cmd_channel_open; it is used for both
    %% cmd_channel_publish calls and is closed before the cancel.
    Ch2 = cmd_channel_open(M4),
    M5 = Step(M4, cmd_channel_open, [M4], Ch2),

    AckRes = cmd_channel_receive_and_ack(M5, Ch1),
    M6 = Step(M5, cmd_channel_receive_and_ack, [M5, Ch1], AckRes),

    PublishRes1 = cmd_channel_publish(M6, Ch2, 29, false, 58),
    M7 = Step(M6, cmd_channel_publish, [M6, Ch2, 29, false, 58], PublishRes1),

    ModeRes1 = cmd_set_mode(M7, lazy),
    M8 = Step(M7, cmd_set_mode, [M7, lazy], ModeRes1),

    Ch3 = cmd_channel_open(M8),
    M9 = Step(M8, cmd_channel_open, [M8], Ch3),

    RestartRes1 = cmd_restart_queue_dirty(M9),
    M10 = Step(M9, cmd_restart_queue_dirty, [M9], RestartRes1),

    Ch4 = cmd_channel_open(M10),
    M11 = Step(M10, cmd_channel_open, [M10], Ch4),

    RestartRes2 = cmd_restart_queue_dirty(M11),
    M12 = Step(M11, cmd_restart_queue_dirty, [M11], RestartRes2),

    PublishRes2 = cmd_channel_publish(M12, Ch2, 34, true, 21),
    M13 = Step(M12, cmd_channel_publish, [M12, Ch2, 34, true, 21], PublishRes2),

    CloseRes = cmd_channel_close(Ch2),
    M14 = Step(M13, cmd_channel_close, [Ch2], CloseRes),

    ModeRes2 = cmd_set_mode(M14, lazy),
    M15 = Step(M14, cmd_set_mode, [M14, lazy], ModeRes2),

    RestartRes3 = cmd_restart_queue_dirty(M15),
    M16 = Step(M15, cmd_restart_queue_dirty, [M15], RestartRes3),

    %% Last live step: cancel on the first channel, the one that was
    %% consuming across the dirty queue restarts above.
    CancelRes = cmd_channel_cancel(M16, Ch1),
    _Final = Step(M16, cmd_channel_cancel, [M16, Ch1], CancelRes),
    true.