aio-pika
aio-pika copied to clipboard
RuntimeError: <RobustConnection: "amqp://guest:******@192.168.99.110/" 5 channels> connection closed
import asyncio
import gc
import weakref
import objgraph
import aio_pika
gc.disable()
async def f(rabbitmq_connection, weakset):
async with rabbitmq_connection.channel() as channel:
print('new created')
weakset.add(channel)
print('exetied')
async def main():
rabbitmq_connection: aio_pika.Connection = await aio_pika.connect_robust(
'amqp://guest:guest@localhost/'
)
weakset = weakref.WeakSet()
async with rabbitmq_connection:
for i in range(5):
await f(rabbitmq_connection, weakset)
await asyncio.sleep(10)
print(len(weakset))
objgraph.show_backrefs(weakset.pop())
if __name__ == '__main__':
asyncio.run(main())
print('Complete')
References after sleep
Seems like it's trying to reconnect, even if the connection was closed by user
import os
import asyncio
import logging
import aio_pika as aiopika
logger = logging.getLogger(__name__)
AMQP_BROKER = os.getenv('AMQP_BROKER')
async def test_aio_pika():
connection = await aiopika.connect_robust(AMQP_BROKER)
await connection.close()
def main():
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(test_aio_pika())
loop.run_until_complete(asyncio.sleep(10))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
main()
outputs
DEBUG:asyncio:Using selector: EpollSelector
DEBUG:aiormq.connection:Can not read bytes from server:
Traceback (most recent call last):
File "/home/zaquest/.virtualenvs/copreso/lib/python3.6/site-packages/aiormq/connection.py", line 377, in __reader
weight, channel, frame = await self.__receive_frame()
File "/home/zaquest/.virtualenvs/copreso/lib/python3.6/site-packages/aiormq/connection.py", line 329, in __receive_frame
frame_header = await self.reader.readexactly(1)
File "/home/zaquest/.pyenv/versions/3.6.6/lib/python3.6/asyncio/streams.py", line 672, in readexactly
raise IncompleteReadError(incomplete, n)
asyncio.streams.IncompleteReadError: 0 bytes read on a total of 1 expected bytes
DEBUG:aio_pika.connection:Closing AMQP connection None
INFO:aio_pika.robust_connection:Connection to amqp://*** closed. Reconnecting after 5 seconds.
>>> # sleeps here, continues after 5 seconds, and finishes after 10 seconds
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<RobustConnection.reconnect() done, defined at /home/zaquest/.virtualenvs/copreso/lib/python3.6/site-packages/aio_pika/robust_connection.py:149> exception=RuntimeError('<RobustConnection: "amqp://***" 0 channels> connection closed',)>
Traceback (most recent call last):
File "/home/zaquest/.virtualenvs/copreso/lib/python3.6/site-packages/aio_pika/robust_connection.py", line 150, in reconnect
await self.connect()
File "/home/zaquest/.virtualenvs/copreso/lib/python3.6/site-packages/aio_pika/robust_connection.py", line 105, in connect
raise RuntimeError("{!r} connection closed".format(self))
RuntimeError: <RobustConnection: "amqp://***" 0 channels> connection closed
This seems to solve the issue for me
import os
import asyncio
import logging
import aio_pika as aiopika
logger = logging.getLogger(__name__)
AMQP_BROKER = os.getenv('AMQP_BROKER')
class Connection(aiopika.RobustConnection):
def _on_connection_close(self, connection, closing, *args, **kwargs):
if self.reconnecting:
return
self.connected.clear()
self.connection = None
# pylint: disable=bad-super-call
# Seems like pylint tries to warn us about infinite loop that
# might occur, but this is not our case
# http://pylint-messages.wikidot.com/messages:e1003
# https://stackoverflow.com/a/18208725/1211428
super(aiopika.RobustConnection, self) \
._on_connection_close(connection, closing)
if self._closed:
logger.info(
'Connection to %s closed manually. Will not reconnect.',
self
)
else:
logger.info(
'Connection to %s closed. Reconnecting after %r seconds.',
self, self.reconnect_interval
)
self.loop.call_later(
self.reconnect_interval,
lambda: self.loop.create_task(self.reconnect())
)
async def test_aio_pika():
connection = await aiopika.connect_robust(
AMQP_BROKER, connection_class=Connection
)
await connection.close()
def main():
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(test_aio_pika())
loop.run_until_complete(asyncio.sleep(10))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
main()
Еhe reason for this behaviour is in the wrong callbacks creation. For example in robust_channel
on line 77, you add callback with closure by self
self.add_close_callback(self._on_channel_close)
It adds the bound method which has reference to self. This creates a circular dependency: self -> _done_callbacks -> bound_method<_on_channel_close> -> self
For solve this problem need to make callback static and send senders to them explicitly.
@heckad do you want to create PR for fixing it?
Yes, I do
Am I in the wrong thread?) Is my issue related?
Also that thing above, seems like it doesn't actually work and just masks the issue. Now I get other errors.
It's all because callbacks start working when they are no longer needed
I faced the same error with RPC pattern example from the docs (and created SO question before finding this issue). When I run it exactly as in docs (two separate process) I have DeliveryError (but connection was closed before according to logs):
rabbit_1 | 2020-03-20 11:37:38.333 [info] <0.629.0> accepting AMQP connection <0.629.0> (192.168.32.4:57258 -> 192.168.32.3:5672)
rabbit_1 | 2020-03-20 11:37:38.354 [info] <0.629.0> connection <0.629.0> (192.168.32.4:57258 -> 192.168.32.3:5672): user 'dev_user' authenticated and granted access to vhost 'tgvhost'
rabbit_1 | 2020-03-20 11:37:38.371 [info] <0.651.0> accepting AMQP connection <0.651.0> (192.168.32.5:57200 -> 192.168.32.3:5672)
rabbit_1 | 2020-03-20 11:37:38.376 [info] <0.651.0> connection <0.651.0> (192.168.32.5:57200 -> 192.168.32.3:5672): user 'dev_user' authenticated and granted access to vhost 'tgvhost'
rabbit_1 | 2020-03-20 11:37:38.380 [info] <0.629.0> closing AMQP connection <0.629.0> (192.168.32.4:57258 -> 192.168.32.3:5672, vhost: 'tgvhost', user: 'dev_user'
Traceback (most recent call last):
rpc_host_1 | File "asgi.py", line 33, in <module>
rpc_host_1 | loop.run_until_complete(main())
rpc_host_1 | File "/usr/local/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
rpc_host_1 | return future.result()
rpc_host_1 | File "asgi.py", line 20, in main
rpc_host_1 | print(await rpc.proxy.multiply(x=100, y=i))
rpc_host_1 | File "/usr/local/lib/python3.6/site-packages/aio_pika/patterns/rpc.py", line 341, in call
rpc_host_1 | return await future
rpc_host_1 | aiormq.exceptions.DeliveryError: (IncomingMessage:{'app_id': None,
rpc_host_1 | 'body_size': 28,
rpc_host_1 | 'cluster_id': '',
rpc_host_1 | 'consumer_tag': None,
rpc_host_1 | 'content_encoding': '',
rpc_host_1 | 'content_type': '',
rpc_host_1 | 'correlation_id': '140221124514424',
rpc_host_1 | 'delivery_mode': 1,
rpc_host_1 | 'delivery_tag': None,
rpc_host_1 | 'exchange': '',
rpc_host_1 | 'expiration': None,
rpc_host_1 | 'headers': {'From': b'amq_7d6l66fdlzz3oh57lhmr3nq2u4'},
rpc_host_1 | 'message_id': 'c219f18e5dc6c38d4dbc8df046c3dc3d',
rpc_host_1 | 'priority': 5,
rpc_host_1 | 'redelivered': None,
rpc_host_1 | 'reply_to': 'amq_7d6l66fdlzz3oh57lhmr3nq2u4',
rpc_host_1 | 'routing_key': 'multiply',
rpc_host_1 | 'timestamp': datetime.datetime(2020, 3, 20, 11, 37, 38),
rpc_host_1 | 'type': 'call',
rpc_host_1 | 'user_id': None}, None)
If I run rpc_host as it is but then call 'multiply' manually (e.g. from Quart http view function) I got the exact same error:
Task exception was never retrieved
quart_service_01_1 | future: <Task finished coro=<RobustConnection.reconnect() done, defined at /usr/local/lib/python3.6/site-packages/aio_pika/robust_connection.py:149> exception=RuntimeError('<RobustConnection: "amqp://dev_user:******@rabbit:5672/tgvhost" 1 channels> connection closed',)>
quart_service_01_1 | Traceback (most recent call last):
quart_service_01_1 | File "/usr/local/lib/python3.6/asyncio/tasks.py", line 180, in _step
quart_service_01_1 | result = coro.send(None)
quart_service_01_1 | File "/usr/local/lib/python3.6/site-packages/aio_pika/robust_connection.py", line 150, in reconnect
quart_service_01_1 | await self.connect()
quart_service_01_1 | File "/usr/local/lib/python3.6/site-packages/aio_pika/robust_connection.py", line 105, in connect
quart_service_01_1 | raise RuntimeError("{!r} connection closed".format(self))
quart_service_01_1 | RuntimeError: <RobustConnection: "amqp://dev_user:******@rabbit:5672/tgvhost" 1 channels> connection closed
I tried to copy-paste RPC implementation from RabbitMQ Tutorial section and it works fine.
Is it related?
@vashchukmaksim try to run examples it works fine.
@mosquito Yes, I already checked it and commented on that PR. All works perfectly fine, thank you!