aio-pika

Publish hangs when connection is closed

Open · krieghan opened this issue 1 year ago · 5 comments

I'm upgrading from 6.6.0 to 9.1.3 (yes, I'm afraid it's been a while). I noticed a test I wrote ages ago for the case where connections close unexpectedly:

    import aio_pika
    import aiormq

    # Method of the application's async test case; get_connection() is the
    # application's own helper, not part of aio-pika.
    async def test_close_publish_connection(self):
        connection = await get_connection()
        channel = await connection.channel()
        await connection.close()
        with self.assertRaises(aiormq.exceptions.ChannelInvalidStateError):
            await channel.default_exchange.publish(
                aio_pika.Message(body=b'', headers={}),
                routing_key='test_queue',
            )

"publish" used to raise ChannelInvalidStateError. Now, the call to publish seems to hang somewhere in here (aio_pika/robust_channel.py):

    async def get_underlay_channel(self) -> aiormq.abc.AbstractChannel:
        await self._connection.ready()  # <- execution is stuck here
        return await super().get_underlay_channel()

So it's waiting for the connection to be ready, but it never will be. This is kind of a bizarre test to have in my application (I can't quite remember why I added it), but I am concerned about how publish calls behave when the underlying connection closes.
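To make the failure mode concrete, here is a toy model of it. It is a deliberately simplified, hypothetical sketch, not aio-pika's code: HangingConnection, ToyChannel and demo are made-up names, and the only detail borrowed from the trace above is that a channel operation first awaits the connection's ready(). Once close() clears the readiness flag and nothing ever sets it again, the publish blocks forever; asyncio.wait_for is used only so the demo can observe the hang instead of freezing.

```python
import asyncio


class HangingConnection:
    """Hypothetical stand-in: channel operations await ready() before doing anything."""

    def __init__(self) -> None:
        self._ready = asyncio.Event()  # set while a usable underlay connection exists
        self._ready.set()

    async def ready(self) -> None:
        await self._ready.wait()

    async def close(self) -> None:
        # Closing clears the flag, and in this model nothing ever sets it again.
        self._ready.clear()


class ToyChannel:
    def __init__(self, connection: HangingConnection) -> None:
        self._connection = connection

    async def publish(self, body: bytes) -> str:
        await self._connection.ready()  # the await the issue reports hanging on
        return "published"


async def demo() -> str:
    connection = HangingConnection()
    channel = ToyChannel(connection)
    await connection.close()
    try:
        # Without this timeout the await below would never return.
        return await asyncio.wait_for(channel.publish(b""), timeout=0.1)
    except asyncio.TimeoutError:
        return "hung"
```

Running asyncio.run(demo()) returns "hung" after roughly 0.1 seconds.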

krieghan, Jun 22 '23 16:06

The test is really strange. I think supporting an exception for this case is possible on the aio-pika side, but so far I'm still confused by your test :)

mosquito, Jun 22 '23 16:06

I think the main case I'm concerned with in this test is: you get a channel, the channel's underlying connection is closed, and then you try to publish to the channel. That seems like something that could happen, doesn't it? In 6.6.0 it raised an exception, but now it waits forever for the connection to become available again.

krieghan, Jun 22 '23 19:06

The way it works now is that there is a RobustConnection that contains a transport, which is a separate, independent entity that can be recreated if the real connection to the broker is closed. The transport contains an underlay connection, which is the real connection, and it is exactly this connection that gets closed in the event of a network failure or whatever.

So the .close() call in your example does not close the underlay connection; it closes the RobustConnection instance itself.

If you want to close the underlay connection, you have to take connection.transport.connection and close that.
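A rough model of the layering described above, assuming that losing the underlay connection triggers a reconnect unless the robust wrapper was closed explicitly. All class names are hypothetical and the real aio-pika classes carry far more state; the sketch only illustrates why closing connection.transport.connection behaves like a network failure (a fresh transport appears), while closing the robust connection itself is final.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class ToyUnderlay:
    """Hypothetical stand-in for the real broker connection."""

    def __init__(self, on_lost: Callable[[], Awaitable[None]]) -> None:
        self.closed = False
        self._on_lost = on_lost

    async def close(self) -> None:
        self.closed = True
        await self._on_lost()  # a network failure would end up on this same path


class ToyTransport:
    """Hypothetical holder of one underlay connection; replaced on reconnect."""

    def __init__(self, connection: ToyUnderlay) -> None:
        self.connection = connection


class ToyRobustConnection:
    def __init__(self) -> None:
        self.transport: Optional[ToyTransport] = None
        self.reconnects = 0
        self._closed = False

    async def connect(self) -> None:
        self.transport = ToyTransport(ToyUnderlay(on_lost=self._on_underlay_lost))

    async def _on_underlay_lost(self) -> None:
        if self._closed:
            return  # explicit close(): do not reconnect
        self.reconnects += 1
        await self.connect()  # build a fresh transport around a new underlay

    async def close(self) -> None:
        self._closed = True
        if self.transport is not None:
            await self.transport.connection.close()
            self.transport = None


async def demo():
    conn = ToyRobustConnection()
    await conn.connect()
    first = conn.transport
    # Closing the underlay simulates a network failure: the transport is rebuilt.
    await conn.transport.connection.close()
    recovered = conn.transport is not first and not conn.transport.connection.closed
    # Closing the robust wrapper is final: no new transport is created.
    await conn.close()
    return recovered, conn.reconnects, conn.transport
```

Here asyncio.run(demo()) returns (True, 1, None): one automatic recovery after the simulated failure, then no transport at all after the explicit close.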

I understand your confusion, the library has changed internally several times since 6.x.x.

Anyway, you are right: we have to clearly signal the channels to close. It looks like a bug, but I don't understand its severity yet.
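One possible shape for the "clearly signal" fix, sketched with plain asyncio and a hypothetical ConnectionClosedError (aio-pika and aiormq would raise their own exception types): ready() keeps waiting while a reconnect is still possible, but wakes up and raises as soon as close() is called, so blocked publishers fail instead of waiting forever. This illustrates the idea only; it is not the change the maintainers made.

```python
import asyncio


class ConnectionClosedError(Exception):
    """Hypothetical error type for this sketch."""


class FailFastConnection:
    """Hypothetical: ready() waits for a reconnect but fails once close() is called."""

    def __init__(self) -> None:
        self._ready = asyncio.Event()
        self._closed = asyncio.Event()

    def mark_ready(self) -> None:
        self._ready.set()

    async def close(self) -> None:
        self._ready.clear()
        self._closed.set()  # wakes up everyone blocked in ready()

    async def ready(self) -> None:
        if self._closed.is_set():
            raise ConnectionClosedError("connection was closed explicitly")
        ready_task = asyncio.ensure_future(self._ready.wait())
        closed_task = asyncio.ensure_future(self._closed.wait())
        try:
            # Return as soon as either "ready again" or "closed for good" happens.
            await asyncio.wait(
                {ready_task, closed_task}, return_when=asyncio.FIRST_COMPLETED
            )
        finally:
            ready_task.cancel()
            closed_task.cancel()
        if self._closed.is_set():
            raise ConnectionClosedError("connection was closed while waiting")


async def demo() -> str:
    conn = FailFastConnection()
    waiter = asyncio.ensure_future(conn.ready())  # blocks: never marked ready
    await asyncio.sleep(0)  # let the waiter start waiting inside ready()
    await conn.close()
    try:
        await waiter
        return "returned"
    except ConnectionClosedError:
        return "raised"
```

Here asyncio.run(demo()) returns "raised": the publisher stuck in ready() is released with an error the moment the connection is closed.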

To be honest, I was pretty sure a similar test was in the aio-pika test suite, but I couldn't find it quickly, so I'll have to write one.

mosquito, Jun 22 '23 20:06

Thanks for the explanation and the quick response!

krieghan, Jun 22 '23 20:06

I think this can cause a denial of service if direct publishing is used. Imagine you have an app handling 100 requests per second and the RabbitMQ server is down for 1 minute: your app will hang, the event loop will fill up with an ever-growing number of pending futures, and the app will probably be killed by the OOM killer.
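For scale, assuming every publish started during the outage stays blocked until the broker returns and nothing is dropped or retried:

$$
100\ \tfrac{\text{publishes}}{\text{s}} \times 60\ \text{s} = 6000\ \text{publish calls blocked at the same time}
$$

and that count keeps growing linearly for as long as the broker stays unreachable.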

I would like an adjustable publishing behavior that raises an exception if the connection has been closed for n seconds.
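One caller-side approach, independent of whatever aio-pika itself may add, is to put a deadline on each publish with asyncio.wait_for. If, as the trace above suggests, the publish coroutine itself awaits the connection's ready(), the deadline also covers that wait. publish_with_deadline, HealthyExchange and StuckExchange are hypothetical names for this sketch; the two exchanges are test doubles standing in for channel.default_exchange.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable


async def publish_with_deadline(
    publish: Callable[[], Awaitable[Any]], timeout: float
) -> Any:
    """Await one publish, raising asyncio.TimeoutError after `timeout` seconds.

    The deadline covers everything inside the awaitable, including any time
    spent waiting for the connection to become ready again.
    """
    return await asyncio.wait_for(publish(), timeout=timeout)


class HealthyExchange:
    """Hypothetical test double that publishes immediately."""

    async def publish(self, message: bytes, routing_key: str) -> str:
        return f"sent to {routing_key}"


class StuckExchange:
    """Hypothetical test double that never finishes, like the hang in this issue."""

    async def publish(self, message: bytes, routing_key: str) -> str:
        await asyncio.Event().wait()  # nobody ever sets this event
        return "unreachable"


async def demo() -> list:
    results = []
    for exchange in (HealthyExchange(), StuckExchange()):
        call = functools.partial(exchange.publish, b"", routing_key="test_queue")
        try:
            results.append(await publish_with_deadline(call, timeout=0.1))
        except asyncio.TimeoutError:
            results.append("timed out")
    return results
```

Here asyncio.run(demo()) returns ["sent to test_queue", "timed out"]. With a real channel the call would be built the same way, e.g. functools.partial(channel.default_exchange.publish, message, routing_key="test_queue"), with a timeout of your choosing. One caveat of this design: the timeout cancels the pending publish, so it tells you the call did not finish in time, not whether the broker eventually received the message.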

zoola969, Jan 08 '24 15:01