taskiq icon indicating copy to clipboard operation
taskiq copied to clipboard

Add messages rejection

Open vasilbekk opened this issue 1 year ago • 3 comments
trafficstars

Thank you so much for creating such a cool library, but I found a problem.

rabbitmq has support for dead messages, and taskiq-aio-pika even creates a queue for them.

But in the Receiver implementation in tasker, there is only message.ack(), and there is no option for message.reject()

And in Receiver.callback(), ack() is always called even if the task failed with an error

I rewrote the class a little bit, and added the optional reject() callable field to AckableMessage (I called RejectableMessage), also rewrote the broker so that in listen() it returns RejectableMessage instead of AckableMessage, and rewrote `Receiver.callback() so that in the case of result.is_err it calls reject(), and not ack()

I could clean up a bit and send a PR with message rejection implementation (especially useful in rabbitmq) if needed.

It would also be interesting to get an answer to 2 questions

  1. Why is the creation of a queue for dead messages implemented in taskiq-aio-pika, but it is essentially always empty, was it laid down for the future or broke?

  2. Perhaps there is another implementation of repeating error messages and I do not see it, I would be glad to hear suggestions

vasilbekk avatar Apr 30 '24 12:04 vasilbekk

Hello, Sergey! I've encountered the same problem. I would rather expect sending failing tasks to the dead letter queue rather than ack'ing them. Could you please share a draft PR or a gist? I'm pretty sure it might help many people including me.

a1d4r avatar Jul 02 '24 15:07 a1d4r

Actually, we might have removed this queue. It's here for historical reasons from old versions. But I agree that we might implement reject to all brokers. Can you please publish your changes? And we will go through it.

s3rius avatar Jul 06 '24 14:07 s3rius

Hi, I'm thinking about the same rejection/retry on failed execution. For my use case I use SQS as the broker, the SQS approach is I need to explicitly delete the message receipt handle on success, otherwise SQS makes the message visible again for execution after timeout. Therefore the best rejection/retry is to do nothing on error.

Is it possible to pass result to AckableMessage.ack()? This way broker can check whether result is error to decide how to ack.

xg-wang avatar Oct 25 '24 04:10 xg-wang