aioamqp
aioamqp copied to clipboard
aioamqp.exceptions.AmqpClosedConnection
RabbitMQ log:
=INFO REPORT==== 29-May-2018::11:23:59 ===
accepting AMQP connection <0.604.0> (127.0.0.1:46488 -> 127.0.0.1:5672)
=INFO REPORT==== 29-May-2018::11:23:59 ===
connection <0.604.0> (127.0.0.1:46488 -> 127.0.0.1:5672): user 'hanyuu' authenticated and granted access to vhost 'test'
=WARNING REPORT==== 29-May-2018::11:24:00 ===
closing AMQP connection <0.604.0> (127.0.0.1:46488 -> 127.0.0.1:5672, vhost: 'test', user: 'hanyuu'):
client unexpectedly closed TCP connection
=INFO REPORT==== 29-May-2018::11:24:18 ===
accepting AMQP connection <0.623.0> (127.0.0.1:46526 -> 127.0.0.1:5672)
=INFO REPORT==== 29-May-2018::11:24:18 ===
connection <0.623.0> (127.0.0.1:46526 -> 127.0.0.1:5672): user 'hanyuu' authenticated and granted access to vhost 'test'
=WARNING REPORT==== 29-May-2018::11:26:18 ===
closing AMQP connection <0.623.0> (127.0.0.1:46526 -> 127.0.0.1:5672, vhost: 'test', user: 'hanyuu'):
client unexpectedly closed TCP connection
Traceback:
Traceback (most recent call last):
File "/home/hanyuu/projects/worker/main.py", line 79, in rabbit_handler
await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)
File "/home/hanyuu/venvs/worker/lib/python3.6/site-packages/aioamqp/channel.py", line 720, in basic_client_ack
yield from self._write_frame(frame, request)
File "/home/hanyuu/venvs/worker/lib/python3.6/site-packages/aioamqp/channel.py", line 111, in _write_frame
yield from self.protocol.ensure_open()
File "/home/hanyuu/venvs/worker/lib/python3.6/site-packages/aioamqp/protocol.py", line 133, in ensure_open
raise exceptions.AmqpClosedConnection()
aioamqp.exceptions.AmqpClosedConnection
Code:
async def rabbit_handler(channel: Channel, body: bytes, envelope: Envelope, properties: Properties, data_channel: Channel):
try:
message = json.loads(body.decode('utf-8'))
res = await handle_message(message, data_channel)
await data_channel.basic_publish(payload=json.dumps(res), exchange_name='', routing_key='data')
await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)
except AmqpClosedConnection as e:
log.error(e)
exit(-1)
async def start_rabbit():
transport, protocol = await aioamqp.from_url(config['worker']['rabbit']['url'])
channel = await protocol.channel()
_channel = await protocol.channel()
await channel.queue_declare(queue_name=config['worker']['rabbit']['queue_name'])
await _channel.queue_declare(queue_name='data')
await channel.basic_consume(partial(rabbit_handler, data_channel=_channel), queue_name=config['worker']['rabbit']['queue_name'])
async def run():
url = f'{config["worker"]["rabbit"]["url"]}'
print(f'Started on {url}')
await start_rabbit()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.run_forever()
Connection url:
amqp://hanyuu:***@127.0.0.1:5672/test?heartbeat=0&connection_timeout=120000
Hi, guys!
Any news about this issue?
I have the same problem:
import asyncio
import aioamqp
async def connect():
transport, protocol = await aioamqp.from_url('amqp://gateway:[email protected]:5672/gateway')
print("connected !")
await asyncio.sleep(10)
print("close connection")
await protocol.close()
transport.close()
asyncio.get_event_loop().run_until_complete(connect())
When I run this code, I have got the following:
python t2.py
only PLAIN login_method is supported, falling back to AMQPLAIN
connected !
close connection
Traceback (most recent call last):
File "t2.py", line 14, in <module>
asyncio.get_event_loop().run_until_complete(connect())
File "/usr/lib64/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "t2.py", line 11, in connect
await protocol.close()
File "/home/mamonov/env-notifier/lib64/python3.8/site-packages/aioamqp/protocol.py", line 160, in close
await self.ensure_open()
File "/home/mamonov/env-notifier/lib64/python3.8/site-packages/aioamqp/protocol.py", line 135, in ensure_open
raise exceptions.AmqpClosedConnection()
aioamqp.exceptions.AmqpClosedConnection
Rabbitmq logs shows me the following:
=INFO REPORT==== 7-Aug-2020::23:12:10 ===
accepting AMQP connection <0.32190.3> (10.2.1.10:44634 -> 10.2.0.153:5672)
=WARNING REPORT==== 7-Aug-2020::23:12:14 ===
closing AMQP connection <0.32190.3> (10.2.1.10:44634 -> 10.2.0.153:5672):
connection_closed_abruptly
How can I fix the problem?
There is a bug in aioamqp that the code incorrectly interprets hearbeat with the value of 0. By default heartbeat is set to None on a client side but the server makes the client to use the heartbeat with a value of 0 by sending Connection.Tune method, so to workaround it you need to explicitly set heartbeat with some value except 0 and None. But do not set it in URL because the heartbeat parameter isn't read from that.
So you can use the code like this:
self.transport, self.protocol = await aioamqp.from_url(url, heartbeat=60)
The problem has already been solved in this PR: https://github.com/Polyconseil/aioamqp/pull/209/commits/57d3996a7690f9534b3ea07bfa584d318b55082e