taskiq
Add max_attempts_at_message
This addresses the issue https://github.com/taskiq-python/taskiq-aio-pika/issues/35.
Codecov Report
All modified and coverable lines are covered by tests :white_check_mark:
Project coverage is 77.65%. Comparing base (fec9633) to head (0737287). Report is 1 commit behind head on master.
Additional details and impacted files
@@ Coverage Diff @@
## master #395 +/- ##
==========================================
+ Coverage 77.51% 77.65% +0.14%
==========================================
Files 62 62
Lines 1899 1911 +12
==========================================
+ Hits 1472 1484 +12
Misses 427 427
I don't think this PR should be implemented this way and affect base classes such as AckableMessage. Also, in here I don't see anywhere how you set this redeliver information to a message. If you expect brokers to do it, then I refuse to merge it, because the Broker API should remain simple to implement.
I think it will be much easier for you to implement such functionality as a middleware and add it to taskiq.middlewares.
Thanks for the reply.
> I don't see anywhere how you set this redeliver information to a message.
As my issue explains, RabbitMQ does this by itself: if a delivered message isn't acked and the connection to the consuming client closes, RabbitMQ requeues the message at the front of the queue and sets an x-delivery-count header.
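To make the header's role concrete, here is a minimal sketch of how a consumer could read it. It assumes the headers arrive as a plain mapping and that a missing header means the message has not been redelivered. The helper names and the `>=` policy are hypothetical and are not part of this PR.

```python
from typing import Any, Mapping

# Header name taken from the comment above.
DELIVERY_COUNT_HEADER = "x-delivery-count"


def delivery_count(headers: Mapping[str, Any]) -> int:
    """Return the broker-reported count, or 0 if the header is absent or malformed."""
    raw = headers.get(DELIVERY_COUNT_HEADER, 0)
    try:
        return int(raw)
    except (TypeError, ValueError):
        return 0


def exceeded_max_attempts(headers: Mapping[str, Any], max_attempts: int) -> bool:
    """Hypothetical policy: stop once the count reaches max_attempts."""
    return delivery_count(headers) >= max_attempts
```

Whether the limit should be compared with `>=` or `>` depends on how the count is defined for the first delivery, which this sketch does not settle.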
> base classes such as AckableMessage
The current issue is that message headers, including the one discussed above, aren't passed to the message the receiver works with. They need to be put somewhere, either in the TaskiqMessage or in its wrapper.
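As a rough illustration of the "wrapper" option, the sketch below uses stand-in dataclasses rather than taskiq's real classes. The `data` and `ack` fields are only loosely modeled on AckableMessage (an assumption), and `AckableWithHeaders` and `wrap_incoming` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, Mapping, Union

AckCallback = Callable[[], Union[None, Awaitable[None]]]


@dataclass
class AckableStandIn:
    """Stand-in for an ackable wrapper: raw payload plus an ack callback."""

    data: bytes
    ack: AckCallback


@dataclass
class AckableWithHeaders(AckableStandIn):
    """Same wrapper, extended with the transport headers (hypothetical)."""

    headers: Dict[str, Any] = field(default_factory=dict)


def wrap_incoming(
    body: bytes,
    headers: Mapping[str, Any],
    ack: AckCallback,
) -> AckableWithHeaders:
    """Build the wrapper a broker could yield to the receiver."""
    # Copy the headers so the wrapper does not share state with the transport.
    return AckableWithHeaders(data=body, ack=ack, headers=dict(headers))
```

Because `headers` defaults to an empty dict, brokers that do not supply headers could keep constructing the wrapper with only `data` and `ack`.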
> implement such functionality as a middleware
The middleware would still need access to the message headers. It would also need access to the message's ack callback, and its pre_execute method would need a way to abort the current processing (raising an exception in it doesn't seem viable at first glance).
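The sketch below shows what the middleware route would require. It is self-contained and uses stand-in classes, so it makes no claim about how taskiq's receiver actually behaves. `IncomingMessage`, `TooManyDeliveries`, `MaxAttemptsMiddleware` and `handle` are all hypothetical. The point it illustrates is that aborting from a pre_execute-style hook only works if the receiver catches a dedicated exception and acks on the middleware's behalf.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List


class TooManyDeliveries(Exception):
    """Hypothetical signal: drop the message instead of executing it."""


@dataclass
class IncomingMessage:
    """Stand-in for a received message that carries headers; not a taskiq class."""

    task_name: str
    headers: Dict[str, Any] = field(default_factory=dict)


class MaxAttemptsMiddleware:
    """Stand-in middleware exposing a pre_execute-style hook."""

    def __init__(self, max_attempts: int) -> None:
        self.max_attempts = max_attempts

    def pre_execute(self, message: IncomingMessage) -> IncomingMessage:
        count = int(message.headers.get("x-delivery-count", 0))
        if count >= self.max_attempts:
            # The hook has no ack callback, so all it can do is raise.
            raise TooManyDeliveries(message.task_name)
        return message


async def handle(
    message: IncomingMessage,
    ack: Callable[[], Awaitable[None]],
    middleware: MaxAttemptsMiddleware,
    executed: List[str],
) -> None:
    """Hypothetical receiver step: the receiver, not the middleware, owns ack."""
    try:
        message = middleware.pre_execute(message)
    except TooManyDeliveries:
        await ack()  # ack so the broker stops redelivering the message
        return
    executed.append(message.task_name)  # stands in for running the task
    await ack()
```

In this shape the middleware stays simple, but the receiver has to know about the exception type, which is the coupling the comment above is concerned about.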
Also bear in mind there is a separate PR on the taskiq-aio-pika side that deals with RabbitMQ's settings and propagating the header: taskiq-python/taskiq-aio-pika/pull/37.