BasicGet and then BasicPublish can cause INTERNAL_ERROR from the broker

Open bording opened this issue 2 years ago • 11 comments

While working on adding quorum queue support to the NServiceBus RabbitMQ transport, we've come across a strange problem that causes an INTERNAL_ERROR to be thrown.

The scenario seems to be the following:

  1. Have a classic queue and publish a message to it
  2. Use a different channel to BasicGet the message
  3. Delete the queue and recreate it as a quorum queue
  4. Try to publish a message to the queue using the same channel that the BasicGet was done from

Here's a repro that demonstrates the problem:

using RabbitMQ.Client;

var factory = new ConnectionFactory();

var arguments = new Dictionary<string, object> { { "x-queue-type", "quorum" } };
using var connection = factory.CreateConnection();

using var channel = connection.CreateModel();
channel.ConfirmSelect();

//replace all secondChannel usage with channel and the code will work properly
using var secondChannel = connection.CreateModel();
secondChannel.ConfirmSelect();

channel.QueueDeclare("main", true, false, false); //create classic queue
channel.BasicPublish(string.Empty, "main", channel.CreateBasicProperties(), ReadOnlyMemory<byte>.Empty);
channel.WaitForConfirmsOrDie();

var message = secondChannel.BasicGet("main", false);
secondChannel.BasicAck(message.DeliveryTag, false);

channel.QueueDelete("main");
channel.QueueDeclare("main", true, false, false, arguments); //create quorum queue

secondChannel.BasicPublish(string.Empty, "main", secondChannel.CreateBasicProperties(), ReadOnlyMemory<byte>.Empty);
secondChannel.WaitForConfirmsOrDie(); //throws AlreadyClosedException here

channel.QueueDelete("main");

This code throws the following exception:

RabbitMQ.Client.Exceptions.AlreadyClosedException
  HResult=0x80131500
  Message=Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=541, text='INTERNAL_ERROR', classId=0, methodId=0
  Source=RabbitMQ.Client
  StackTrace:
   at RabbitMQ.Client.Impl.ModelBase.WaitForConfirms(TimeSpan timeout, Boolean& timedOut)
   at RabbitMQ.Client.Impl.ModelBase.WaitForConfirmsOrDie(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.WaitForConfirmsOrDie()
   at RabbitMQ.Client.Impl.AutorecoveringModel.WaitForConfirmsOrDie()
   at Program.<Main>$(String[] args) in C:\Code\RabbitChannelBugRepro\Program.cs:line 28

The repro code is located at https://github.com/andreasohlund/RabbitChannelBugRepro if you want to try it out for yourself.

The repository also includes packet captures of the repro code running, both as-is (broken.pcapng) and with only a single channel used (works.pcapng). There is also a server log file from a run of the code, showing the error that is logged.

Looking at the packet captures, nothing stood out to me as obviously different/wrong, so I suppose this could be a broker problem and not a client problem, but I'm not sure, so I opened the issue here first.

bording avatar Jun 03 '22 17:06 bording

Publishing and consuming from a queue that's being redeclared using a different type is very likely to introduce a race condition between things that never coordinate. I don't think it can be universally solved.

What's in the server logs?

michaelklishin avatar Jun 03 '22 18:06 michaelklishin

Publishing and consuming from a queue that's being redeclared using a different type is very likely to introduce a race condition between things that never coordinate. I don't think it can be universally solved.

It just seems strange to me that it works reliably when a single channel is used for everything, but moving a couple of key pieces to a different channel reliably breaks it.

What's in the server logs?

The errors start at https://github.com/andreasohlund/RabbitChannelBugRepro/blob/main/captures/rabbit%40RabbitMQ.log#L160

bording avatar Jun 03 '22 19:06 bording

Thanks @bording, I'll find time to check this out. Have a good weekend.

lukebakken avatar Jun 03 '22 20:06 lukebakken

@kjnilsson the value for module here is for the old classic queue when the new quorum one is created and then published to on secondChannel:

https://github.com/rabbitmq/rabbitmq-server/blob/master/deps/rabbit/src/rabbit_queue_type.erl#L573-L575

lukebakken avatar Jun 05 '22 14:06 lukebakken

Looks like the second channel isn't detecting the deletion of the classic queue: either it isn't detecting it quickly enough, or there is a bug causing it not to detect it at all. This would always be susceptible to race conditions, as detecting the end of a queue is done asynchronously. Channels are stateful w.r.t. their queues and we don't scan all channels in a RabbitMQ cluster on delete.

kjnilsson avatar Jun 06 '22 12:06 kjnilsson

Looks like the second channel isn't detecting the deletion of the classic queue: either it isn't detecting it quickly enough, or there is a bug causing it not to detect it at all. This would always be susceptible to race conditions, as detecting the end of a queue is done asynchronously. Channels are stateful w.r.t. their queues and we don't scan all channels in a RabbitMQ cluster on delete.

@kjnilsson The same problem occurs if the delete and re-declare happens on secondChannel instead of channel.

If I'm understanding what you're saying here correctly, shouldn't that work properly at that point since the get, delete, declare and publish are all happening on the same channel?

bording avatar Jun 07 '22 21:06 bording

The fundamental issue is that channels and the queue leader do not coordinate; they are all independent entities. So if you concurrently delete the queue and re-declare it with a different set of arguments, it can trip up an ongoing operation on another channel.

This is rare to see outside of tests because independent clients tend to use independent (uniquely named) queues and shared queues are basically never deleted.

Making channels coordinate will severely affect their throughput and concurrency rate. But maybe there are certain scenarios we can cover.

michaelklishin avatar Jun 08 '22 08:06 michaelklishin

So if you concurrently delete the queue and re-declare it with a different set of arguments, it can trip up an ongoing operation on another channel.

How does that explain that I'm still seeing the problem when the delete, re-declare and usage are all happening on the same channel?

EDIT: For clarity, here is the updated repro code:

using RabbitMQ.Client;

var factory = new ConnectionFactory();

var arguments = new Dictionary<string, object> { { "x-queue-type", "quorum" } };
using var connection = factory.CreateConnection();

using var channel = connection.CreateModel();
channel.ConfirmSelect();

channel.QueueDeclare("main", true, false, false); //create classic queue
channel.BasicPublish(string.Empty, "main", channel.CreateBasicProperties(), ReadOnlyMemory<byte>.Empty);
channel.WaitForConfirmsOrDie();

using var secondChannel = connection.CreateModel();
secondChannel.ConfirmSelect();

var message = secondChannel.BasicGet("main", false);
secondChannel.BasicAck(message.DeliveryTag, false);

secondChannel.QueueDelete("main");
secondChannel.QueueDeclare("main", true, false, false, arguments); //create quorum queue

secondChannel.BasicPublish(string.Empty, "main", secondChannel.CreateBasicProperties(), ReadOnlyMemory<byte>.Empty);
secondChannel.WaitForConfirmsOrDie(); //still throws here

secondChannel.QueueDelete("main");

If you comment out the BasicGet done on secondChannel right before the delete, then no exception occurs.

I'm not seeing how this has anything to do with concurrent operations across channels.

bording avatar Jun 08 '22 15:06 bording

Channels do not wait for queues to "apply" their operations. Maybe in some cases they could.

michaelklishin avatar Jun 08 '22 16:06 michaelklishin

Channels do not wait for queues to "apply" their operations. Maybe in some cases they could.

I'm sorry if I'm being dense, but I still don't understand how this comment explains the behavior I'm seeing.

With the updated repro code, it throws the INTERNAL_ERROR from the broker 100% of the time. If I comment out the BasicGet, then it works fine 100% of the time.

How is adding a BasicGet before a delete causing a consistent problem explained by "Channels do not wait for queues to 'apply' their operations"?

bording avatar Jun 08 '22 19:06 bording

@bording I'll do some tests to confirm, but my theory is that, without the BasicGet, the second channel has no knowledge of what kind of queue it's dealing with. Deleting a queue must not update the channel state in the same way that a BasicGet does.

The bug is that channels (queue type data) don't seem to update their internal state to catch cases where the type of the queue with which they are associated changes.

lukebakken avatar Jun 08 '22 23:06 lukebakken
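
The theory in the comment above can be illustrated with a small toy model. This is only a sketch under stated assumptions: ToyBroker, ToyChannel, and StaleStateDemo are hypothetical names invented for illustration, and the caching behavior is a simplification of the idea that a BasicGet leaves per-channel queue-type state behind that a later delete and re-declare does not refresh. It is not RabbitMQ's actual (Erlang) implementation and it does not talk to a broker.

```csharp
using System;
using System.Collections.Generic;

// Toy model only: all names and behavior here are illustrative assumptions,
// not RabbitMQ's real implementation.
public enum QueueType { Classic, Quorum }

public sealed class ToyBroker
{
    private readonly Dictionary<string, QueueType> queues =
        new Dictionary<string, QueueType>();

    public void Declare(string name, QueueType type) => queues[name] = type;

    // Deleting a queue does not notify any channel in this model.
    public void Delete(string name) => queues.Remove(name);

    public QueueType Lookup(string name) => queues[name];
}

public sealed class ToyChannel
{
    private readonly ToyBroker broker;

    // Per-channel queue-type state; in this model only BasicGet populates it.
    private readonly Dictionary<string, QueueType> cachedTypes =
        new Dictionary<string, QueueType>();

    public ToyChannel(ToyBroker broker) => this.broker = broker;

    public void BasicGet(string queue) => cachedTypes[queue] = broker.Lookup(queue);

    // Returns true when the channel would handle the publish with the queue's
    // current type, false when stale cached state points at the old type.
    public bool BasicPublish(string queue)
    {
        QueueType actual = broker.Lookup(queue);
        QueueType assumed = cachedTypes.TryGetValue(queue, out var cached) ? cached : actual;
        return assumed == actual;
    }
}

public static class StaleStateDemo
{
    // Mirrors the repro: optionally get from a classic queue, re-declare the
    // queue as quorum, then publish on the same channel.
    public static bool PublishAfterRedeclare(bool getFirst)
    {
        var broker = new ToyBroker();
        broker.Declare("main", QueueType.Classic);

        var channel = new ToyChannel(broker);
        if (getFirst) channel.BasicGet("main");

        broker.Delete("main");
        broker.Declare("main", QueueType.Quorum);

        return channel.BasicPublish("main");
    }
}
```

In this model, StaleStateDemo.PublishAfterRedeclare(true) returns false because the channel still assumes a classic queue, while StaleStateDemo.PublishAfterRedeclare(false) returns true. That matches the observation in the thread that commenting out the BasicGet makes the error go away, though the real broker fails with INTERNAL_ERROR rather than returning a flag.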

@lukebakken Did you ever get a chance to follow up on this?

bording avatar Oct 25 '22 14:10 bording

We know what the issue is (see my previous comment) but haven't had time to figure out a solution, other than not changing the queue type while active channels are using the queue.

lukebakken avatar Oct 25 '22 18:10 lukebakken

@bording I've taken this one up again. It's a head-scratcher 🤔

lukebakken avatar Apr 13 '23 22:04 lukebakken

@bording FYI fix is in #7943. Thanks for reporting this with code to reproduce, and your patience!

lukebakken avatar Apr 19 '23 21:04 lukebakken

Unlike 3.12.x, 3.11.x does not provide access to the queue name in rabbit_classic_queue:dequeue/3, so the fix will be 3.12-specific, at least in its current form.

michaelklishin avatar Apr 20 '23 09:04 michaelklishin