rabbitmq-server icon indicating copy to clipboard operation
rabbitmq-server copied to clipboard

Erlang client may error out due to basic.cancel race condition

Open lhoguin opened this issue 3 years ago • 1 comments

https://www.rabbitmq.com/consumer-cancel.html

It is not an error for the client to issue a basic.cancel for a consumer which has been unexpectedly cancelled (e.g. due to queue deletion). By definition, there is a race possible between a client issuing a basic.cancel, and the broker sending out the asynchronous notification. In such cases, the broker does not error when it receives the basic.cancel and replies with a basic.cancel-ok as normal.

When this happens the Erlang client exits with an unexpected_delivery_and_no_default_consumer error. The scenario is as follow:

  • Server sends a basic.cancel, channel removes consumer from its state (amqp_selective_consumer specifically)

  • Channel sends the basic.cancel message to user but user doesn't see it

  • User tells channel to basic.cancel, channel sends basic.cancel to server

  • Server sends back a basic.cancel_ok, channel doesn't know about the consumer anymore, can't forward message and exits with an error

lhoguin avatar Feb 01 '22 09:02 lhoguin

For reference this is the CQv2 property suite manual test case that can reproduce the issue:

do_manual(Config) ->

    St0 = #cq{name=prop_classic_queue_v2, mode=lazy, version=1,
              config=minimal_config(Config)},

    Res1 = cmd_setup_queue(St0),
    St1 = St0#cq{amq=Res1},

%    Res2 = cmd_set_version(St1, 2),
%    true = postcondition(St1, {call, undefined, cmd_set_version, [St1, 2]}, Res2),
%    St2 = next_state(St1, Res2, {call, undefined, cmd_set_version, [St1, 2]}),
    St2 = St1,

%    Res3 = cmd_channel_open(St2),
%    true = postcondition(St2, {call, undefined, cmd_channel_open, [St2]}, Res3),
%    St3 = next_state(St2, Res3, {call, undefined, cmd_channel_open, [St2]}),
    St3 = St2,

%    Res4 = cmd_restart_vhost_clean(St3),
%    true = postcondition(St3, {call, undefined, cmd_restart_vhost_clean, [St3]}, Res4),
%    St4 = next_state(St3, Res4, {call, undefined, cmd_restart_vhost_clean, [St3]}),
    St4 = St3,

%    Res5 = cmd_set_mode_version(lazy, 1),
%    true = postcondition(St4, {call, undefined, cmd_set_mode_version, [lazy, 1]}, Res5),
%    St5 = next_state(St4, Res5, {call, undefined, cmd_set_mode_version, [lazy, 1]}),
    St5 = St4,

    Res6 = cmd_channel_open(St5),
    true = postcondition(St5, {call, undefined, cmd_channel_open, [St5]}, Res6),
    St6 = next_state(St5, Res6, {call, undefined, cmd_channel_open, [St5]}),

    Res7 = cmd_channel_confirm_mode(Res6),
    true = postcondition(St6, {call, undefined, cmd_channel_confirm_mode, [Res6]}, Res7),
    St7 = next_state(St6, Res7, {call, undefined, cmd_channel_confirm_mode, [Res6]}),

%    Res8 = cmd_channel_close(Res3),
%    true = postcondition(St7, {call, undefined, cmd_channel_close, [Res3]}, Res8),
%    St8 = next_state(St7, Res8, {call, undefined, cmd_channel_close, [Res3]}),
    St8 = St7,

    Res9 = cmd_channel_consume(St8, Res6),
    true = postcondition(St8, {call, undefined, cmd_channel_consume, [St8, Res6]}, Res9),
    St9 = next_state(St8, Res9, {call, undefined, cmd_channel_consume, [St8, Res6]}),

    Res10 = cmd_publish_msg(St9, 10, true, false, 73),
    true = postcondition(St9, {call, undefined, cmd_publish_msg, [St9, 10, true, false, 73]}, Res10),
    St10 = next_state(St9, Res10, {call, undefined, cmd_publish_msg, [St9, 10, true, false, 73]}),

%    Res11 = cmd_set_version(St10, 1),
%    true = postcondition(St10, {call, undefined, cmd_set_version, [St10, 1]}, Res11),
%    St11 = next_state(St10, Res11, {call, undefined, cmd_set_version, [St10, 1]}),
    St11 = St10,

    Res12 = cmd_channel_open(St11),
    true = postcondition(St11, {call, undefined, cmd_channel_open, [St11]}, Res12),
    St12 = next_state(St11, Res12, {call, undefined, cmd_channel_open, [St11]}),

    Res13 = cmd_channel_receive_and_ack(St12, Res6),
    true = postcondition(St12, {call, undefined, cmd_channel_receive_and_ack, [St12, Res6]}, Res13),
    St13 = next_state(St12, Res13, {call, undefined, cmd_channel_receive_and_ack, [St12, Res6]}),

    Res14 = cmd_channel_publish(St13, Res12, 29, false, 58),
    true = postcondition(St13, {call, undefined, cmd_channel_publish, [St13, Res12, 29, false, 58]}, Res14),
    St14 = next_state(St13, Res14, {call, undefined, cmd_channel_publish, [St13, Res12, 29, false, 58]}),

    Res15 = cmd_set_mode(St14, lazy),
    true = postcondition(St14, {call, undefined, cmd_set_mode, [St14, lazy]}, Res15),
    St15 = next_state(St14, Res15, {call, undefined, cmd_set_mode, [St14, lazy]}),

    Res16 = cmd_channel_open(St15),
    true = postcondition(St15, {call, undefined, cmd_channel_open, [St15]}, Res16),
    St16 = next_state(St15, Res16, {call, undefined, cmd_channel_open, [St15]}),

    Res17 = cmd_restart_queue_dirty(St16),
    true = postcondition(St16, {call, undefined, cmd_restart_queue_dirty, [St16]}, Res17),
    St17 = next_state(St16, Res17, {call, undefined, cmd_restart_queue_dirty, [St16]}),

    Res18 = cmd_channel_open(St17),
    true = postcondition(St17, {call, undefined, cmd_channel_open, [St17]}, Res18),
    St18 = next_state(St17, Res18, {call, undefined, cmd_channel_open, [St17]}),

    Res19 = cmd_restart_queue_dirty(St18),
    true = postcondition(St18, {call, undefined, cmd_restart_queue_dirty, [St18]}, Res19),
    St19 = next_state(St18, Res19, {call, undefined, cmd_restart_queue_dirty, [St18]}),

    Res20 = cmd_channel_publish(St19, Res12, 34, true, 21),
    true = postcondition(St19, {call, undefined, cmd_channel_publish, [St19, Res12, 34, true, 21]}, Res20),
    St20 = next_state(St19, Res20, {call, undefined, cmd_channel_publish, [St19, Res12, 34, true, 21]}),

    Res21 = cmd_channel_close(Res12),
    true = postcondition(St20, {call, undefined, cmd_channel_close, [Res12]}, Res21),
    St21 = next_state(St20, Res21, {call, undefined, cmd_channel_close, [Res12]}),

    Res22 = cmd_set_mode(St21, lazy),
    true = postcondition(St21, {call, undefined, cmd_set_mode, [St21, lazy]}, Res22),
    St22 = next_state(St21, Res22, {call, undefined, cmd_set_mode, [St21, lazy]}),

    Res23 = cmd_restart_queue_dirty(St22),
    true = postcondition(St22, {call, undefined, cmd_restart_queue_dirty, [St22]}, Res23),
    St23 = next_state(St22, Res23, {call, undefined, cmd_restart_queue_dirty, [St22]}),

%    Res24 = cmd_channel_publish(St23, Res6, 12, true, undefined),
%    true = postcondition(St23, {call, undefined, cmd_channel_publish, [St23, Res6, 12, true, undefined]}, Res24),
%    St24 = next_state(St23, Res24, {call, undefined, cmd_channel_publish, [St23, Res6, 12, true, undefined]}),
    St24 = St23,

    Res25 = cmd_channel_cancel(St24, Res6),
    true = postcondition(St24, {call, undefined, cmd_channel_cancel, [St24, Res6]}, Res25),
    St25 = next_state(St24, Res25, {call, undefined, cmd_channel_cancel, [St24, Res6]}),

%    Res26 = cmd_channel_basic_get(St25, Res18),
%    true = postcondition(St25, {call, undefined, cmd_channel_basic_get, [St25, Res18]}, Res26),
%    St26 = next_state(St25, Res26, {call, undefined, cmd_channel_basic_get, [St25, Res18]}),
%
%    Res27 = cmd_restart_vhost_clean(St26),
%    true = postcondition(St26, {call, undefined, cmd_restart_vhost_clean, [St26]}, Res27),
%    St27 = next_state(St26, Res27, {call, undefined, cmd_restart_vhost_clean, [St26]}),
%
%    Res28 = cmd_channel_wait_for_confirms(Res6),
%    true = postcondition(St27, {call, undefined, cmd_channel_wait_for_confirms, [Res6]}, Res28),
%    St28 = next_state(St27, Res28, {call, undefined, cmd_channel_wait_for_confirms, [Res6]}),
%
%    Res29 = cmd_channel_publish(St28, Res16, 0, false, undefined),
%    true = postcondition(St28, {call, undefined, cmd_channel_publish, [St28, Res16, 0, false, undefined]}, Res29),
%    St29 = next_state(St28, Res29, {call, undefined, cmd_channel_publish, [St28, Res16, 0, false, undefined]}),
%
%    Res30 = cmd_channel_consume(St29, Res6),
%    true = postcondition(St29, {call, undefined, cmd_channel_consume, [St29, Res6]}, Res30),
%    St30 = next_state(St29, Res30, {call, undefined, cmd_channel_consume, [St29, Res6]}),
%
%    Res31 = cmd_channel_wait_for_confirms(Res6),
%    true = postcondition(St30, {call, undefined, cmd_channel_wait_for_confirms, [Res6]}, Res31),
%    St31 = next_state(St30, Res31, {call, undefined, cmd_channel_wait_for_confirms, [Res6]}),
%
%    Res32 = cmd_basic_get_msg(St31),
%    true = postcondition(St31, {call, undefined, cmd_basic_get_msg, [St31]}, Res32),
%    St32 = next_state(St31, Res32, {call, undefined, cmd_basic_get_msg, [St31]}),

    true.

lhoguin avatar Feb 01 '22 09:02 lhoguin