rele
Handling unhandled exceptions
Using Pub/Sub out of the box, when an exception is raised in a subscription, it is caught and nack() is executed, so the message is delivered again, over and over, until something is done about it.
I was trying Relé locally and realized that when an exception was raised, I wasn't getting my message again, so I started digging and found this part of the code:
```python
try:
    res = self._subscription(data, **dict(message.attributes))
except Exception as e:
    run_middleware_hook('post_process_message_failure',
                        self._subscription, e, start_time)
else:
    message.ack()
    run_middleware_hook('post_process_message_success',
                        self._subscription, start_time)
    return res
finally:
    run_middleware_hook('post_process_message')
```
So instead of reaching the Pub/Sub client, the exception is caught by Relé and never re-raised. Relé also doesn't explicitly call nack() or ack(). Because ack() is never executed, you do eventually get the message again, but the reason why is not obvious.
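A simplified model of the two behaviours described above may help. This sketch is not the source of either library: `FakeMessage`, `pubsub_style`, `rele_style` and `failing_handler` are hypothetical names, and the stub message only records which of ack() / nack() was called, if any.

```python
from typing import Callable, Optional


class FakeMessage:
    """Stub Pub/Sub message that records how it was settled."""

    def __init__(self, data: bytes):
        self.data = data
        self.outcome: Optional[str] = None  # "ack", "nack" or None (unsettled)

    def ack(self) -> None:
        self.outcome = "ack"

    def nack(self) -> None:
        self.outcome = "nack"


def pubsub_style(callback: Callable[[FakeMessage], None], message: FakeMessage) -> None:
    """Out-of-the-box client behaviour as described: an exception leads to nack()."""
    try:
        callback(message)
    except Exception:
        message.nack()  # asks Pub/Sub to redeliver right away


def rele_style(handler: Callable[[bytes], None], message: FakeMessage) -> None:
    """Behaviour of the Relé snippet above: ack on success, nothing on failure."""
    try:
        handler(message.data)
    except Exception:
        pass  # Relé only runs the failure hook here: no ack(), no nack()
    else:
        message.ack()


def failing_handler(_):
    raise RuntimeError("boom")


m1 = FakeMessage(b"payload")
pubsub_style(failing_handler, m1)
print(m1.outcome)  # nack

m2 = FakeMessage(b"payload")
rele_style(failing_handler, m2)
print(m2.outcome)  # None
```

The unsettled message in the second case is what eventually comes back once Pub/Sub considers the delivery expired.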
This is a good point, @sanntt. I can't remember any reason, from when we were developing this, NOT to re-raise the exception in the callback. Currently we only log the exception.
The current implementation diverges from the normal Google Pub/Sub way of doing things, in that a message is neither acked nor nacked. In my opinion, we should not be doing that unless absolutely necessary.
This conversation is closely related to having some sort of dead letter queue (DLQ), since a DLQ could absorb these failing tasks more gracefully.
If anybody has ideas for a dead letter queue implementation, we are open to them.
I actually like the current implementation, as it delegates responsibility for message acknowledgement and error handling to the middleware, which I find neater.
For example, for errors that are unrecoverable, instead of sending the message to a dead letter queue, I raise an unrecoverable error:
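```python
# middleware.py
from rele.middleware import BaseMiddleware


class UnrecoverableException(Exception):
    """Raised when retrying a message can never succeed."""


class UnrecoverableMiddleWare(BaseMiddleware):
    def post_process_message_failure(self, subscription, err, start_time, message):
        # Ack messages that can never be processed so Pub/Sub stops redelivering them.
        if isinstance(err, UnrecoverableException):
            message.ack()


# subscriptions.py
from rele import sub

from .middleware import UnrecoverableException


@sub(topic='my-topic')
def handle_webhook(data, **kwargs):
    try:
        parse_message(data)
    except Exception as e:
        # I can't even parse the message, so it's pointless to retry
        raise UnrecoverableException() from e
```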
Middleware also opens up some other interesting abstractions, like "retry X times": the middleware acks the current message and publishes a clone of it with an incremented retry_count attribute.
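The "retry X times" idea could look roughly like the sketch below. Everything in it is hypothetical: `RetryMiddleware`, `FakeMessage`, the injected `publish` callable and `max_retries` are illustrative names, not Relé's API, and the class does not subclass `BaseMiddleware` so that it runs on its own. It assumes the failure hook receives the message (as in the example above) and that the subscription object exposes its `topic`.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict

PublishFn = Callable[[str, bytes, Dict[str, str]], None]


@dataclass
class FakeMessage:
    """Stand-in for a Pub/Sub message: raw bytes plus string attributes."""
    data: bytes
    attributes: Dict[str, str] = field(default_factory=dict)
    acked: bool = False

    def ack(self) -> None:
        self.acked = True


class RetryMiddleware:
    """Hypothetical 'retry X times' failure hook.

    On failure it acks the current message and republishes a clone with an
    incremented retry_count attribute, until max_retries is reached.
    """

    def __init__(self, publish: PublishFn, max_retries: int = 3):
        self.publish = publish
        self.max_retries = max_retries

    def post_process_message_failure(self, subscription, err, start_time, message):
        retry_count = int(message.attributes.get("retry_count", "0"))
        # Always ack: this particular delivery is finished either way.
        message.ack()
        if retry_count < self.max_retries:
            attributes = dict(message.attributes)
            # Pub/Sub attribute values are strings, so store the counter as one.
            attributes["retry_count"] = str(retry_count + 1)
            self.publish(subscription.topic, message.data, attributes)
        # Otherwise give up; a dead letter queue would be the natural next step.
```

Usage, with a list standing in for the real publisher: calling `RetryMiddleware(publish=lambda t, d, a: published.append((t, d, a))).post_process_message_failure(SimpleNamespace(topic="my-topic"), ValueError(), 0.0, FakeMessage(b"x"))` acks the message and appends `("my-topic", b"x", {"retry_count": "1"})` to `published`. Acking before republishing trades Pub/Sub's own redelivery for an explicit, countable retry.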
That's a great point @hobochild !
@sanntt @andrewgy8 I don't think nacking is the "normal way of doing things". From the documentation:
if you ack only upon completion, then Pub/Sub will eventually re-deliver the unacknowledged message. The basic point of Pub/Sub's at-least-once delivery is that a message will be re-delivered until it's acknowledged, based on the ack deadline.
When you nack, it tells Pub/Sub that you are unable or unwilling to deal with the message, and that the service should redeliver it.
You only nack when you don't want to deal with the message, which is a very particular case.
When you have an unexpected problem, like a temporary connection error to an API, and you nack, you will only generate a cascade of errors if the situation has not changed, because the message will be re-delivered immediately. If instead you do nothing, the message is re-delivered after the ack deadline, which defaults to 10 seconds, avoiding huge increases in the delivery rate caused by such errors.
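To make the "cascade of errors" argument concrete, here is some back-of-the-envelope arithmetic for a message whose handler fails every time. It is a deliberately crude model: the 100 ms nack turnaround is an assumed figure, `deliveries_in_window` is a hypothetical helper, and the model ignores client-side lease extension, which may keep an unacknowledged message leased well beyond the 10-second deadline.

```python
def deliveries_in_window(window_ms: int, redelivery_delay_ms: int) -> int:
    """Count deliveries of a message whose handler always fails.

    The first delivery happens at t=0 and each failure leads to a
    redelivery `redelivery_delay_ms` later, until the window closes.
    """
    if redelivery_delay_ms <= 0:
        raise ValueError("redelivery_delay_ms must be positive")
    return len(range(0, window_ms, redelivery_delay_ms))


WINDOW_MS = 60_000         # observe one minute
NACK_TURNAROUND_MS = 100   # assumed: nack() leads to near-immediate redelivery
ACK_DEADLINE_MS = 10_000   # default ack deadline mentioned above

print(deliveries_in_window(WINDOW_MS, NACK_TURNAROUND_MS))  # 600
print(deliveries_in_window(WINDOW_MS, ACK_DEADLINE_MS))     # 6
```

Under these assumptions, nacking a persistently failing message produces a hundred times more deliveries per minute than letting the deadline lapse.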
For a more detailed explanation, the concepts are explained here.
In short: the current design of Relé takes advantage of this on purpose.
Hey there. It's me again.
Recently we realized that we had a problem when raising an exception in a subscription. For business reasons, some messages need to be rejected until a later time, which may be only seconds later. However, if a controlled exception is raised (so neither ack() nor nack() is executed), the same message is received again only after roughly one hour. We need to reduce this time drastically.
Following @hobochild's suggestion, we are creating a middleware that calls nack() on these messages, so that they are redelivered shortly afterwards.
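A minimal sketch of such a middleware, assuming the failure hook receives the message as in @hobochild's example (the Relé snippet quoted at the top of this issue does not pass it). `RetryLater`, `NackOnRetryLaterMiddleware` and `StubMessage` are hypothetical names, and the class does not subclass Relé's `BaseMiddleware` so that it runs standalone.

```python
class RetryLater(Exception):
    """Hypothetical 'controlled' exception: reject this message for now."""


class NackOnRetryLaterMiddleware:
    """Failure hook that nacks only controlled rejections."""

    def post_process_message_failure(self, subscription, err, start_time, message):
        if isinstance(err, RetryLater):
            # Ask Pub/Sub to redeliver soon instead of leaving the message unsettled.
            message.nack()
        # Any other error: leave the message unsettled, as Relé does by default.


class StubMessage:
    """Stand-in message that only records whether nack() was called."""

    def __init__(self):
        self.nacked = False

    def nack(self):
        self.nacked = True


middleware = NackOnRetryLaterMiddleware()
msg = StubMessage()
middleware.post_process_message_failure(None, RetryLater(), 0.0, msg)
print(msg.nacked)  # True
```

Restricting the nack() to one exception type keeps the fast-redelivery path for deliberate rejections, while unexpected errors still get the slower redelivery discussed above.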
Ideally, we would be able to configure how long after a nack() the message is redelivered, but I've found no way of doing so.