channels_rabbitmq
Missing Connection Retry after Connection Loss
We have an HA RabbitMQ cluster on which we are testing the HA functionality. Our application is structured as follows:
- Several consumers, which consume messages from a defined exchange (we have to use a fanout strategy because there are several identical distributed consumers): Consumer Implementation
async def handle_message(self, message: bytes) -> None:
    # Here I process the messages...
    await get_channel_layer().group_send(
        "websocket", {"type": "websocket.message", "message": json.loads(message)}
    )

def _random_letters(self, n: int) -> str:
    return "".join(random.choice(string.ascii_letters) for i in range(n))

async def process_messages(self):
    channel_layer = get_channel_layer()
    carehare_connection = await channel_layer.carehare_connection
    self.queue_name = f"changes_{self._random_letters(12)}"
    await carehare_connection.exchange_declare(
        exchange_name=self.exchange, exchange_type="fanout"
    )
    await carehare_connection.queue_declare(
        queue_name=self.queue_name,
        durable=True,
        arguments={"x-queue-type": "quorum", "x-expires": 5},
    )
    await carehare_connection.queue_bind(
        exchange_name=self.exchange, queue_name=self.queue_name
    )
    self.logger.info(f"Connected to queue {self.queue_name}: ")
    async with carehare_connection.acking_consumer(self.queue_name) as consumer:
        async for message in consumer:
            await self.handle_message(message)

def handle(self, *args, **options):
    self.exchange = options.get("exchange") or "test"
    asyncio.run(self.process_messages())
- A WebSocket consumer, which receives messages from the consumers above: Django Channels WebSocket Consumer Implementation
class UpdateTopologyConsumer(JsonWebsocketConsumer):
    def connect(self):
        async_to_sync(self.channel_layer.group_add)("websocket", self.channel_name)
        self.accept()

    def websocket_message(self, message):
        print(message, flush=True)
        self.send_json(message)

    def disconnect(self, close_code):
        self.close()
Now we face the problem that the application breaks if one RabbitMQ node in the cluster goes down: Consumer Error
backend> python manage.py listenonupdates changes -v 3
2021-03-12 12:31:19,713 | INFO | Connect to RabbitMQ and subscribe to exchange: changes
2021-03-12 12:31:19,888 | INFO | Connected to queue changes_PHnjRResdlDD:
2021-03-12 12:33:08,772 | INFO | Message received:
2021-03-12 12:33:08,772 | INFO | b'{"bla": "bla bla"}'
Disconnected from RabbitMQ: RabbitMQ closed the connection: 320 CONNECTION_FORCED - Node was put into maintenance mode. Will reconnect.
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/channels_rabbitmq/core.py", line 263, in _reconnect_forever
await connection.closed
carehare._exceptions.ConnectionClosedByServer: RabbitMQ closed the connection: 320 CONNECTION_FORCED - Node was put into maintenance mode
Closing consumer
Traceback (most recent call last):
File "/usr/src/app/backend/updatetopology/management/commands/listenonupdates.py", line 68, in process_messages
async for message in consumer:
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 74, in __anext__
message, self._yielded_delivery_tag = await _next_delivery(
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 50, in _next_delivery
closed.result() # raise exception if there is one
File "/usr/local/lib/python3.9/site-packages/channels_rabbitmq/reader.py", line 37, in consume_into_multi_queue_until_connection_close
multi_queue.put_nowait(
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 223, in __aexit__
await self.closed
File "/usr/local/lib/python3.9/site-packages/channels_rabbitmq/reader.py", line 32, in consume_into_multi_queue_until_connection_close
body, delivery_tag = await consumer.next_delivery()
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 196, in next_delivery
return await _next_delivery(self._queue, self.closed)
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 50, in _next_delivery
closed.result() # raise exception if there is one
carehare._exceptions.ConnectionClosed
Traceback (most recent call last):
File "/usr/src/app/backend/manage.py", line 22, in <module>
main()
File "/usr/src/app/backend/manage.py", line 18, in main
execute_from_command_line(sys.argv)
File "/usr/local/lib/python3.9/site-packages/django/core/management/__init__.py", line 401, in execute_from_command_line
utility.execute()
File "/usr/local/lib/python3.9/site-packages/django/core/management/__init__.py", line 395, in execute
self.fetch_command(subcommand).run_from_argv(self.argv)
File "/usr/local/lib/python3.9/site-packages/django/core/management/base.py", line 330, in run_from_argv
self.execute(*args, **cmd_options)
File "/usr/local/lib/python3.9/site-packages/django/core/management/base.py", line 371, in execute
output = self.handle(*args, **options)
File "/usr/src/app/backend/updatetopology/management/commands/listenonupdates.py", line 77, in handle
asyncio.run(self.process_messages())
File "/usr/local/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/local/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
return future.result()
File "/usr/src/app/backend/updatetopology/management/commands/listenonupdates.py", line 69, in process_messages
await self.handle_message(message)
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 223, in __aexit__
await self.closed
File "/usr/src/app/backend/updatetopology/management/commands/listenonupdates.py", line 68, in process_messages
async for message in consumer:
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 74, in __anext__
message, self._yielded_delivery_tag = await _next_delivery(
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 50, in _next_delivery
closed.result() # raise exception if there is one
File "/usr/local/lib/python3.9/site-packages/channels_rabbitmq/reader.py", line 37, in consume_into_multi_queue_until_connection_close
multi_queue.put_nowait(
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 223, in __aexit__
await self.closed
File "/usr/local/lib/python3.9/site-packages/channels_rabbitmq/reader.py", line 32, in consume_into_multi_queue_until_connection_close
body, delivery_tag = await consumer.next_delivery()
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 196, in next_delivery
return await _next_delivery(self._queue, self.closed)
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 50, in _next_delivery
closed.result() # raise exception if there is one
carehare._exceptions.ConnectionClosed
In the Django Channels consumer, we most probably have the same problem: Django Channels Consumer Error
Disconnected from RabbitMQ: RabbitMQ closed the connection: 320 CONNECTION_FORCED - Node was put into maintenance mode. Will reconnect.
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/channels_rabbitmq/core.py", line 263, in _reconnect_forever
await connection.closed
carehare._exceptions.ConnectionClosedByServer: RabbitMQ closed the connection: 320 CONNECTION_FORCED - Node was put into maintenance mode
We would like to reconnect automatically after a connection loss. How do you solve this problem?
Update: I saw that the Django Channels consumer recovers from the error.
Disconnected from RabbitMQ: RabbitMQ closed the connection: 320 CONNECTION_FORCED - Node was put into maintenance mode. Will reconnect.
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/channels_rabbitmq/core.py", line 263, in _reconnect_forever
await connection.closed
carehare._exceptions.ConnectionClosedByServer: RabbitMQ closed the connection: 320 CONNECTION_FORCED - Node was put into maintenance mode
{'type': 'websocket.message', 'message': {'bla': 'bla'''}}
{'type': 'websocket.message', 'message': {'bla': 'bla'''}}
{'type': 'websocket.message', 'message': {'bla': 'bla'''}}
So it seems the problem lies only with our hand-written carehare consumer...
Good question. Indeed, I got as frustrated as you about this very problem. That's why I wrote carehare in the first place.
The simplest logic is to ensure the whole app dies when the RabbitMQ connection fails. That way, in production, your service can spin up again and resume where it left off. At CJWorkbench, our background services do this.
You're asking for a way to reconnect your RabbitMQ consumer's carehare connection without crashing your app. I can think of two solid approaches.
Approach 1: a separate carehare connection for the consumer. Some pieces:
import asyncio
import logging
from typing import Optional

import carehare
from django.conf import settings

logger = logging.getLogger(__name__)


async def connect_with_retry(
    url: str,
    *,
    retries: int = 10,
    backoff_delay: float = 2.0,
    connect_timeout: float = 10.0,
    stop_retrying: Optional[asyncio.Future[None]] = None,  # cancellation logic
) -> carehare.Connection:
    """Connect to RabbitMQ, retrying if needed and failing in case of disaster.

    The caller should eventually await `retval.close()`.

    The caller can pass a `stop_retrying` Future. Then, if the caller calls its
    `.set_result(None)`, connection failures will be reported much more quickly.
    """
    next_delay = 0.0
    for retry in range(retries):
        try:
            connection = carehare.Connection(url, connect_timeout=connect_timeout)
            await connection.connect()
            return connection
        except (ConnectionError, asyncio.TimeoutError) as err:
            if retry >= retries - 1 or (
                stop_retrying is not None and stop_retrying.done()
            ):
                raise
            else:
                # logger.warn() is deprecated; use logger.warning()
                logger.warning(
                    "Failed to connect to RabbitMQ (%s); retrying in %fs",
                    str(err),
                    next_delay,
                )
                await asyncio.sleep(next_delay)  # TODO stop_retrying short-circuit
                next_delay += backoff_delay


async def process_messages_with_reconnect(stop: asyncio.Future[None]) -> None:
    while not stop.done():
        logger.info("Connecting to RabbitMQ")
        connection = await connect_with_retry(
            settings.RABBITMQ_HOST, stop_retrying=stop
        )  # raises ConnectionError, asyncio.TimeoutError; these should crash our program
        try:
            await process_messages(connection)  # TODO pass `stop` for a shutdown mechanism
        except (
            ConnectionError,
            carehare.ConnectionClosedByServer,
            carehare.ConnectionClosedByHeartbeatMonitor,
        ) as err:
            logger.exception("Abnormal disconnect from RabbitMQ: %s; reconnecting", str(err))
        finally:
            try:
                await connection.close()
            except Exception as err:
                logger.exception("Ignoring error cleaning up RabbitMQ connection: %s", str(err))
You can use your existing process_messages() here, except it must accept carehare_connection as an argument.
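For illustration, here is a rough sketch of what that change could look like, based on the process_messages() from the question. It only uses the connection methods that already appear in the question (exchange_declare, queue_declare, queue_bind, acking_consumer). The module-level random_letters() helper and the exchange and handle_message parameters are assumptions made so the sketch stands alone; in a Django management command they would stay as attributes and methods on self.

```python
import random
import string
from typing import Any, Awaitable, Callable


def random_letters(n: int) -> str:
    """Return n random ASCII letters (used for a unique queue name)."""
    return "".join(random.choice(string.ascii_letters) for _ in range(n))


async def process_messages(
    connection: Any,  # assumed: an already-connected carehare connection
    exchange: str,
    handle_message: Callable[[bytes], Awaitable[None]],
) -> None:
    """Declare a fresh queue on `connection` and consume until it closes."""
    queue_name = f"changes_{random_letters(12)}"
    await connection.exchange_declare(
        exchange_name=exchange, exchange_type="fanout"
    )
    await connection.queue_declare(
        queue_name=queue_name,
        durable=True,
        # queue arguments copied unchanged from the question
        arguments={"x-queue-type": "quorum", "x-expires": 5},
    )
    await connection.queue_bind(exchange_name=exchange, queue_name=queue_name)
    async with connection.acking_consumer(queue_name) as consumer:
        async for message in consumer:
            await handle_message(message)
```

Because the connection is now a parameter, each pass through the reconnect loop declares and binds a new queue on the new connection instead of reusing the channel layer's connection.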
With this strategy, process_messages_with_reconnect() will handle one RabbitMQ connection, and channels_rabbitmq will handle another. Each will reconnect when needed. They might even connect to different servers. This should be fine in many (or all?) use cases.
We use this on our production Django servers, in our queueing code. Job queues use a global-variable carehare connection, and Channels uses channels_rabbitmq.
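The `TODO stop_retrying short-circuit` comment in connect_with_retry() could be resolved with a helper along these lines. This is only a sketch using plain asyncio: sleep_unless_stopped() is a hypothetical name, not part of carehare or channels_rabbitmq. It waits on the stop Future with a timeout instead of calling asyncio.sleep() directly, so a shutdown request wakes it up early.

```python
import asyncio
from typing import Optional


async def sleep_unless_stopped(
    delay: float, stop: Optional["asyncio.Future[None]"] = None
) -> bool:
    """Sleep up to `delay` seconds; return early if `stop` completes.

    Returns True if `stop` is done when the function returns.
    """
    if stop is None:
        await asyncio.sleep(delay)
        return False
    if stop.done():
        return True
    # asyncio.wait() does not cancel `stop` when the timeout expires,
    # so the same Future can be reused on the next retry.
    await asyncio.wait({stop}, timeout=delay)
    return stop.done()
```

Inside the retry loop, `await asyncio.sleep(next_delay)` would become `await sleep_unless_stopped(next_delay, stop_retrying)`. The existing `stop_retrying.done()` check on the next failed attempt then re-raises promptly instead of waiting out the full backoff.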
Approach 2 (untested): piggy-back on channels_rabbitmq's retry mechanism. Something like this:
async def process_messages_with_reconnect() -> None:
    while True:
        connection = await get_channel_layer().carehare_connection  # the latest connection -- or raise
        happy_loop = asyncio.create_task(process_messages(connection))
        await asyncio.wait({connection.closed, happy_loop}, return_when=asyncio.ALL_COMPLETED)
        try:
            happy_loop.result()
        except (
            ConnectionError,
            carehare.ConnectionClosedByServer,
            carehare.ConnectionClosedByHeartbeatMonitor,
        ) as err:
            logger.exception("Abnormal disconnect from RabbitMQ: %s; reconnecting", str(err))
        # there might need to be a small pause here; not sure.
Here's the idea: channels_rabbitmq already runs a task akin to process_messages_with_reconnect() above. When that task detects a disconnect, it replaces the layer's .carehare_connection with a fresh Future. The intent is that await get_channel_layer().carehare_connection never gives a stale connection. (There may be a small race, though, in which case you'll want to await asyncio.sleep(0) or some such before looping. I'm not sure: I haven't tested.)
Neither solution brings me joy.
Django Channels is heavily influenced by its (flawed) Redis strategy. Its patterns are questionable. For us RabbitMQ users, the only value it provides is de-duplicating group_send() messages, so that RabbitMQ sends only one message per Django server instead of one message per WebSocket connection. I always feel like I'm fighting Channels every step of the way. (For instance, Daphne leaks group subscriptions by default when connection-close logic is slow.)
We decided to use the first approach. At first I got the following error:
# Here I restarted the RabbitMQ master node
Closing consumer
Traceback (most recent call last):
File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 53, in process_messages
async for message in consumer:
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 74, in __anext__
message, self._yielded_delivery_tag = await _next_delivery(
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 50, in _next_delivery
closed.result() # raise exception if there is one
carehare._exceptions.ConnectionClosed
2021-03-24 10:54:14,012 | ERROR | Ignoring error cleaning up RabbitMQ connection: RabbitMQ closed the connection: 320 CONNECTION_FORCED - Node was put into maintenance mode
Traceback (most recent call last):
File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 101, in process_messages_with_reconnect
await self.process_messages(carehare_connection)
File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 54, in process_messages
await self.handle_message(message)
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 223, in __aexit__
await self.closed
File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 53, in process_messages
async for message in consumer:
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 74, in __anext__
message, self._yielded_delivery_tag = await _next_delivery(
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 50, in _next_delivery
closed.result() # raise exception if there is one
carehare._exceptions.ConnectionClosed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 113, in process_messages_with_reconnect
await carehare_connection.close()
File "/usr/local/lib/python3.9/site-packages/carehare/_connection.py", line 89, in close
await self.closed
carehare._exceptions.ConnectionClosedByServer: RabbitMQ closed the connection: 320 CONNECTION_FORCED - Node was put into maintenance mode
Closing consumer
Traceback (most recent call last):
File "/usr/local/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/local/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
return future.result()
File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 101, in process_messages_with_reconnect
await self.process_messages(carehare_connection)
File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 54, in process_messages
await self.handle_message(message)
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 223, in __aexit__
await self.closed
File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 53, in process_messages
async for message in consumer:
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 74, in __anext__
message, self._yielded_delivery_tag = await _next_delivery(
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 50, in _next_delivery
closed.result() # raise exception if there is one
carehare._exceptions.ConnectionClosed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/channels_rabbitmq/reader.py", line 32, in consume_into_multi_queue_until_connection_close
body, delivery_tag = await consumer.next_delivery()
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 196, in next_delivery
return await _next_delivery(self._queue, self.closed)
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 39, in _next_delivery
done, pending = await asyncio.wait(
File "/usr/local/lib/python3.9/asyncio/tasks.py", line 413, in wait
return await _wait(fs, timeout, return_when, loop)
File "/usr/local/lib/python3.9/asyncio/tasks.py", line 529, in _wait
await waiter
asyncio.exceptions.CancelledError
Traceback (most recent call last):
File "/usr/src/app/manage.py", line 22, in <module>
main()
File "/usr/src/app/manage.py", line 18, in main
execute_from_command_line(sys.argv)
File "/usr/local/lib/python3.9/site-packages/django/core/management/__init__.py", line 401, in execute_from_command_line
utility.execute()
File "/usr/local/lib/python3.9/site-packages/django/core/management/__init__.py", line 395, in execute
self.fetch_command(subcommand).run_from_argv(self.argv)
File "/usr/local/lib/python3.9/site-packages/django/core/management/base.py", line 330, in run_from_argv
self.execute(*args, **cmd_options)
File "/usr/local/lib/python3.9/site-packages/django/core/management/base.py", line 371, in execute
output = self.handle(*args, **options)
File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 122, in handle
asyncio.run(self.process_messages_with_reconnect(asyncio.Future()))
File "/usr/local/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/local/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
return future.result()
File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 101, in process_messages_with_reconnect
await self.process_messages(carehare_connection)
File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 54, in process_messages
await self.handle_message(message)
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 223, in __aexit__
await self.closed
File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 53, in process_messages
async for message in consumer:
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 74, in __anext__
message, self._yielded_delivery_tag = await _next_delivery(
File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 50, in _next_delivery
closed.result() # raise exception if there is one
carehare._exceptions.ConnectionClosed
I simply added carehare.ConnectionClosed to the except clause:
async def process_messages_with_reconnect(self, stop: asyncio.Future[None]) -> None:
    while not stop.done():
        self.logger.info(
            f"Connect to RabbitMQ and subscribe to queue: {self.queue_name}"
        )
        carehare_connection = await self.connect_with_retry(stop_retrying=stop)
        # raises ConnectionError, asyncio.TimeoutError; these should crash our program
        try:
            await self.process_messages(carehare_connection)
        except (
            ConnectionError,
            carehare.ConnectionClosedByServer,
            carehare.ConnectionClosedByHeartbeatMonitor,
            carehare.ConnectionClosed,
        ) as err:
            self.logger.exception(
                "Abnormal disconnect from RabbitMQ: %s; reconnecting", str(err)
            )
        finally:
            try:
                await carehare_connection.close()
            except Exception as err:
                self.logger.exception(
                    "Ignoring error cleaning up RabbitMQ connection: %s", str(err)
                )
Afterward, it worked with automatic retries :+1: Thanks for your help.
PS: One question - is it fine to call the method with a new asyncio.Future()?
asyncio.run(self.process_messages_with_reconnect(asyncio.Future()))
In which case does stop.done() ever become true?
D'oh, you indeed made the right fix.
self.process_messages() actually won't raise ConnectionError, ConnectionClosedByServer or ConnectionClosedByHeartbeatMonitor. It will only raise ConnectionClosed (which you should catch! -- and you did, thus fixing your problem) and ChannelClosedByServer (which you probably shouldn't catch -- it usually indicates an error message from RabbitMQ that you wouldn't expect).
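To make that distinction concrete, here is a minimal sketch of a reconnect loop that treats only a chosen set of exceptions as "reconnect" signals and lets everything else propagate. It is deliberately generic so that it runs without carehare installed: run_with_reconnect(), connect, process and reconnect_on are hypothetical names, and the caller decides which exception classes to pass.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable, Tuple, Type

logger = logging.getLogger(__name__)


async def run_with_reconnect(
    connect: Callable[[], Awaitable[Any]],
    process: Callable[[Any], Awaitable[None]],
    *,
    reconnect_on: Tuple[Type[BaseException], ...],
    stop: "asyncio.Future[None]",
) -> None:
    """Run `process(connection)` repeatedly until `stop` is done.

    Exceptions listed in `reconnect_on` trigger a reconnect; any other
    exception propagates to the caller after the connection is closed.
    """
    while not stop.done():
        connection = await connect()
        try:
            await process(connection)
        except reconnect_on as err:
            logger.warning("Disconnected (%s); reconnecting", err)
        finally:
            try:
                await connection.close()
            except Exception as err:
                # A dead connection may raise again on close; ignore that.
                logger.warning("Ignoring error during cleanup: %s", err)
```

With carehare, the call would presumably pass `reconnect_on=(carehare.ConnectionClosed,)` and leave carehare.ChannelClosedByServer out of the tuple, so that an unexpected channel error still crashes the program.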
As for the "stop" future: you can indeed pass a new Future ... or you can nix stop out of all my code. stop isn't for the problem at hand; it solves another problem you'll run into someday. Sooner or later you'll probably want graceful shutdowns. The Channels-recommended server, Daphne, has no mechanism for it: killing the web server disconnects everybody. Uvicorn implements "ASGI lifespan". The gist: on shutdown, your lifespan handler can call stop.set_result(None) and then await the result of connect_with_retry(). It's the trickiest piece of the tricky problem of maintaining servers (and deploying) without breaking your users' experience.
But stop has nothing to do with reconnects. I wrote it because I'm a pedant: I wouldn't write a function that never returns :). If you don't want to implement shutdown, you don't need stop; it has no relation to this bug report.
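If you do want stop to become effective in a management command, one possible shape is sketched below. It creates the Future inside the running loop and resolves it from a signal handler. A Future created before asyncio.run() starts may be bound to a different event loop than the one asyncio.run() creates. That is harmless while the code only calls stop.done(), but it can fail once something awaits the Future. run_until_stopped() is a hypothetical helper name; the asyncio calls themselves are standard library.

```python
import asyncio
import signal
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def run_until_stopped(
    worker: Callable[["asyncio.Future[None]"], Awaitable[T]],
) -> T:
    """Create a `stop` Future on the running loop and pass it to `worker`.

    SIGINT and SIGTERM resolve the Future, so `stop.done()` becomes True
    and the worker can finish its current iteration and return.
    """
    loop = asyncio.get_running_loop()
    stop: "asyncio.Future[None]" = loop.create_future()

    def request_stop() -> None:
        if not stop.done():
            stop.set_result(None)

    for signum in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(signum, request_stop)
        except (NotImplementedError, RuntimeError):
            # Not supported on Windows event loops or outside the main thread.
            pass

    return await worker(stop)
```

The call in handle() would then look like `asyncio.run(run_until_stopped(self.process_messages_with_reconnect))`. Note that the loop shown earlier only checks stop between connections, so a prompt shutdown still needs process_messages() to watch stop as well, as the TODO in the original code says.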
Thank you for all your enthusiasm. I'm glad the restarts are working for you now.
I'll close this issue after I copy/paste some of this code into the README.