aio-pika
aio-pika copied to clipboard
ConnectionAbortedError for long running operations
Hello
I have a problem with long running operations and rabbit. When operation takes less than ~90s, the implementation behaves as it should, acking the incoming message and taking the next one in order. But when message handler takes longer it's breaking with an error: Server connection error: ConnectionAbortedError(10053, 'An established connection was aborted by the software in your host machine', None, 10053, None)
My code looks rather simple and looks like this:
async def main(loop: asyncio.AbstractEventLoop):
rabbit_host = os.getenv("RABBIT_HOST", "localhost")
rabbit_login = os.getenv("RABBIT_LOGIN", "guest")
rabbit_pass = os.getenv("RABBIT_PASSWORD", "guest")
# 1. init rabbitmq connections
connection = await aio_pika.connect_robust(
f"amqp://{rabbit_login}:{rabbit_pass}@{rabbit_host}/", loop=loop
)
print("Waiting for messages. To exit press CTRL+C")
async with connection:
receive_channel: aio_pika.abc.AbstractChannel = await connection.channel()
await receive_channel.set_qos(1)
receive_queue: aio_pika.abc.AbstractQueue = await receive_channel.declare_queue(
"queue_request",
durable=True,
arguments={"x-queue-type": "quorum", "x-delivery-limit": 5},
)
async with receive_queue.iterator() as queue_iter:
# Cancel consuming after __aexit__
async for message in queue_iter:
print('Getting message from queue')
if connection.is_closed:
print("Connection is closed, need to reconnect, reconnecting")
await connection.reconnect()
async with message.process():
message_dict = json.loads(message.body.decode())
message_to_publish = handleMessage(message=message_dict) # this handler can take more than 90s, which fails the message
pikamsg = aio_pika.Message(body=message_to_publish.encode())
if connection.is_closed:
print("Connection is closed, need to reconnect, reconnecting")
await connection.reconnect()
publish_channel: aio_pika.abc.AbstractChannel = (
await connection.channel()
)
await publish_channel.default_exchange.publish(
message=pikamsg, routing_key="queue_response"
)
await message.ack()
print("Acked source message and published response message successfully")
await publish_channel.close()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
Handling fails after the first iteration, in line async with message.process():
and the first message is not acked.
As for ACK - the documentation is all over the place for that and looks like it's totally lacking information about how the iterator behaves and where to put the no_ack
flag, if anywhere and with which value...
I thought that maybe adding these if connection.is_closed:
with reconnect()
would help, but it doesn't even get there at all, even when connection.is_closed = True
I would be super grateful for any explanation what might be wrong.