aio-pika
Impossible to handle Unexpected connection close
How to handle Unexpected connection close and raise an exception?
I've got the logic below:
```python
import asyncio

import aio_pika
from aio_pika.abc import AbstractChannel, AbstractRobustConnection


class AMQPHandler:
    def __init__(self) -> None:
        self.connection: AbstractRobustConnection | None = None
        self.channel: AbstractChannel | None = None

    async def init(self) -> None:
        import settings

        logger.info("Initializing AMQP handler")
        config = settings.CorePublisherSettings
        connection = await aio_pika.connect_robust(
            config.get_dsn(),
            loop=asyncio.get_event_loop(),
            timeout=config.CONNECTION_TIMEOUT,
        )
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)
        exchange = await channel.declare_exchange(
            config.EXCHANGE_NAME,
            config.EXCHANGE_TYPE,
            auto_delete=config.EXCHANGE_AUTO_DELETE,
            durable=True,
        )
        for key in config.BINDING_KEYS:
            q_name = (
                f"{key}.{config.PREFIX_BINDING_KEYS}"
                if config.PREFIX_BINDING_KEYS
                else key
            )
            queue = await channel.declare_queue(name=q_name)
            await queue.bind(exchange, q_name)
            await queue.consume(self.handle_message)
            logger.info("Queue declared", extra={"queue": q_name})
        self.connection = connection
        self.channel = channel
        logger.info("AMQP handler initialized")
```
Now, when RabbitMQ drops, the connection reconnects every 5 seconds. How can I make it raise an exception that I could handle in order to stop the micro-service?
You can use a regular connection instead of a robust one. Also, the `connection.connected` event can be checked in a health-check handler.
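The health-check idea above can be sketched with plain asyncio. This is a hedged sketch, not a full handler: `FakeConnection` is a stand-in for a real aio-pika connection whose `connected` attribute is an `asyncio.Event` set while the link is up.

```python
import asyncio


# Stand-in for an aio_pika connection: only the `connected` event matters here.
class FakeConnection:
    def __init__(self):
        self.connected = asyncio.Event()


def amqp_healthy(connection) -> bool:
    # Report healthy only when a connection exists and its
    # `connected` event is currently set.
    return connection is not None and connection.connected.is_set()


conn = FakeConnection()
print(amqp_healthy(conn))   # False: not connected yet
conn.connected.set()        # simulate the link coming up
print(amqp_healthy(conn))   # True
```

In a real service this check would back an HTTP health endpoint, so an orchestrator can restart the pod when the broker link stays down.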
What I also did is
```python
import asyncio
import sys

import aiormq


class AMQPHandler:
    def on_connection_closed(self, *args, **kwargs):
        sys.exit(1)

    async def init(self) -> None:
        import settings

        logger.info("Initializing AMQP handler")
        config = settings.BaseMessageBrokerSettings
        conn = await aiormq.connect(
            config.get_dsn(),
            loop=asyncio.get_event_loop(),
        )
        channel = await conn.channel()
        # `closing` resolves when the connection/channel is closed,
        # so the callback fires on any unexpected close.
        conn.closing.add_done_callback(self.on_connection_closed)
        channel.closing.add_done_callback(self.on_connection_closed)
        await channel.exchange_declare(
            exchange=config.EXCHANGE_NAME,
            exchange_type=config.EXCHANGE_TYPE,
            auto_delete=config.EXCHANGE_AUTO_DELETE,
            durable=True,
        )
        for key in config.BINDING_KEYS:
            q_name = (
                f"{key}.{config.PREFIX_BINDING_KEYS}"
                if config.PREFIX_BINDING_KEYS
                else key
            )
            queue = await channel.queue_declare(queue=q_name, durable=True)
            self.queues.append(queue)
            await channel.queue_bind(exchange=config.EXCHANGE_NAME, queue=q_name)
            await channel.basic_consume(queue.queue, self.handle_message, no_ack=True)
            logger.info("Queue declared", extra={"queue": q_name})
        self.conn = conn
        self.channel = channel
        logger.info("AMQP handler initialized")
```
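The `closing.add_done_callback` pattern above can be demonstrated with plain asyncio. This sketch uses a loop-created future as a stand-in for `conn.closing` (it is not aiormq itself); the callback runs as soon as that future resolves.

```python
import asyncio

events = []


def on_closed(future):
    # Stand-in for on_connection_closed: record that the close fired.
    events.append("closed")


async def main():
    loop = asyncio.get_running_loop()
    closing = loop.create_future()        # stand-in for conn.closing
    closing.add_done_callback(on_closed)
    closing.set_result(None)              # simulate the connection closing
    await asyncio.sleep(0)                # yield so scheduled callbacks run


asyncio.run(main())
print(events)  # ['closed']
```

Note that `sys.exit(1)` inside such a callback raises `SystemExit` in the event loop, which is a blunt but effective way to take the whole service down.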
Your original code uses `connect_robust()`, i.e. the function used to create a connection that automatically reconnects. Sounds like you don't want to use that.
I have the same issue. With `connect_robust`, if the connection to RabbitMQ is lost, the consumer stops getting messages after it is re-established. The logs show the consumer reconnecting every 5 seconds, RabbitMQ becomes reachable again, then the logs stop and no new messages show up. In the RabbitMQ UI the queue has 0 consumers.
I'm using the exact code from here (Master/Worker), but with a dict instead of `task_id`: https://aio-pika.readthedocs.io/en/latest/patterns.html
@mosquito Any idea?
I'm using aio-pika 9.4.0 with Python 3.10.
This might be fixed by #622 in 9.4.1. It sounds similar enough to what I encountered debugging #615.
We're currently using aio-pika 9.4.2 and encountered the same issue with `connect_robust` in our production env, as @gaby did. Once RabbitMQ becomes reachable again, the client is stuck on `await self.__connection_close_event.wait()`.
@yaelmi3 I fixed my issue using this https://github.com/mosquito/aio-pika/issues/231#issuecomment-2023072289
@yaelmi3 I have the same issue: the connection is stuck after RabbitMQ becomes available again.
I had code like

```python
self.connection = await aio_pika.connect_robust(config.RABBIT_URI)
```

and then declared a queue, etc. I added the `timeout` argument to the `connect_robust` call, and it works:

```python
self.connection = await aio_pika.connect_robust(config.RABBIT_URI, timeout=5)
```
Now, after RabbitMQ comes back up, I get a `TimeoutError` (it seems the reconnect task gets stuck if RabbitMQ becomes alive during the reconnection process), and the next reconnection attempt succeeds.
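The reason a timeout turns a hung (re)connect into a catchable error can be shown with plain asyncio. In this sketch, `slow_connect` is a hypothetical stand-in for a connect attempt that never completes (e.g. the broker is unreachable mid-handshake), and `asyncio.wait_for` plays the role of the `timeout` argument.

```python
import asyncio


async def slow_connect():
    # Stand-in for a connect attempt that hangs indefinitely.
    await asyncio.sleep(3600)


async def connect_with_timeout(timeout: float) -> str:
    try:
        await asyncio.wait_for(slow_connect(), timeout=timeout)
        return "connected"
    except asyncio.TimeoutError:
        # A bounded wait surfaces the hang as an exception instead of
        # blocking forever, so the caller can retry or shut down.
        return "timed out"


result = asyncio.run(connect_with_timeout(0.05))
print(result)  # timed out
```

Without the bound, the hung attempt simply parks the coroutine forever, which matches the "stuck reconnect" symptom described above.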
@RomaLash: Actually, I did try adding `timeout` to the `connect_robust` func, but this did not resolve my issue in the case of `Unexpected connection close from remote`. I assume this is because the client is stuck on `Waiting for connection close event` when the server shuts down abruptly and is then started again.
My solution was changing the code in the following manner, inspired by this from @gaby
Before:
```python
connection = await self._get_connection()
queue = await self._get_queue(connection=connection, queue_name=queue_name,
                              qos_prefetch_count=qos_prefetch_count, set_qos=set_qos)
await queue.consume(partial(self._handle_message, message_handler, error_handler, fail_strategy))
future = Future()
future.add_done_callback(lambda _: connection.close())
await future
```
After:
```python
connection = await self._get_connection()
queue = await self._get_queue(connection=connection, queue_name=queue_name,
                              qos_prefetch_count=qos_prefetch_count, set_qos=set_qos)
await queue.consume(partial(self._handle_message, message_handler, error_handler, fail_strategy))
try:
    await Future()
finally:
    await queue.close()
    await connection.close()
```
In both cases, the `connect_robust` call remained the same:
```python
async def _get_connection(self):
    return await aio_pika.connect_robust(
        login=self.username,
        password=self.password,
        host=self.host,
        port=self.port,
        virtualhost=self.vhost,
        timeout=self.timeout,
    )
```
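The key difference in the "After" snippet is the `try`/`finally` around the forever-pending `Future`, which guarantees cleanup even when the wait is cancelled. A runnable sketch with stubs in place of the queue and connection:

```python
import asyncio

closed = []


async def consume_forever():
    try:
        # Park on a future that never completes, like `await Future()` above.
        await asyncio.get_running_loop().create_future()
    finally:
        # Runs even on cancellation, mirroring the queue/connection close.
        closed.append("queue")       # stands in for `await queue.close()`
        closed.append("connection")  # stands in for `await connection.close()`


async def main():
    task = asyncio.create_task(consume_forever())
    await asyncio.sleep(0)  # let the task start and park
    task.cancel()           # e.g. service shutdown signal
    try:
        await task
    except asyncio.CancelledError:
        pass


asyncio.run(main())
print(closed)  # ['queue', 'connection']
```

By contrast, the "Before" version only closed the connection through a done-callback on the future, which never fires if the coroutine is cancelled rather than the future resolved.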