aio-pika

Impossible to handle Unexpected connection close

Open tayp1n opened this issue 1 year ago • 9 comments

How to handle Unexpected connection close and raise an exception?

I've got the logic below:

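import asyncio
import logging

import aio_pika
from aio_pika.abc import AbstractChannel, AbstractRobustConnection

logger = logging.getLogger(__name__)


class AMQPHandler: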
    def __init__(self) -> None:
        self.connection: AbstractRobustConnection | None = None
        self.channel: AbstractChannel | None = None

    async def init(self) -> None:
        import settings

        logger.info("Initializing AMQP handler")

        config = settings.CorePublisherSettings

        connection = await aio_pika.connect_robust(
            config.get_dsn(),
            loop=asyncio.get_event_loop(),
            timeout=config.CONNECTION_TIMEOUT,
        )
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)

        exchange = await channel.declare_exchange(
            config.EXCHANGE_NAME,
            config.EXCHANGE_TYPE,
            auto_delete=config.EXCHANGE_AUTO_DELETE,
            durable=True,
        )
        for key in config.BINDING_KEYS:
            q_name = (
                f"{key}.{config.PREFIX_BINDING_KEYS}"
                if config.PREFIX_BINDING_KEYS
                else key
            )
            queue = await channel.declare_queue(name=q_name)
            await queue.bind(exchange, q_name)
            await queue.consume(self.handle_message)
            logger.info("Queue declared", extra={"queue": q_name})

        self.connection = connection
        self.channel = channel

        logger.info("AMQP handler initialized")

Now, when RabbitMQ goes down, the connection tries to reconnect every 5 seconds. How can I make it raise an exception that I can handle in order to stop the microservice?

tayp1n avatar Nov 15 '23 13:11 tayp1n

You can use a regular connection instead of a robust one.

Also, the connection.connected event can be checked in a healthcheck handler.
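
A minimal sketch of the healthcheck idea, assuming connection.connected is an asyncio.Event as described above. The helper name is_broker_healthy is hypothetical and not part of aio-pika; it only reads the event and treats a missing attribute as unhealthy.

```python
import asyncio


def is_broker_healthy(connection) -> bool:
    """Return True only if the connection's `connected` event is set.

    Assumption: `connection.connected` is an asyncio.Event, as mentioned
    in the comment above. An object without it is reported as unhealthy.
    """
    event = getattr(connection, "connected", None)
    return event is not None and event.is_set()
```

A healthcheck handler could return a failing status whenever this function returns False, and the orchestrator could then restart the service.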

Alviner avatar Nov 15 '23 13:11 Alviner

What I also did is

    def on_connection_closed(self, *args, **kwargs):
        sys.exit(1)

    async def init(self) -> None:
        import settings

        logger.info("Initializing AMQP handler")

        config = settings.BaseMessageBrokerSettings

        conn = await aiormq.connect(
            config.get_dsn(),
            loop=asyncio.get_event_loop(),
        )
        channel = await conn.channel()

        conn.closing.add_done_callback(self.on_connection_closed)
        channel.closing.add_done_callback(self.on_connection_closed)

        await channel.exchange_declare(
            exchange=config.EXCHANGE_NAME,
            exchange_type=config.EXCHANGE_TYPE,
            auto_delete=config.EXCHANGE_AUTO_DELETE,
            durable=True,
        )
        for key in config.BINDING_KEYS:
            q_name = (
                f"{key}.{config.PREFIX_BINDING_KEYS}"
                if config.PREFIX_BINDING_KEYS
                else key
            )
            queue = await channel.queue_declare(queue=q_name, durable=True)
            self.queues.append(queue)

            await channel.queue_bind(exchange=config.EXCHANGE_NAME, queue=q_name)
            await channel.basic_consume(queue.queue, self.handle_message, no_ack=True)
            logger.info("Queue declared", extra={"queue": q_name})

        self.conn = conn
        self.channel = channel

        logger.info("AMQP handler initialized")
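
An alternative to calling sys.exit(1) inside the callback is to wait on the closing future and raise from the waiting coroutine, so the caller can handle the error and shut down in an orderly way. The sketch below assumes that conn.closing is a plain asyncio future, as the add_done_callback usage above suggests; raise_when_closed is a hypothetical helper name.

```python
import asyncio


async def raise_when_closed(closing: "asyncio.Future") -> None:
    """Block until `closing` completes, then raise ConnectionError.

    `closing` stands for something like `conn.closing` from the code above
    (assumed to be an asyncio future).
    """
    # asyncio.wait() does not re-raise the future's exception or cancellation.
    await asyncio.wait([closing])
    # Chain the original error, if any, so the cause stays visible.
    exc = None if closing.cancelled() else closing.exception()
    raise ConnectionError("AMQP connection closed") from exc
```

The service entry point could then run `await raise_when_closed(conn.closing)` after setup and treat the resulting ConnectionError as the signal to stop.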
       

tayp1n avatar Nov 15 '23 16:11 tayp1n

Your original code uses connect_robust(), i.e. the function used to create a connection that automatically reconnects... Sounds like you don't want to use that.

Dreamsorcerer avatar Nov 20 '23 10:11 Dreamsorcerer

I have the same issue. With connect_robust, if the connection to RabbitMQ is lost, the consumer stops receiving messages after the connection is re-established.

The logs show the consumer trying to reconnect every 5 seconds. Once RabbitMQ becomes reachable again, the logs stop and no new messages show up. In the RabbitMQ UI, the queue has 0 consumers.

I'm using the exact code from here (Master/Worker) but with a dict instead of task_id: https://aio-pika.readthedocs.io/en/latest/patterns.html

@mosquito Any idea?

I'm using aio-pika 9.4.0 with Python 3.10.

gaby avatar Mar 23 '24 03:03 gaby

This might be fixed by #622 in 9.4.1. It sounds similar enough to what I encountered debugging #615.

Darsstar avatar Mar 26 '24 10:03 Darsstar

We're currently using aio_pika 9.4.2 and encountered the same issue as @gaby with connect_robust in our production environment. Once RabbitMQ becomes reachable again, the client is stuck on await self.__connection_close_event.wait().

yaelmi3 avatar Jul 11 '24 14:07 yaelmi3

@yaelmi3 I fixed my issue using this https://github.com/mosquito/aio-pika/issues/231#issuecomment-2023072289

gaby avatar Jul 11 '24 14:07 gaby

@yaelmi3 I have the same issue: the connection gets stuck after RabbitMQ becomes available again.

I had code like

self.connection = await aio_pika.connect_robust(config.RABBIT_URI)

and then declared a queue, etc.

I added the timeout argument to the connect_robust call, and it works!

self.connection = await aio_pika.connect_robust(config.RABBIT_URI, timeout=5)

Now, after RabbitMQ comes back up, I get a TimeoutError (it seems the reconnect task gets stuck if RabbitMQ becomes available during the reconnection process), and the next reconnection attempt succeeds.

RomaLash avatar Jul 12 '24 16:07 RomaLash

@RomaLash: Actually, I did try adding timeout to the connect_robust call, but this did not resolve my issue in the case of Unexpected connection close from remote. I assume this is because the client is stuck on Waiting for connection close event when the server shuts down abruptly and is then started again.

My solution was to change the code as follows, inspired by this from @gaby:

Before:

        connection = await self._get_connection()
        queue = await self._get_queue(connection=connection, queue_name=queue_name,
                                      qos_prefetch_count=qos_prefetch_count, set_qos=set_qos)
        await queue.consume(partial(self._handle_message, message_handler, error_handler, fail_strategy))
        future = Future()
        future.add_done_callback(lambda _: connection.close())
        await future

After:

        connection = await self._get_connection()
        queue = await self._get_queue(connection=connection, queue_name=queue_name,
                                      qos_prefetch_count=qos_prefetch_count, set_qos=set_qos)
        await queue.consume(partial(self._handle_message, message_handler, error_handler, fail_strategy))
        try:
            await Future()
        finally:
            await queue.close()
            await connection.close()

In both cases, the connect_robust call remained the same:

    async def _get_connection(self):
        return await aio_pika.connect_robust(
            login=self.username,
            password=self.password,
            host=self.host,
            port=self.port,
            virtualhost=self.vhost,
            timeout=self.timeout
        )
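
One observable difference between the two versions can be reproduced without a broker. In the "Before" code, the done callback calls connection.close(), which is a coroutine function, so the lambda only creates a coroutine object that is never awaited. In the "After" code, the finally block awaits the cleanup. The sketch below uses a hypothetical FakeResource stand-in for the queue or connection. It illustrates only the try/finally behavior and does not claim to explain the reconnect hang itself.

```python
import asyncio


class FakeResource:
    """Hypothetical stand-in for a queue or connection with an async close()."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


async def consume_forever(resource: FakeResource) -> None:
    try:
        # Never completes on its own; the wait ends only on cancellation.
        await asyncio.Future()
    finally:
        # Runs on cancellation too, and the close is actually awaited.
        await resource.close()


async def demo() -> bool:
    resource = FakeResource()
    task = asyncio.create_task(consume_forever(resource))
    await asyncio.sleep(0)  # let the task start and block on the future
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return resource.closed
```

Running `asyncio.run(demo())` should report that the resource was closed after the task was cancelled.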

yaelmi3 avatar Jul 14 '24 09:07 yaelmi3