
Quorum Queues with reject-publish max-length commit transactions successfully even if the messages are dropped

Open luos opened this issue 2 years ago • 2 comments

Describe the bug

Hi,

We were testing Quorum Queues with transactions, max-length and reject-publish, and saw two unexpected behaviours. We think they are present in all versions.

Reproduction steps

  1. Create a Quorum Queue with the following arguments:
    • x-max-length = 5
    • x-overflow = reject-publish
  2. In a transaction, publish 3 messages and commit. These messages will be confirmed by RabbitMQ and accepted by the queue.
  3. In a transaction, publish 6 messages and commit. These messages will be confirmed by RabbitMQ and accepted by the queue. The queue will have 9 messages, even though the limit is 5.
  4. In a transaction, publish another 6 messages and commit. These messages will be confirmed by RabbitMQ, but not accepted by the queue. The messages will be silently dropped.

Expected behavior

  • For the second publish, transaction fails with channel exception "PRECONDITION_FAILED - partial tx completion".
  • For the third publish, transaction fails with channel exception "PRECONDITION_FAILED - partial tx completion".
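
The expected outcome of the three commits can be modelled with a small pure function. This is only a sketch of the semantics requested in this report, not RabbitMQ code: the module name `tx_expectation` and the functions `commit/3` and `replay/0` are hypothetical, and the model assumes a commit is all-or-nothing with respect to the length limit. It also ignores that a real channel exception closes the channel.

```erlang
-module(tx_expectation).
-export([commit/3, replay/0]).

%% Hypothetical model of the behaviour this report asks for.
%% QueueLen: messages currently in the queue
%% MaxLen:   the x-max-length limit
%% N:        messages published in the transaction
commit(QueueLen, MaxLen, N) when QueueLen + N =< MaxLen ->
    {ok, QueueLen + N};
commit(_QueueLen, _MaxLen, _N) ->
    %% Stands in for "PRECONDITION_FAILED - partial tx completion"
    {error, precondition_failed}.

%% Replays the three transactions from the reproduction steps (3, 6 and 6
%% messages) against an empty queue with a limit of 5.
replay() ->
    Step = fun(N, {Len, Results}) ->
               case commit(Len, 5, N) of
                   {ok, NewLen}   -> {NewLen, [ok | Results]};
                   {error, _} = E -> {Len, [E | Results]}
               end
           end,
    {FinalLen, Reversed} = lists:foldl(Step, {0, []}, [3, 6, 6]),
    {FinalLen, lists:reverse(Reversed)}.
```

Under this model only the first commit succeeds, and the queue stays at 3 messages after the second and third attempts.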

Additional context

The probable cause of the second issue is that the tx state is unconditionally overwritten to committing after handle_queue_actions:

https://github.com/rabbitmq/rabbitmq-server/blob/cee35f7cd2369f1063f421c704c74476d7792462/deps/rabbit/src/rabbit_channel.erl#L1717-L1719

This overwrite happens even though handle_queue_actions may already have set the tx state to failed.
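
The suspected cause and one possible remedy can be pictured with the sketch below. It does not reproduce the real `rabbit_channel` code: the `#ch{}` record here is a stripped-down stand-in with a single `tx` field, and `overwrite_committing/1` and `mark_committing/1` are hypothetical helpers. The idea is simply to keep `failed` if the queue actions already recorded it, instead of replacing it with `committing`.

```erlang
-module(tx_state_sketch).
-export([overwrite_committing/1, mark_committing/1]).

%% Simplified stand-in for the channel state (assumption: the real record
%% has many more fields, only the tx state matters for this sketch).
-record(ch, {tx = none}).

%% Behaviour described in the report: the state is overwritten
%% unconditionally, so a failure recorded earlier is lost.
overwrite_committing(State = #ch{}) ->
    State#ch{tx = committing}.

%% Hypothetical alternative: a failed transaction stays failed,
%% any other state moves on to committing.
mark_committing(State = #ch{tx = failed}) ->
    State;
mark_committing(State = #ch{}) ->
    State#ch{tx = committing}.
```

With a guard like this, whatever completes the transaction afterwards would still see `failed` and could raise the channel exception rather than reply with `tx.commit_ok`.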

Reproduction as code:

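% Requires the RabbitMQ Erlang client. When used inside a module, add:
% -include_lib("amqp_client/include/amqp_client.hrl").
{ok, Connection} =
    amqp_connection:start(#amqp_params_network{host = "localhost",
                                                username = <<"guest">>,
                                                password = <<"guest">>}),
{ok, Channel} = amqp_connection:open_channel(Connection),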


% We are publishing to a queue with max-length = 5 and reject-publish

amqp_channel:call(Channel, #'queue.declare'{queue = <<"transaction-qq">>, durable = true, arguments = [
        {<<"x-queue-type">>, longstr, <<"quorum">>},
        {<<"x-max-length">>, long, 5},
        {<<"x-overflow">>, longstr, <<"reject-publish">>},
        {<<"x-delivery-limit">>, long, 100}
    ]}),

% These messages will fit into the max-length limit

#'tx.select_ok'{} = amqp_channel:call(Channel, #'tx.select'{}),
[amqp_channel:call(Channel,
                    #'basic.publish'{exchange = <<"">>, routing_key = <<"transaction-qq">>},
                    #amqp_msg{props = #'P_basic'{},
                                payload = list_to_binary(io_lib:format("this fits into the queue max-length ~p", [N]))})
    || N <- lists:seq(1, 3)],

amqp_channel:call(Channel, #'tx.commit'{}),

% Queue length is now 3. We publish another 6 messages, which overflow the
% queue: it ends up with 9 messages (more than max-length).

#'tx.select_ok'{} = amqp_channel:call(Channel, #'tx.select'{}),
[amqp_channel:call(Channel,
                    #'basic.publish'{exchange = <<"">>, routing_key = <<"transaction-qq">>},
                    #amqp_msg{props = #'P_basic'{},
                                payload = list_to_binary(io_lib:format("this does not but it will still go to the queue ~p", [N]))})
    || N <- lists:seq(1, 6)],

amqp_channel:call(Channel, #'tx.commit'{}),
% queue length is now 9.

% We publish these messages, which will be confirmed by RabbitMQ but dropped by the queue
#'tx.select_ok'{} = amqp_channel:call(Channel, #'tx.select'{}),
[amqp_channel:call(Channel,
                    #'basic.publish'{exchange = <<"">>, routing_key = <<"transaction-qq">>},
                    #amqp_msg{props = #'P_basic'{},
                                payload = list_to_binary(io_lib:format("this will be lost but it will be confirmed ~p", [N]))})
    || N <- lists:seq(1, 6)],
% we receive tx.commit_ok
#'tx.commit_ok'{} = amqp_channel:call(Channel, #'tx.commit'{}).

luos, Jul 27 '23 14:07

Thanks for providing an executable way to reproduce and digging in.

The AMQP 0-9-1 spec does not cover additions such as x-overflow. When x-overflow is explicitly set to reject-publish, I agree with your analysis.

michaelklishin, Jul 27 '23 15:07

reject-publish with quorum queues is documented to be subject to overflows. That said, tx should handle case 2 better.

kjnilsson, Dec 04 '23 12:12