aiormq icon indicating copy to clipboard operation
aiormq copied to clipboard

No way to know if a consumer has been cancelled server-side

Open abompard opened this issue 11 months ago • 2 comments

Hi! I'm using aio-pika and I'd like my consumer to stop and restart if it has been cancelled server-side, for exemple if the queue has been deleted on the server. If I understand correctly, this is not currently possible because aiormq will silently drop the consumer when that happens: https://github.com/mosquito/aiormq/blob/eb1ebe5d1381f551a10cb99d07a95395e59c7cc3/aiormq/channel.py#L392

Is there another way to do it? Could aiormq cancel the task/future that the consumer is linked to? Thanks.

abompard avatar Sep 25 '23 09:09 abompard

@mosquito Hi, it seems like we have encountered the same problem in our applications. Sometimes, our RabbitMQ cluster with 3 nodes sends a CancelOK message, which indicates that a RabbitMQ replica node has moved to another machine in Kubernetes. As a result, we lose consumers and our application stops consuming queues. In order to resolve this issue, we have to manually restart the application.

Here a simple test for emulating problem

import asyncio
import os
import uuid

import aiormq
import pytest
from aiormq import Connection
from aiormq.abc import DeliveredMessage


@pytest.fixture()
def amqp_url():
    return os.environ.get('BROKER_CONNECTION_STRING')


@pytest.fixture
async def amqp_connection(amqp_url):
    connection = Connection(amqp_url)
    async with connection:
        yield connection


@pytest.mark.asyncio
async def test_cancel_ok_frame_removes_consumer_without_error(
    amqp_connection: aiormq.Connection,
) -> None:
    exchange_name = 'test_exchange'
    queue_name = f'cancel_ok_test.{uuid.uuid4()}'

    channel = await amqp_connection.channel()

    print('declare exchange %s', exchange_name)
    await channel.exchange_declare(
        exchange_name, exchange_type='topic', durable=True
    )

    print('declare queue %s', queue_name)
    await channel.queue_declare(queue_name, durable=True)
    await channel.queue_bind(queue_name, 'test_exchange')

    async def consumer_callback(
        msg: DeliveredMessage
    ) -> None:
        print('consumer_callback %s', msg.body)
        await channel.basic_ack(msg.delivery_tag)

    consumer_tag = uuid.uuid4().hex

    await channel.basic_consume(
        queue_name,
        consumer_tag=consumer_tag,
        consumer_callback=consumer_callback
    )

    async def emulate_cancel_frame() -> None:
        """
        Something like this happens time to time in our rabbitmq cluster.
        Looks like cluster node migrated to another machine, and we got CancelOk frame,
        but we can't catch it due to lack of callbacks in aiormq.
        """
        await asyncio.sleep(3)
        print('emulate cancel from rabbitmq node')
        # in our app test I deleted a queue from rabbitmq and got same result
        await channel.basic_cancel(consumer_tag)

    asyncio.create_task(emulate_cancel_frame())

    cancel_token = asyncio.get_event_loop().create_future()

    # In application we use same cancel_token and while loop.
    while cancel_token: # but if we changed it by `consumer_tag in channel._consumers` it will work
        await channel.basic_publish(
            b'hello', exchange=exchange_name
        )
        print('wait for cancel')
        await asyncio.sleep(1)

        # We need to know if consumer is cancelled
        # and call cancel_token.cancel(),
        # it will stop application and k8s will restart it
        assert not amqp_connection.is_closed
        assert not channel.is_closed # I'm not sure if channel closed in our case but in this test it seems OK

In our production solution, we have implemented a workaround by periodically monitoring for lost consumers. If a problem is detected, we will close the connection, which in turn causes the pod to restart.

    async def _watch_consumers_task(
        self,
        consumer_tag: str,
        channel: aio_pika.Channel,
    ) -> None:
        """
        Check if our consumer is still scheduled for receiving messages.
        Work around https://github.com/mosquito/aiormq/issues/116
        """

        class ConsumerNotFoundError(Exception):
            ...

        async def wait_for_cancel() -> None:
            channel_ = await channel.get_underlay_channel()
            while True:
                await asyncio.sleep(self._watch_consumers_interval)
                if consumer_tag not in channel_.consumers:
                    logger.warning(
                        'Consumer tag not found in `channel.consumers`. '
                        'Probably was cancelled due to queue deletion.',
                        tag=consumer_tag,
                    )
                    raise ConsumerNotFoundError('consumer not found')

        logger.debug('watching consumer')

        try:
            await wait_for_cancel()
        except ConsumerNotFoundError:
            logger.warning('consumer not found', consumer_tag=consumer_tag)
            await self._connector.close()
        except Exception as e:
            logger.error(
                'unexpected error',
                consumer_tag=consumer_tag,
                error=str(e),
            )
            await self._connector.close()

See #116

vlmihnevich avatar Oct 26 '23 05:10 vlmihnevich

I got same problem with failing RabbitMQ. On latest versions in Windows Server editions periodically it fails with receiving available memory volume. After fail it closes channel and consumer stops receiving messages. In recent flow I can't detect whether this channel open or not without interacting with this channel, which shouldn't occur, only listening.

13hakta avatar Jan 06 '24 09:01 13hakta