aio-pika
QueueIterator hangs forever when RabbitMQ disconnects
If RabbitMQ disconnects, the QueueIterator hangs forever, or until the consuming coroutine is cancelled.
queue = await channel.declare_queue(queue_name, durable=True)
try:
    async for message in queue:
        pass  # after a disconnect, no message ever arrives and no exception is raised
except asyncio.CancelledError:
    pass  # the only way out: the hang ends when the coroutine is cancelled
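For what it's worth, one blunt guard against the silent hang is to bound each message wait with plain asyncio. This is only a sketch (the consume_bounded name and the 30-second timeout are mine, not something aio-pika provides):

import asyncio

async def consume_bounded(queue, timeout: float = 30.0) -> None:
    # Bound each __anext__ call: if no message (or a dead connection)
    # within `timeout` seconds, asyncio.wait_for raises TimeoutError
    # instead of letting the loop hang forever. The caller decides
    # whether to retry, reconnect, or bail out.
    async with queue.iterator() as queue_iter:
        iterator = queue_iter.__aiter__()
        while True:
            try:
                message = await asyncio.wait_for(iterator.__anext__(), timeout)
            except StopAsyncIteration:
                break
            async with message.process():
                print(message.body.decode())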
I see this too. When I set logging.basicConfig(level=logging.DEBUG), the following error is reported:
DEBUG:aio_pika.connection:Closing AMQP connection None
DEBUG:aiormq.connection:Reader task cancelled:
Traceback (most recent call last):
File "/home/BHDGSYSTEMATIC.COM/rblackbourn/dev/scratch/python/aio_pika-example1/.venv/lib/python3.8/site-packages/aiormq/connection.py", line 383, in __reader
return await self.close(
File "/home/BHDGSYSTEMATIC.COM/rblackbourn/dev/scratch/python/aio_pika-example1/.venv/lib/python3.8/site-packages/aiormq/base.py", line 149, in close
await self.loop.create_task(self.__closer(exc))
asyncio.exceptions.CancelledError
As a workaround, this error can be caught by adding a close callback via connection.add_close_callback, which does detect the failure:
def close_callback(connection: aio_pika.Connection, error: Exception) -> None:
    if isinstance(error, ConnectionClosed):
        code, text = error.args
        if code == 320:  # CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'
            print("Handle shutdown here ...")
I have this helper module:
"""Helpers for asyncio"""
import asyncio
from asyncio import Event, Future, Task
from typing import AsyncIterator, Set, TypeVar, cast
T = TypeVar('T')
async def cancel_task_and_await(task: Task) -> None:
"""Cancel a task and await it
:param task: The task
:type task: Task
"""
task.cancel()
try:
# If the task has remaining await statements it must be awaited.
await task
except asyncio.CancelledError:
pass
async def aiter_cancellable(
async_iterator: AsyncIterator[T],
cancellation_event: Event
) -> AsyncIterator[T]:
"""Create a cancellable async iterator.
:param async_iterator: The async iterator to wrap
:param cancellation_event: The asyncio Event to controll cancellation.
:return: The wrapped async iterator
"""
cancellation_task = asyncio.create_task(cancellation_event.wait())
result_iter = async_iterator.__aiter__()
pending: Set[Future] = {
cancellation_task
}
while not cancellation_event.is_set():
try:
anext_task = asyncio.create_task(result_iter.__anext__())
pending.add(anext_task)
done, pending = await asyncio.wait(
pending,
return_when=asyncio.FIRST_COMPLETED
)
if anext_task in done:
yield anext_task.result()
except StopAsyncIteration:
await cancel_task_and_await(cancellation_task)
return
for task in pending:
await cancel_task_and_await(cast(Task, task))
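To sanity-check aiter_cancellable without RabbitMQ, here is a minimal self-contained demo; the ticker source and the one-second trip delay are invented for illustration:

import asyncio
from asyncio import Event
from typing import AsyncIterator

async def demo() -> None:
    async def ticker() -> AsyncIterator[int]:
        # An endless source that yields 1, 2, 3, ... every 0.3 seconds.
        n = 0
        while True:
            await asyncio.sleep(0.3)
            n += 1
            yield n

    cancellation_event = Event()

    async def trip() -> None:
        # Simulate a connection-failure callback firing after one second.
        await asyncio.sleep(1)
        cancellation_event.set()

    trip_task = asyncio.create_task(trip())
    async for value in aiter_cancellable(ticker(), cancellation_event):
        print(value)  # prints 1, 2, 3; the loop ends once the event is set
    await trip_task

asyncio.run(demo())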
Then this code:
async def main_async():
    # Create an event that gets set when the connection fails.
    cancellation_event = asyncio.Event()

    def close_callback(_conn: aio_pika.Connection, error: Exception) -> None:
        if isinstance(error, ConnectionClosed):
            code, text = error.args
            if code == 320:  # CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'
                LOGGER.error("Connection failure (%s) %s", code, text)
                cancellation_event.set()

    async with await aio_pika.connect(
            host='localhost',
            port=5672,
            virtualhost='/',
            login='guest',
            password='guest'
    ) as connection:
        connection.add_close_callback(close_callback)
        async with await connection.channel() as channel:
            queue = await channel.declare_queue('some.queue', passive=True)
            async with queue.iterator() as queue_iter:
                # Use the cancellation event HERE!!!
                async for message in aiter_cancellable(queue_iter, cancellation_event):
                    try:
                        async with message.process():
                            print(message.body.decode())
                    except Exception:  # a bare except here would also swallow CancelledError
                        print('Failed to process message')
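As a side note, and with the caveat that whether it covers this exact case is an open question in this thread, aio_pika also ships connect_robust, which is designed to re-establish the connection and restore channels and consumers after a drop:

connection = await aio_pika.connect_robust(
    host='localhost',
    port=5672,
    virtualhost='/',
    login='guest',
    password='guest'
)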
+1
There are several open tickets related to this; I'd love to see an official solution.
Any update on this?
Any update on this issue? Still getting this error in version 8.1.1.
I believe this may be related to: https://github.com/mosquito/aio-pika/issues/358 ... I've added a comment there with a workaround that solves both issues for us.
Any official update on this issue? Still getting this error in version 9.1.2.
Does #615 resolve the problem? (That is the purpose of the PR.)
What's the status here? Is there anyone reviewing #615? Is there anything I can do to help speed this up? We've encountered this issue in production & it's not fun to see your workers silently hang indefinitely.