aio-pika

QueueIterator hangs forever when RabbitMQ disconnects

Open Phibonacci opened this issue 3 years ago • 9 comments

If RabbitMQ disconnects then QueueIterator will hang forever or until the coroutine is cancelled.

        queue = await channel.declare_queue(queue_name, durable=True)
        async for message in queue:
            pass  # after the broker disconnects, this loop neither ends nor raises
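
The symptom can be mimicked without a broker. The sketch below is only an illustration: `NeverYields` is a hypothetical stand-in for a queue iterator whose connection has gone away (it is not aio-pika code), and `consume` plays the role of the loop above. The consumer makes no progress and only stops once the task is cancelled from outside.

```python
import asyncio


class NeverYields:
    """Hypothetical stand-in for a queue iterator that stopped receiving messages."""

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Wait on an event that nobody ever sets: this simulates the hang.
        await asyncio.Event().wait()


async def consume(source) -> None:
    # Same shape as the loop in the report.
    async for _message in source:
        pass


async def demo() -> str:
    task = asyncio.create_task(consume(NeverYields()))

    # Give the consumer a moment; it is still pending afterwards.
    done, _pending = await asyncio.wait({task}, timeout=0.1)
    assert not done

    # Cancelling the coroutine is the only way out.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        return "cancelled"
    return "finished"


if __name__ == "__main__":
    print(asyncio.run(demo()))
```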

Phibonacci avatar Feb 23 '21 16:02 Phibonacci

I see this too. When I set logging.basicConfig(level=logging.DEBUG) the following error is reported:

DEBUG:aio_pika.connection:Closing AMQP connection None
DEBUG:aiormq.connection:Reader task cancelled:
Traceback (most recent call last):
  File "/home/BHDGSYSTEMATIC.COM/rblackbourn/dev/scratch/python/aio_pika-example1/.venv/lib/python3.8/site-packages/aiormq/connection.py", line 383, in __reader
    return await self.close(
  File "/home/BHDGSYSTEMATIC.COM/rblackbourn/dev/scratch/python/aio_pika-example1/.venv/lib/python3.8/site-packages/aiormq/base.py", line 149, in close
    await self.loop.create_task(self.__closer(exc))
asyncio.exceptions.CancelledError

As a workaround, the failure can be detected by registering a callback with connection.add_close_callback:

import aio_pika
from aio_pika.exceptions import ConnectionClosed  # assumed import path; the class is defined in aiormq.exceptions

def close_callback(connection: aio_pika.Connection, error: Exception) -> None:
    if isinstance(error, ConnectionClosed):
        code, text = error.args
        if code == 320:  # "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
            print("Handle shutdown here ...")

rob-blackbourn avatar Mar 29 '21 06:03 rob-blackbourn

I have this helper method:

"""Helpers for asyncio"""

import asyncio
from asyncio import Event, Future, Task
from typing import AsyncIterator, Set, TypeVar, cast

T = TypeVar('T')

async def cancel_task_and_await(task: Task) -> None:
    """Cancel a task and await it

    :param task: The task
    :type task: Task
    """
    task.cancel()
    try:
        # If the task has remaining await statements it must be awaited.
        await task
    except asyncio.CancelledError:
        pass


async def aiter_cancellable(
        async_iterator: AsyncIterator[T],
        cancellation_event: Event
) -> AsyncIterator[T]:
    """Create a cancellable async iterator.

    :param async_iterator: The async iterator to wrap
    :param cancellation_event: The asyncio Event to control cancellation.
    :return: The wrapped async iterator
    """
    cancellation_task = asyncio.create_task(cancellation_event.wait())
    result_iter = async_iterator.__aiter__()
    pending: Set[Future] = {
        cancellation_task
    }
    while not cancellation_event.is_set():
        try:
            anext_task = asyncio.create_task(result_iter.__anext__())
            pending.add(anext_task)
            done, pending = await asyncio.wait(
                pending,
                return_when=asyncio.FIRST_COMPLETED
            )
            if anext_task in done:
                yield anext_task.result()
        except StopAsyncIteration:
            await cancel_task_and_await(cancellation_task)
            return

    for task in pending:
        await cancel_task_and_await(cast(Task, task))

Then this code:

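import asyncio
import logging

import aio_pika
from aio_pika.exceptions import ConnectionClosed  # assumed import path; the class is defined in aiormq.exceptions

# aiter_cancellable comes from the helper module above.
LOGGER = logging.getLogger(__name__)

async def main_async():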

    # Create an event that gets set when the connection fails.
    cancellation_event = asyncio.Event()

    def close_callback(_conn: aio_pika.Connection, error: Exception) -> None:
        if isinstance(error, ConnectionClosed):
            code, text = error.args
            if code == 320:  # "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
                LOGGER.error("Connection failure (%s) %s", code, text)
                cancellation_event.set()

    async with await aio_pika.connect(
            host='localhost',
            port=5672,
            virtualhost='/',
            login='guest',
            password='guest'
    ) as connection:

        connection.add_close_callback(close_callback)

        async with await connection.channel() as channel:
            queue = await channel.declare_queue('some.queue', passive=True)

            async with queue.iterator() as queue_iter:
                # Use the cancellation event HERE!!!
                async for message in aiter_cancellable(queue_iter, cancellation_event):
                    try:
                        async with message.process():
                            print(message.body.decode())
                    except Exception:  # a bare except would also swallow asyncio.CancelledError
                        print('Failed to process message')
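
The helper can also be exercised without a broker. The following is a minimal sketch, not the code above verbatim: `aiter_cancellable` is a condensed variant that moves the task cleanup into a `finally` block, `StalledSource` is a hypothetical stand-in for a queue iterator that delivers two items and then stalls, and the `call_later` timer stands in for `close_callback` firing after the disconnect.

```python
import asyncio
from typing import AsyncIterator, List, TypeVar

T = TypeVar("T")


async def aiter_cancellable(
        async_iterator: AsyncIterator[T],
        cancellation_event: asyncio.Event
) -> AsyncIterator[T]:
    """Condensed variant of the helper, repeated so the example runs on its own."""
    cancellation_task = asyncio.create_task(cancellation_event.wait())
    result_iter = async_iterator.__aiter__()
    pending = {cancellation_task}
    try:
        while not cancellation_event.is_set():
            anext_task = asyncio.create_task(result_iter.__anext__())
            pending.add(anext_task)
            done, pending = await asyncio.wait(
                pending,
                return_when=asyncio.FIRST_COMPLETED
            )
            if anext_task in done:
                try:
                    item = anext_task.result()
                except StopAsyncIteration:
                    return
                yield item
    finally:
        # Cancel whatever is still outstanding and wait for it to finish.
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)


class StalledSource:
    """Hypothetical stand-in for a queue iterator: two items, then it stalls."""

    def __init__(self) -> None:
        self._items = [1, 2]

    def __aiter__(self):
        return self

    async def __anext__(self) -> int:
        if self._items:
            return self._items.pop(0)
        await asyncio.Event().wait()  # never set: simulates the hang


async def demo() -> List[int]:
    cancellation_event = asyncio.Event()
    # Stand-in for close_callback setting the event after a disconnect.
    asyncio.get_running_loop().call_later(0.05, cancellation_event.set)

    received: List[int] = []
    async for item in aiter_cancellable(StalledSource(), cancellation_event):
        received.append(item)
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Putting the cleanup in `finally` means the outstanding tasks are also cancelled when the wrapped iterator raises something other than `StopAsyncIteration`, or when the consumer stops iterating early.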

rob-blackbourn avatar Mar 29 '21 07:03 rob-blackbourn

+1

There are several open tickets related to this; I would love to see an official solution.

edmondburnett avatar Apr 25 '21 03:04 edmondburnett

Any update on this?

jgayfer avatar Dec 29 '21 00:12 jgayfer

Any update on this issue? I am still getting this error in version 8.1.1.

mknithin avatar Aug 22 '22 14:08 mknithin

I believe this may be related to: https://github.com/mosquito/aio-pika/issues/358 ... I've added a comment there with a work-around which solves both issues for us.

wallyhall avatar Apr 28 '23 12:04 wallyhall

Any official update on this issue? I am still getting this error in version 9.1.2.

WaFeeAL avatar Jun 06 '23 17:06 WaFeeAL

Does #615 resolve the problem? (That is the purpose of the PR.)

Darsstar avatar Jan 17 '24 01:01 Darsstar

What's the status here? Is there anyone reviewing #615? Is there anything I can do to help speed this up? We've encountered this issue in production & it's not fun to see your workers silently hang indefinitely.

lukaszdudek-trueenergy avatar Jan 29 '24 13:01 lukaszdudek-trueenergy