Using channel after CancelledError exception in basic_consume callback can cause connection to close
Python 3.8.1
aio-pika 6.4.1
aiormq 3.2.0
Consider this callback example (using aio_pika):
async def callback(message: IncomingMessage):
    async with message.process():
        await asyncio.sleep(0.1)
Or plain aiormq:
async def callback(message: DeliveredMessage):
    try:
        await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        await message.channel.basic_nack(message.delivery.delivery_tag)
    else:
        await message.channel.basic_ack(message.delivery.delivery_tag)
together with this section of aiormq: https://github.com/mosquito/aiormq/blob/63a8b0d6d50eb93f9ee8c851fcfda01e3d5f122b/aiormq/base.py#L139-L143
After await channel.close(), both examples give me the following error in the callback registered with Connection.add_close_callback (Connection.closing.add_done_callback for aiormq):
"CHANNEL_ERROR - expected 'channel.open'"
I understand that the second example is far-fetched; it isn't really necessary to send a nack/reject on CancelledError. But I ran into something similar to the first example and spent a long time debugging why nacks were being sent after the channel had been closed. I assumed I simply couldn't send anything after close, so I thought I had to wrap the code in my callback with extra logic to prevent sending anything after close. Then I found that channel.close waits for all subtasks (including on_message callbacks?), but for some reason only after it has already sent the Channel.Close method over AMQP. So my question is: is closing the channel first and cancelling subtasks afterwards intentional?
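The ordering question can be reproduced without RabbitMQ. The sketch below is a pure-asyncio model, not aiormq code: FakeChannel, on_message, _on_close and _cancel_tasks here are hypothetical stand-ins that only mimic the two steps of __closer. Each in-flight handler rejects its message when cancelled (the capture later in this thread shows message.process() sending Basic.Reject in that situation), and the model records which frames would be written after Channel.Close.

```python
import asyncio
from typing import List


class FakeChannel:
    """Hypothetical stand-in for an AMQP channel; records frames instead of sending them."""

    def __init__(self) -> None:
        self.closed = False
        self.frames: List[str] = []              # frames written while the channel was open
        self.frames_after_close: List[str] = []  # frames written after Channel.Close
        self.tasks: List[asyncio.Task] = []

    def send(self, frame: str) -> None:
        # Anything written once Channel.Close has gone out hits a closed channel.
        if self.closed:
            self.frames_after_close.append(frame)
        else:
            self.frames.append(frame)

    async def on_message(self, tag: int) -> None:
        try:
            await asyncio.sleep(10)  # simulated long-running handler
            self.send(f"Basic.Ack {tag}")
        except asyncio.CancelledError:
            # Mirrors a cancelled message.process() block: the message is rejected.
            self.send(f"Basic.Reject {tag}")
            raise

    def deliver(self, tag: int) -> None:
        self.tasks.append(asyncio.create_task(self.on_message(tag)))

    async def _cancel_tasks(self) -> None:
        for task in self.tasks:
            task.cancel()
        await asyncio.gather(*self.tasks, return_exceptions=True)

    async def _on_close(self) -> None:
        self.send("Channel.Close")
        self.closed = True

    async def close(self, cancel_first: bool) -> None:
        if cancel_first:  # the reversed order discussed in this thread
            await self._cancel_tasks()
            await self._on_close()
        else:             # the order used by __closer in aiormq 3.2.0
            await self._on_close()
            await self._cancel_tasks()


async def run(cancel_first: bool) -> FakeChannel:
    channel = FakeChannel()
    for tag in range(3):  # prefetch_count=3 -> three in-flight deliveries
        channel.deliver(tag)
    await asyncio.sleep(0)  # let every handler reach its await
    await channel.close(cancel_first=cancel_first)
    return channel


if __name__ == "__main__":
    print(asyncio.run(run(cancel_first=False)).frames_after_close)
    print(asyncio.run(run(cancel_first=True)).frames_after_close)
```

With the close-then-cancel order all three rejects land after Channel.Close; with the tasks cancelled first, none do.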
One full example:
import asyncio
import logging
from typing import Optional
import aio_pika
from aio_pika import IncomingMessage
import sys
logging.basicConfig(format='%(relativeCreated)8.2f - %(name)20s - %(levelname)8s - %(message)s', level=logging.DEBUG, stream=sys.stdout)
queue_name = 'test_queue'
async def callback(message: IncomingMessage):
    try:
        async with message.process():
            await asyncio.sleep(0.1)
    finally:
        print('--- Callback finished')
def close_callback(reason):
    logging.warning('CONNECTION CLOSED %s', str(reason))
async def consumer():
    connection: Optional[aio_pika.Connection] = None
    channel: Optional[aio_pika.channel.Channel] = None
    tag: Optional[str] = None
    q: Optional[aio_pika.Queue] = None
    try:
        connection = await aio_pika.connect('amqp://guest:guest@localhost')
        connection.add_close_callback(close_callback)
        channel = await connection.channel()
        q = await channel.declare_queue(queue_name, durable=True)
        await channel.set_qos(prefetch_count=3)
        tag = await q.consume(callback)
        await asyncio.sleep(1.2)
        logging.info('Starting to close')
    finally:
        if q and tag:
            await q.cancel(tag)
            logging.info('Queue consume canceled')
        if channel:
            await channel.close()
            logging.info('Channel closed')
        await asyncio.sleep(2)
        logging.info('After 2 seconds: Connection.is_closed == %s ', connection.is_closed)
        if connection:
            logging.info('Before connection.close()')
            await connection.close()
            logging.info('After connection.close()')
if __name__ == '__main__':
    asyncio.run(consumer())
Output:
51.30 - asyncio - DEBUG - Using selector: EpollSelector
56.96 - aio_pika.connection - DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
57.17 - aio_pika.connection - DEBUG - Channel created: <Channel "None#Not initialized channel">
58.63 - aio_pika.queue - DEBUG - Declaring queue: <Queue(test_queue): auto_delete=False, durable=True, exclusive=False, arguments=None>
194.32 - aio_pika.queue - DEBUG - Start to consuming queue: <Queue(test_queue): auto_delete=False, durable=True, exclusive=False, arguments=None>
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
--- Callback finished
1396.74 - root - INFO - Starting to close
1398.72 - root - INFO - Queue consume canceled
--- __CLOSER <Channel: "1">
--- _on_close ISSUED <Channel: "1">
--- _on_close FINISHED <Channel: "1">
--- _cancel_tasks ISSUED <Channel: "1">
--- Callback finished
--- Callback finished
--- Callback finished
--- _cancel_tasks FINISHED <Channel: "1">
1402.86 - root - INFO - Channel closed
--- __CLOSER <Connection: "amqp://guest:******@localhost">
--- _on_close ISSUED <Connection: "amqp://guest:******@localhost">
--- _cancel_tasks ISSUED <Connection: "amqp://guest:******@localhost">
1406.87 - root - WARNING - CONNECTION CLOSED CHANNEL_ERROR - expected 'channel.open'
1407.04 - aio_pika.connection - DEBUG - Closing AMQP connection None
1407.40 - aiormq.connection - DEBUG - Reader task cancelled:
Traceback (most recent call last):
File "/home/lamar/.envs/api_server3/lib/python3.8/site-packages/aiormq/connection.py", line 385, in __reader
return await self.close(
File "/home/lamar/.envs/api_server3/lib/python3.8/site-packages/aiormq/base.py", line 154, in close
await self.loop.create_task(self.__closer(exc))
asyncio.exceptions.CancelledError
3404.40 - root - INFO - After 2 seconds: Connection.is_closed == False
3404.56 - root - INFO - Before connection.close()
3404.63 - root - INFO - After connection.close()
P.S. I added custom prints to __closer in aiormq:
async def __closer(self, exc):
    print("--- __CLOSER", repr(self))
    if self.is_closed:  # pragma: no cover
        return
    with suppress(Exception):
        print('--- _on_close ISSUED', repr(self))
        await self._on_close(exc)
        print('--- _on_close FINISHED', repr(self))
    with suppress(Exception):
        print('--- _cancel_tasks ISSUED', repr(self))
        await self._cancel_tasks(exc)
        print('--- _cancel_tasks FINISHED', repr(self))
P.S. Connection.is_closed is still False in the log 2 seconds after the close callback fired.
I'm hitting this exact issue too. It would be good to get some clarity on this behaviour.
I had a look at the commit history to try and understand why that is the case.
It looks as though at one point he fixed it by cancelling tasks before closing the channel: https://github.com/mosquito/aiormq/commit/4ee42e2396213657b4cc812aa4b47e74d458b09d#diff-03ab1aa1a76f0fbf4c092ce774a54d08
However, the next commit on the same day appears to revert that change as part of another change: https://github.com/mosquito/aiormq/commit/ed5546126bf3c734385e3bfa8dae22e17effe870#diff-03ab1aa1a76f0fbf4c092ce774a54d08
Based on the commit message, I suspect he accidentally committed his old files and overwrote his initial change.
Is this correct?
Let's look at the aiormq.base._cancel_tasks method. It cancels all tasks in the FutureStore. So you are totally right that it's useless to send frames on a closed channel, but you don't actually send anything, because the channel instance has no writer to write frames with.
AMQP is fully asynchronous, and receiving a Channel.Close frame is a terminal situation for the channel. It means you must stop sending anything on this channel, but the library doesn't know the future state of user code. So the last chance to handle this situation is to send the Channel.CloseOk frame as soon as possible and move the channel instance to the closed state. After that, channel methods will raise exceptions.
But I might have misunderstood your problem 🤔.
If you have a solution (or any proposal) for this problem, don't keep silent.
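The state change described above can be illustrated with a toy model. ChannelClosed, ClosableChannel, on_channel_close_frame, handler and demo are hypothetical names, not aiormq's API. Once the Close frame is handled, Close-Ok is written first and every later method raises locally, so nothing else reaches the wire for that channel.

```python
import asyncio
from typing import List, Tuple


class ChannelClosed(Exception):
    """Hypothetical error raised by channel methods after the channel is closed."""


class ClosableChannel:
    """Toy model of the described state change; not aiormq's Channel."""

    def __init__(self) -> None:
        self.is_closed = False
        self.written: List[str] = []

    def on_channel_close_frame(self) -> None:
        # Channel.Close from the peer is terminal: reply with Close-Ok at once,
        # then switch to the closed state so later calls fail locally.
        self.written.append("Channel.CloseOk")
        self.is_closed = True

    async def basic_ack(self, delivery_tag: int) -> None:
        if self.is_closed:
            raise ChannelClosed(f"cannot ack {delivery_tag}: channel is closed")
        self.written.append(f"Basic.Ack {delivery_tag}")


async def handler(channel: ClosableChannel, delivery_tag: int) -> str:
    await asyncio.sleep(0)  # stands in for user work; the channel may close meanwhile
    try:
        await channel.basic_ack(delivery_tag)
    except ChannelClosed:
        # The broker requeues unacknowledged deliveries when the channel closes,
        # so the handler can simply give up here.
        return "skipped"
    return "acked"


async def demo() -> Tuple[str, str, List[str]]:
    channel = ClosableChannel()
    first = await handler(channel, 1)
    channel.on_channel_close_frame()
    second = await handler(channel, 2)
    return first, second, channel.written


if __name__ == "__main__":
    print(asyncio.run(demo()))
```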
The issue for me (and the reason for writing) is that this happens:
1406.87 - root - WARNING - CONNECTION CLOSED CHANNEL_ERROR - expected 'channel.open'
because we tried to send something after the channel had been closed. Suppose we have multiple channels that listen to some queues like this:
async def callback(message: IncomingMessage):
    async with message.process():
        await asyncio.sleep(0.1)
and then we issue await queue.cancel(tag) and await channel.close() on one channel. This causes the whole connection to close (because of an error at the AMQP level).
Consider this example:
import asyncio
import logging
import sys
from typing import Optional
import aio_pika
from aio_pika import IncomingMessage
logging.basicConfig(
    format='%(relativeCreated)8.2f - %(name)20s - %(levelname)8s - %(message)s',
    level=logging.DEBUG,
    stream=sys.stdout
)
N = 10
RABBIT_URL = 'amqp://guest:guest@localhost'
QUEUE_NAMES = ['test_queue{}'.format(i) for i in range(N)]
def callback_wrapper(queue_name: str):
    async def callback(message: IncomingMessage):
        try:
            async with message.process():
                await asyncio.sleep(0.1)
        finally:
            print('--- Callback finished queue:', queue_name)
    return callback
def close_callback(reason):
    logging.warning('CONNECTION CLOSED %s', str(reason))
async def consumer(connection: aio_pika.Connection, i: int, queue_name: str):
    channel: Optional[aio_pika.channel.Channel] = None
    tag: Optional[str] = None
    queue: Optional[aio_pika.Queue] = None
    try:
        channel = await connection.channel()
        queue = await channel.declare_queue(queue_name)
        await channel.set_qos(prefetch_count=3)
        tag = await queue.consume(callback_wrapper(queue_name))
        await asyncio.sleep(3600 if i > 0 else 2)
    except:
        logging.exception('Consumer exception on queue: %s', queue_name)
    finally:
        if queue and tag:
            logging.info('Before canceling %s', queue_name)
            await queue.cancel(tag)
            logging.info('Queue %s consume canceled', queue_name)
        if channel:
            await channel.close()
            logging.info('Channel closed (Queue: %s)', queue_name)
async def main():
    connection: Optional[aio_pika.Connection] = None
    try:
        connection = await aio_pika.connect('amqp://guest:guest@localhost')
        connection.add_close_callback(close_callback)
        tasks = [
            asyncio.create_task(consumer(connection, i, queue_name))
            for i, queue_name in enumerate(QUEUE_NAMES)
        ]
        await asyncio.gather(*tasks, return_exceptions=True)
    finally:
        await asyncio.sleep(2)
        logging.info('After 2 seconds: Connection.is_closed == %s ', connection.is_closed)
        if connection:
            await connection.close()
            logging.info('Connection closed')
if __name__ == '__main__':
    asyncio.run(main())
Queue filler code for testing:
import asyncio
from typing import Awaitable, Callable
import aio_pika
N = 10
RABBIT_URL = 'amqp://guest:guest@localhost'
QUEUE_NAMES = ['test_queue{}'.format(i) for i in range(N)]
async def purge(connection: aio_pika.Connection, queue_name: str):
    channel: aio_pika.Channel = await connection.channel(publisher_confirms=False)
    queue: aio_pika.Queue = await channel.declare_queue(queue_name)
    await queue.purge()
async def push(connection: aio_pika.Connection, queue_name: str):
    channel: aio_pika.Channel = await connection.channel(publisher_confirms=False)
    await channel.declare_queue(queue_name)
    for i in range(100000):
        await channel.default_exchange.publish(
            aio_pika.Message(body=f'Q:{queue_name} Message: {i}'.encode()),
            routing_key=queue_name,
        )
async def main(fill=True):
    connection: aio_pika.Connection = await aio_pika.connect(RABBIT_URL)
    action: Callable[[aio_pika.Connection, str], Awaitable] = push if fill else purge
    tasks = [asyncio.create_task(action(connection, queue_name)) for queue_name in QUEUE_NAMES]
    await asyncio.gather(*tasks, return_exceptions=True)
    await connection.close()
if __name__ == '__main__':
    asyncio.run(main(fill=True))
Output that I get:
/home/lamar/.envs/issue_aiormq/bin/python /home/lamar/PycharmProjects/issue_aiormq/test.py
68.57 - asyncio - DEBUG - Using selector: EpollSelector
74.49 - aio_pika.connection - DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
74.67 - aio_pika.connection - DEBUG - Channel created: <Channel "None#Not initialized channel">
74.83 - aio_pika.connection - DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
74.89 - aio_pika.connection - DEBUG - Channel created: <Channel "None#Not initialized channel">
75.01 - aio_pika.connection - DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
75.07 - aio_pika.connection - DEBUG - Channel created: <Channel "None#Not initialized channel">
75.18 - aio_pika.connection - DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
75.25 - aio_pika.connection - DEBUG - Channel created: <Channel "None#Not initialized channel">
75.35 - aio_pika.connection - DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
75.41 - aio_pika.connection - DEBUG - Channel created: <Channel "None#Not initialized channel">
76.31 - aio_pika.connection - DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
76.39 - aio_pika.connection - DEBUG - Channel created: <Channel "None#Not initialized channel">
76.52 - aio_pika.connection - DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
76.59 - aio_pika.connection - DEBUG - Channel created: <Channel "None#Not initialized channel">
76.69 - aio_pika.connection - DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
76.75 - aio_pika.connection - DEBUG - Channel created: <Channel "None#Not initialized channel">
76.88 - aio_pika.connection - DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
76.94 - aio_pika.connection - DEBUG - Channel created: <Channel "None#Not initialized channel">
77.05 - aio_pika.connection - DEBUG - Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
77.10 - aio_pika.connection - DEBUG - Channel created: <Channel "None#Not initialized channel">
81.54 - aio_pika.queue - DEBUG - Declaring queue: <Queue(test_queue0): auto_delete=False, durable=None, exclusive=False, arguments=None>
81.77 - aio_pika.queue - DEBUG - Declaring queue: <Queue(test_queue3): auto_delete=False, durable=None, exclusive=False, arguments=None>
81.97 - aio_pika.queue - DEBUG - Declaring queue: <Queue(test_queue4): auto_delete=False, durable=None, exclusive=False, arguments=None>
82.07 - aio_pika.queue - DEBUG - Declaring queue: <Queue(test_queue1): auto_delete=False, durable=None, exclusive=False, arguments=None>
82.17 - aio_pika.queue - DEBUG - Declaring queue: <Queue(test_queue5): auto_delete=False, durable=None, exclusive=False, arguments=None>
82.27 - aio_pika.queue - DEBUG - Declaring queue: <Queue(test_queue6): auto_delete=False, durable=None, exclusive=False, arguments=None>
82.35 - aio_pika.queue - DEBUG - Declaring queue: <Queue(test_queue2): auto_delete=False, durable=None, exclusive=False, arguments=None>
82.83 - aio_pika.queue - DEBUG - Declaring queue: <Queue(test_queue7): auto_delete=False, durable=None, exclusive=False, arguments=None>
82.95 - aio_pika.queue - DEBUG - Declaring queue: <Queue(test_queue8): auto_delete=False, durable=None, exclusive=False, arguments=None>
84.06 - aio_pika.queue - DEBUG - Declaring queue: <Queue(test_queue9): auto_delete=False, durable=None, exclusive=False, arguments=None>
141.61 - aio_pika.queue - DEBUG - Start to consuming queue: <Queue(test_queue0): auto_delete=False, durable=None, exclusive=False, arguments=None>
147.72 - aio_pika.queue - DEBUG - Start to consuming queue: <Queue(test_queue1): auto_delete=False, durable=None, exclusive=False, arguments=None>
197.64 - aio_pika.queue - DEBUG - Start to consuming queue: <Queue(test_queue8): auto_delete=False, durable=None, exclusive=False, arguments=None>
207.51 - aio_pika.queue - DEBUG - Start to consuming queue: <Queue(test_queue2): auto_delete=False, durable=None, exclusive=False, arguments=None>
227.28 - aio_pika.queue - DEBUG - Start to consuming queue: <Queue(test_queue9): auto_delete=False, durable=None, exclusive=False, arguments=None>
232.13 - aio_pika.queue - DEBUG - Start to consuming queue: <Queue(test_queue5): auto_delete=False, durable=None, exclusive=False, arguments=None>
232.26 - aio_pika.queue - DEBUG - Start to consuming queue: <Queue(test_queue3): auto_delete=False, durable=None, exclusive=False, arguments=None>
232.34 - aio_pika.queue - DEBUG - Start to consuming queue: <Queue(test_queue6): auto_delete=False, durable=None, exclusive=False, arguments=None>
232.68 - aio_pika.queue - DEBUG - Start to consuming queue: <Queue(test_queue4): auto_delete=False, durable=None, exclusive=False, arguments=None>
236.56 - aio_pika.queue - DEBUG - Start to consuming queue: <Queue(test_queue7): auto_delete=False, durable=None, exclusive=False, arguments=None>
--- Callback finished queue: test_queue0
--- Callback finished queue: test_queue0
--- Callback finished queue: test_queue0
--- Callback finished queue: test_queue1
--- Callback finished queue: test_queue1
--- Callback finished queue: test_queue1
--- Callback finished queue: test_queue8
--- Callback finished queue: test_queue8
--- Callback finished queue: test_queue8
--- Callback finished queue: test_queue2
--- Callback finished queue: test_queue2
--- Callback finished queue: test_queue2
--- Callback finished queue: test_queue9
--- Callback finished queue: test_queue9
--- Callback finished queue: test_queue9
--- Callback finished queue: test_queue3
--- Callback finished queue: test_queue3
...
<It goes for 2 seconds>
...
--- Callback finished queue: test_queue3
--- Callback finished queue: test_queue5
--- Callback finished queue: test_queue5
--- Callback finished queue: test_queue5
--- Callback finished queue: test_queue6
--- Callback finished queue: test_queue6
--- Callback finished queue: test_queue0
--- Callback finished queue: test_queue0
--- Callback finished queue: test_queue7
--- Callback finished queue: test_queue7
--- Callback finished queue: test_queue7
--- Callback finished queue: test_queue1
--- Callback finished queue: test_queue1
--- Callback finished queue: test_queue1
2146.89 - root - INFO - Before canceling test_queue0
2147.87 - root - INFO - Queue test_queue0 consume canceled
--- Callback finished queue: test_queue0
--- Callback finished queue: test_queue0
--- Callback finished queue: test_queue0
2150.29 - root - INFO - Channel closed (Queue: test_queue0)
2153.73 - asyncio - WARNING - socket.send() raised exception.
2153.89 - asyncio - WARNING - socket.send() raised exception.
2154.02 - asyncio - WARNING - socket.send() raised exception.
2154.14 - asyncio - WARNING - socket.send() raised exception.
2154.25 - asyncio - WARNING - socket.send() raised exception.
2154.36 - asyncio - WARNING - socket.send() raised exception.
2154.74 - asyncio - WARNING - socket.send() raised exception.
2154.85 - asyncio - WARNING - socket.send() raised exception.
2154.95 - asyncio - WARNING - socket.send() raised exception.
2155.06 - asyncio - WARNING - socket.send() raised exception.
2155.17 - asyncio - WARNING - socket.send() raised exception.
2155.31 - asyncio - WARNING - socket.send() raised exception.
2155.47 - asyncio - WARNING - socket.send() raised exception.
2155.58 - asyncio - WARNING - socket.send() raised exception.
2155.68 - asyncio - WARNING - socket.send() raised exception.
2155.79 - asyncio - WARNING - socket.send() raised exception.
2155.89 - asyncio - WARNING - socket.send() raised exception.
2156.00 - asyncio - WARNING - socket.send() raised exception.
2156.10 - asyncio - WARNING - socket.send() raised exception.
2156.21 - asyncio - WARNING - socket.send() raised exception.
2156.31 - root - WARNING - CONNECTION CLOSED CHANNEL_ERROR - expected 'channel.open'
2156.36 - aio_pika.connection - DEBUG - Closing AMQP connection None
2156.49 - asyncio - WARNING - socket.send() raised exception.
2156.61 - asyncio - WARNING - socket.send() raised exception.
2156.73 - asyncio - WARNING - socket.send() raised exception.
2157.30 - aiormq.connection - DEBUG - Reader task cancelled:
Traceback (most recent call last):
File "/home/lamar/.envs/issue_aiormq/lib/python3.8/site-packages/aiormq/connection.py", line 385, in __reader
return await self.close(
File "/home/lamar/.envs/issue_aiormq/lib/python3.8/site-packages/aiormq/base.py", line 156, in close
await self.loop.create_task(self.__closer(exc))
asyncio.exceptions.CancelledError
--- Callback finished queue: test_queue4
--- Callback finished queue: test_queue7
--- Callback finished queue: test_queue1
--- Callback finished queue: test_queue6
--- Callback finished queue: test_queue6
--- Callback finished queue: test_queue1
--- Callback finished queue: test_queue7
--- Callback finished queue: test_queue5
--- Callback finished queue: test_queue5
--- Callback finished queue: test_queue4
--- Callback finished queue: test_queue8
--- Callback finished queue: test_queue2
--- Callback finished queue: test_queue3
--- Callback finished queue: test_queue7
--- Callback finished queue: test_queue3
--- Callback finished queue: test_queue4
--- Callback finished queue: test_queue2
--- Callback finished queue: test_queue8
--- Callback finished queue: test_queue1
--- Callback finished queue: test_queue2
--- Callback finished queue: test_queue9
--- Callback finished queue: test_queue3
--- Callback finished queue: test_queue6
--- Callback finished queue: test_queue5
--- Callback finished queue: test_queue8
--- Callback finished queue: test_queue9
--- Callback finished queue: test_queue9
<It stops outputting here completely>
The program is still running because of await asyncio.sleep(3600 if i > 0 else 2), but the other channels no longer receive anything because the connection has already been closed, as the packet capture shows:
No. Time Source Destination Protocol Length Info
1443 18.289208205 127.0.0.1 127.0.0.1 AMQP 118 Basic.Cancel
1444 18.289475910 127.0.0.1 127.0.0.1 AMQP 117 Basic.Cancel-Ok
1445 18.289862357 127.0.0.1 127.0.0.1 AMQP 85 Channel.Close reply=
1446 18.290321621 127.0.0.1 127.0.0.1 AMQP 78 Channel.Close-Ok
1447 18.290765956 127.0.0.1 127.0.0.1 AMQP 87 Basic.Reject
1448 18.290827337 127.0.0.1 127.0.0.1 AMQP 87 Basic.Reject
1449 18.290845098 127.0.0.1 127.0.0.1 TCP 66 5672 → 42554 [ACK] Seq=102188 Ack=13864 Win=65536 Len=0 TSval=2578187892 TSecr=2578187892
1450 18.290929211 127.0.0.1 127.0.0.1 AMQP 87 Basic.Reject
1451 18.292394659 127.0.0.1 127.0.0.1 AMQP 124 Connection.Close reply=CHANNEL_ERROR - expected 'channel.open'
1452 18.292680238 127.0.0.1 127.0.0.1 AMQP 78 Connection.Close-Ok
1453 18.292863989 127.0.0.1 127.0.0.1 TCP 66 42554 → 5672 [FIN, ACK] Seq=13897 Ack=102246 Win=65536 Len=0 TSval=2578187894 TSecr=2578187894
1454 18.292885056 127.0.0.1 127.0.0.1 TCP 66 5672 → 42554 [FIN, ACK] Seq=102246 Ack=13898 Win=65536 Len=0 TSval=2578187894 TSecr=2578187894
1455 18.292895202 127.0.0.1 127.0.0.1 TCP 66 42554 → 5672 [ACK] Seq=13898 Ack=102247 Win=65536 Len=0 TSval=2578187894 TSecr=2578187894
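The capture shows why one channel's late rejects take down the others: the Basic.Reject frames arrive after Channel.Close-Ok, and CHANNEL_ERROR (reply code 504) is a connection-level exception in AMQP 0-9-1. The toy broker below replays that sequence; ToyBroker, replay and the reply strings are illustrative assumptions, not RabbitMQ internals.

```python
from typing import Set, Tuple


class ConnectionClosed(Exception):
    """Raised by the toy broker once the whole connection is gone."""


class ToyBroker:
    """Hypothetical, heavily simplified model of broker-side channel bookkeeping."""

    def __init__(self) -> None:
        self.open_channels: Set[int] = set()
        self.connection_open = True

    def receive(self, channel_id: int, method: str) -> str:
        if not self.connection_open:
            raise ConnectionClosed("connection already closed")
        if method == "Channel.Open":
            self.open_channels.add(channel_id)
            return "Channel.Open-Ok"
        if channel_id not in self.open_channels:
            # A method on a channel that is not open is a connection-level error:
            # one stray frame tears down every channel on the connection.
            self.connection_open = False
            self.open_channels.clear()
            return "Connection.Close reply=CHANNEL_ERROR - expected 'channel.open'"
        if method == "Channel.Close":
            self.open_channels.discard(channel_id)
            return "Channel.Close-Ok"
        return "accepted"


def replay() -> Tuple[ToyBroker, str]:
    broker = ToyBroker()
    broker.receive(1, "Channel.Open")
    broker.receive(2, "Channel.Open")           # an unrelated consumer channel
    broker.receive(1, "Channel.Close")          # await channel.close() on channel 1
    reply = broker.receive(1, "Basic.Reject")   # late reject from a cancelled callback
    return broker, reply


if __name__ == "__main__":
    broker, reply = replay()
    print(reply, broker.connection_open)
```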
Then I tried reversing the order of these two blocks in aiormq.base.__closer, so that _cancel_tasks runs before _on_close:
with suppress(Exception):
    await self._on_close(exc)
with suppress(Exception):
    await self._cancel_tasks(exc)
The output is quite similar, except for the following:
- No "WARNING - socket.send() raised exception."
- No "WARNING - CONNECTION CLOSED CHANNEL_ERROR - expected 'channel.open'"
- No "DEBUG - Reader task cancelled: <exception>"
- Other channels continued to receive data and print it out.
Then I sent SIGINT; some KeyboardInterrupt and cancellation exceptions were raised and the program hung, but that is unrelated to this issue and is related to mosquito/aio-pika#253.
So my reasoning is that trying to write to a closed channel should not affect the connection or other channels in such a drastic way. Maybe reversing the order solves this issue completely, or the library could prevent sending anything after channel.close has been issued.
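The second option, refusing to write once channel.close() has started, could look roughly like the sketch below. GuardedChannel, its closing flag, handler and demo are hypothetical names for illustration; aiormq does not expose them. The flag is raised before Channel.Close is written, so frames produced by callbacks cancelled afterwards are dropped locally instead of reaching the broker.

```python
import asyncio
import logging
from typing import List

log = logging.getLogger("guarded_channel")


class GuardedChannel:
    """Hypothetical channel that refuses user frames once close() has begun."""

    def __init__(self) -> None:
        self.closing = False
        self.wire: List[str] = []     # frames that would go to the broker
        self.dropped: List[str] = []  # frames swallowed by the guard

    def write(self, frame: str) -> None:
        if self.closing:
            # Sending now could reach the broker after Channel.Close-Ok and
            # trigger a connection-level CHANNEL_ERROR, so drop it locally.
            log.debug("dropping %s: channel is closing", frame)
            self.dropped.append(frame)
            return
        self.wire.append(frame)

    async def close(self, tasks: List[asyncio.Task]) -> None:
        self.closing = True                # guard goes up before Channel.Close
        self.wire.append("Channel.Close")  # the close frame itself bypasses the guard
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def handler(channel: GuardedChannel, tag: int) -> None:
    try:
        await asyncio.sleep(10)
        channel.write(f"Basic.Ack {tag}")
    except asyncio.CancelledError:
        channel.write(f"Basic.Reject {tag}")  # what a cancelled message.process() sends
        raise


async def demo() -> GuardedChannel:
    channel = GuardedChannel()
    tasks = [asyncio.create_task(handler(channel, tag)) for tag in range(3)]
    await asyncio.sleep(0)  # let every handler reach its await
    await channel.close(tasks)
    return channel


if __name__ == "__main__":
    result = asyncio.run(demo())
    print(result.wire, result.dropped)
```

Dropping the rejects changes intent slightly: the broker requeues deliveries that were never acknowledged when the channel closes, so those messages come back instead of being rejected.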