rabbitmq-server
Quorum Queues with reject-publish max-length commit transactions successfully even if the messages are dropped
Describe the bug
Hi,
We were testing Quorum Queues with transactions, max-length, reject-publish. We've seen two unexpected behaviours. We think it is present on all versions.
Reproduction steps
- Create a Quorum Queue with the following arguments:
  max-length = 5
  x-overflow = reject-publish
- In a transaction, publish 3 messages and commit. These messages will be confirmed by RabbitMQ and accepted by the queue.
- In a transaction, publish 6 messages and commit. These messages will be confirmed by RabbitMQ and accepted by the queue. The queue will have 9 messages, even though the limit is 5.
- In a transaction, publish another 6 messages and commit. These messages will be confirmed by RabbitMQ, but not accepted by the queue. The messages will be silently dropped.
Expected behavior
- For the second publish, transaction fails with channel exception "PRECONDITION_FAILED - partial tx completion".
- For the third publish, transaction fails with channel exception "PRECONDITION_FAILED - partial tx completion".
Additional context
The probable cause of the second issue is that the tx state is unconditionally overwritten with committing after handle_queue_actions:
https://github.com/rabbitmq/rabbitmq-server/blob/cee35f7cd2369f1063f421c704c74476d7792462/deps/rabbit/src/rabbit_channel.erl#L1717-L1719
This happens even though handle_queue_actions may already have set the tx state to failed.
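To illustrate the suspected pattern, here is a minimal sketch. It is a hypothetical, simplified model and not the actual rabbit_channel code: the module name, the map-based state, and the function names other than handle_queue_actions are all made up for illustration, and the guarded variant is only one possible shape of a fix.

```erlang
-module(tx_state_sketch).
-export([commit_unconditional/1, commit_guarded/1]).

%% Hypothetical model: the channel state is reduced to a map with a
%% single `tx` key. None of this is taken from rabbit_channel.erl.

%% Stand-in for handle_queue_actions: marks the transaction as failed
%% when the queue rejected a publish, otherwise leaves the state alone.
handle_queue_actions(rejected, State) -> State#{tx := failed};
handle_queue_actions(accepted, State) -> State.

%% The pattern described above: the tx state is overwritten with
%% `committing` no matter what handle_queue_actions did to it.
commit_unconditional(QueueResult) ->
    State0 = #{tx => in_progress},
    State1 = handle_queue_actions(QueueResult, State0),
    State1#{tx := committing}.

%% One possible guard: keep `failed` if it was set, and only move to
%% `committing` otherwise.
commit_guarded(QueueResult) ->
    State0 = #{tx => in_progress},
    case handle_queue_actions(QueueResult, State0) of
        #{tx := failed} = Failed -> Failed;
        State1 -> State1#{tx := committing}
    end.
```

In this model, commit_unconditional(rejected) ends up in the committing state and the failure is lost, while commit_guarded(rejected) keeps the failed state so the commit could be answered with a channel exception instead of tx.commit_ok.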
Reproduction as code:
% Requires -include_lib("amqp_client/include/amqp_client.hrl"). for the record definitions.
{ok, Connection} =
    amqp_connection:start(#amqp_params_network{host = "localhost",
                                               username = <<"guest">>,
                                               password = <<"guest">>}),
{ok, Channel} = amqp_connection:open_channel(Connection),
% We are publishing to a queue with max-length = 5 and reject-publish
amqp_channel:call(Channel, #'queue.declare'{queue = <<"transaction-qq">>, durable = true, arguments = [
    {<<"x-queue-type">>, longstr, <<"quorum">>},
    {<<"x-max-length">>, long, 5},
    {<<"x-overflow">>, longstr, <<"reject-publish">>},
    {<<"x-delivery-limit">>, long, 100}
]}),
% First transaction: these 3 messages fit within the max-length limit
#'tx.select_ok'{} = amqp_channel:call(Channel, #'tx.select'{}),
[amqp_channel:call(Channel,
                   #'basic.publish'{exchange = <<"">>, routing_key = <<"transaction-qq">>},
                   #amqp_msg{props = #'P_basic'{},
                             payload = list_to_binary(io_lib:format("this fits into the queue max-length ~p", [N]))})
 || N <- lists:seq(1, 3)],
amqp_channel:call(Channel, #'tx.commit'{}),
% Queue length is now 3. Second transaction: publish another 6 messages.
% They overflow the queue, which ends up with 9 messages (more than max-length).
#'tx.select_ok'{} = amqp_channel:call(Channel, #'tx.select'{}),
[amqp_channel:call(Channel,
                   #'basic.publish'{exchange = <<"">>, routing_key = <<"transaction-qq">>},
                   #amqp_msg{props = #'P_basic'{},
                             payload = list_to_binary(io_lib:format("this does not but it will still go to the queue ~p", [N]))})
 || N <- lists:seq(1, 6)],
amqp_channel:call(Channel, #'tx.commit'{}),
% Queue length is now 9.
% Third transaction: these messages will be confirmed by RabbitMQ but dropped by the queue
#'tx.select_ok'{} = amqp_channel:call(Channel, #'tx.select'{}),
[amqp_channel:call(Channel,
                   #'basic.publish'{exchange = <<"">>, routing_key = <<"transaction-qq">>},
                   #amqp_msg{props = #'P_basic'{},
                             payload = list_to_binary(io_lib:format("this will be lost but it will be confirmed ~p", [N]))})
 || N <- lists:seq(1, 6)],
% We still receive tx.commit_ok
#'tx.commit_ok'{} = amqp_channel:call(Channel, #'tx.commit'{}).
Thanks for providing an executable way to reproduce and digging in.
The AMQP 0-9-1 spec does not cover additions such as x-overflow. When set to reject-publish explicitly, I agree with your analysis.
reject-publish with quorum queues is documented to be subject to overflows. That said, tx should handle case 2 better.