dramatiq_sqs
DLQ - Help wanted
@Bogdanp I've created an actor like this:

```python
@dramatiq.actor(
    max_retries=10,
    min_backoff=(1 * MINUTES),
    max_backoff=(10 * MINUTES),
    throws=(FooBar, BarFoo),
    queue_name="my_queue",
)
def process_finish(reseller_id: str):
    ...
```
I have an error in this process and the worker exceeded the number of retries:

```
[dramatiq.middleware.retries.Retries] [WARNING] Retries exceeded for message 'd3c95e20-40b6-47ce-877b-fff43bcd5a57'.
```

but the message did not go to the DLQ 🤔.
The default for `MAX_RECEIVES` in the DLQ setup is 5.
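For context, the DLQ is attached through SQS's `RedrivePolicy` queue attribute, whose `maxReceiveCount` is what `MAX_RECEIVES` feeds. A minimal sketch of what those attributes look like (the helper function and ARN are illustrative, not dramatiq_sqs's actual code):

```python
import json

# MAX_RECEIVES mirrors dramatiq_sqs's default of 5 receives before redrive.
MAX_RECEIVES = 5

def build_redrive_policy(dlq_arn: str, max_receives: int = MAX_RECEIVES) -> dict:
    """Return the queue attributes that enable the redrive policy.

    SQS moves a message to the DLQ once its receive count exceeds
    maxReceiveCount -- but only if it stays the *same* message.
    """
    return {
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": dlq_arn,
            "maxReceiveCount": str(max_receives),
        })
    }

attrs = build_redrive_policy("arn:aws:sqs:us-east-1:123456789012:my_queue_dlq")
```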
Environment
python = 3.8.5
django-dramatiq = 0.9.1
dramatiq = 1.9.0
dramatiq-sqs = 0.0.11
Configs
```python
DRAMATIQ_BROKER = {
    "BROKER": "dramatiq_sqs.broker.SQSBroker",
    "OPTIONS": {
        "namespace": "my_namespace",
        "dead_letter": True,
        "region_name": os.getenv("AWS_REGION_NAME", "fake"),
        "aws_access_key_id": os.getenv("AWS_ACCESS_KEY_ID", "fake"),
        "aws_secret_access_key": os.getenv("AWS_SECRET_ACCESS_KEY", "fake"),
    },
    "MIDDLEWARE": [
        "dramatiq.middleware.AgeLimit",
        "dramatiq.middleware.TimeLimit",
        "dramatiq.middleware.Callbacks",
        "dramatiq.middleware.Retries",
        "my_project.middleware.dramatiq.log_message.LogMessageMiddleware",
    ],
}
```
Hi @pjsier! I saw that you did the implementation of the DLQ. Is this working for you?
I think I found out what happens: the retry middleware adds more information to the message, including the error traceback. SQS probably doesn't count it as the same message because the message body is different.
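To illustrate that suspicion with a toy model (not real SQS, just a sketch of the bookkeeping): re-enqueueing a modified body produces a brand-new message whose receive count starts from zero, so it never climbs toward `maxReceiveCount`:

```python
import itertools

_ids = itertools.count(1)

class ToyQueue:
    """A toy in-memory queue that mimics SQS's per-message receive count."""

    def __init__(self):
        self.messages = {}

    def send(self, body: str) -> int:
        # Every send creates a new message with a fresh receive count,
        # exactly as SQS does -- it has no notion of "the same job".
        msg_id = next(_ids)
        self.messages[msg_id] = {"body": body, "receive_count": 0}
        return msg_id

    def receive(self, msg_id: int) -> dict:
        msg = self.messages[msg_id]
        msg["receive_count"] += 1
        return msg

queue = ToyQueue()
first = queue.send('{"actor": "process_finish"}')
queue.receive(first)  # receive_count becomes 1

# The Retries middleware deletes the old message and sends a modified
# copy (extra retry metadata, traceback). That copy is a different
# message with a fresh receive count:
retry = queue.send('{"actor": "process_finish", "retries": 1}')
assert retry != first
assert queue.messages[retry]["receive_count"] == 0
```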
@Bogdanp @pjsier
Whether processing a message succeeds or fails, the message is dropped from the queue:
```python
def ack(self, message: "_SQSMessage") -> None:
    message._sqs_message.delete()
    self.message_refc -= 1

#: Messages are added to DLQ by SQS redrive policy, so no actions are necessary
nack = ack
```
So the receive count that SQS uses for the DLQ redrive policy will never reach the threshold.
An Amazon SQS message has three basic states (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html):
1. Sent to a queue by a producer.
2. Received from the queue by a consumer. (The receive count increases if the message is not deleted before the visibility timeout expires.)
3. Deleted from the queue.
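For reference, the count SQS compares against `maxReceiveCount` is exposed as the `ApproximateReceiveCount` attribute on received messages. A sketch using an illustrative `receive_message`-style response (the values are made up):

```python
# Shape of a boto3 receive_message(AttributeNames=["ApproximateReceiveCount"])
# response, with illustrative values:
response = {
    "Messages": [{
        "MessageId": "d3c95e20-40b6-47ce-877b-fff43bcd5a57",
        "Body": "...",
        "Attributes": {"ApproximateReceiveCount": "3"},
    }]
}

def receive_count(message: dict) -> int:
    """Extract the receive count SQS tracks for the redrive policy."""
    return int(message["Attributes"]["ApproximateReceiveCount"])

# With maxReceiveCount=5, SQS moves this message to the DLQ once the
# count exceeds 5 -- but only if it stays the *same* message each time.
count = receive_count(response["Messages"][0])
```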
So, when I delete a message on `nack` and enqueue a new message with an incremented retry count, it is a new message with a new receive count. Maybe the solution is for the `nack` method to check whether the queue the message belongs to has a DLQ configured, and not delete the message in that case. Or maybe the retry middleware simply can't be used with a DLQ.
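If it helps, the check suggested above could look roughly like this. It assumes `nack` is given the result of a `get_queue_attributes(AttributeNames=["RedrivePolicy"])` call; the helper names are hypothetical:

```python
import json

def has_dlq(queue_attributes: dict) -> bool:
    """Return True if the queue's attributes carry a redrive policy."""
    policy = queue_attributes.get("RedrivePolicy")
    return policy is not None and "deadLetterTargetArn" in json.loads(policy)

def nack_should_delete(queue_attributes: dict) -> bool:
    # Only delete on nack when there is no DLQ; otherwise leave the
    # message so SQS's receive count keeps climbing toward the threshold.
    return not has_dlq(queue_attributes)

with_dlq = {
    "RedrivePolicy": '{"deadLetterTargetArn": "arn:aws:sqs:...", "maxReceiveCount": "5"}'
}
assert nack_should_delete({}) is True        # no DLQ: safe to delete
assert nack_should_delete(with_dlq) is False  # DLQ configured: keep it
```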
What is your opinion?
Thanks for digging into this! Unfortunately, I haven't had time to look into this, but I'll try to take a look this weekend.
Checking in on this. I'm looking to implement the SQS broker, but we use the Retries middleware pretty heavily.
@jairhenrique were you able to solve this issue? Have you considered not using Retries and instead doing the following: don't delete the message, let it regain visibility, and have a worker pick it up again? I haven't tried retries with Dramatiq yet, but since, as you write, the retry mechanism in practice creates a new message, I have the impression that the SQS DLQ makes no sense if Retries is turned on at all. So we have a situation here: either we use the DLQ or Retries. Am I getting it right?
@januszm I tried using Retries along with the following `nack` update:

```python
def nack(self, message: "_SQSMessage") -> None:
    self.message_refc -= 1
```

which results in the following behavior:
- If I set retries > 0, Retries will keep creating new messages and deleting old ones, and eventually stops creating new messages after hitting `max_retries`. (I kept it at 0 and let the visibility timeout expire.)
- The consumer will continue consuming the last retried message up to `max_receives` times (per the SQS redrive policy).
- The redrive policy then kicks in and the message is moved to the DLQ.
Note: the default `MAX_VISIBILITY_TIMEOUT` applied to subsequent `receive_message()` calls on SQS is set quite high (7200 seconds); you might want to reconfigure that as well.
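To make that note concrete, here is a sketch of the clamping behavior. The constant name follows dramatiq_sqs, but the exact formula is an assumption here; check your installed version:

```python
# Assumed clamp: the consumer's VisibilityTimeout is derived from the
# actor's time limit (milliseconds) but capped, so a failed-but-undeleted
# message can stay invisible for up to 2 hours before it can be received
# again and counted toward the redrive policy.
MAX_VISIBILITY_TIMEOUT = 2 * 60 * 60  # 7200 seconds

def visibility_timeout(actor_time_limit_ms: int) -> int:
    # SQS wants whole seconds; Dramatiq time limits are milliseconds.
    return min(actor_time_limit_ms // 1000, MAX_VISIBILITY_TIMEOUT)

short = visibility_timeout(30 * 60 * 1000)    # 30-minute actor limit
capped = visibility_timeout(10 * 3600 * 1000)  # 10-hour limit gets capped
```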
@januszm @chintan-synapse I tried it:
```python
import dramatiq

from dramatiq_sqs import SQSBroker

broker = SQSBroker(
    namespace="dramatiq_sqs_tests",
    middleware=[],
    dead_letter=True,
    aws_access_key_id="xxxx",
    aws_secret_access_key="xxx",
    region_name="us-east-1",
)
dramatiq.set_broker(broker)


@dramatiq.actor
def add(x, y):
    raise Exception("fake")
```
And rewrote `_SQSConsumer` with:

```python
def ack(self, message: "_SQSMessage") -> None:
    message._sqs_message.delete()
    self.message_refc -= 1

#: Messages are added to DLQ by SQS redrive policy, so no actions are necessary
def nack(self, message: "_SQSMessage") -> None:
    print("NACK")
```
When I raise an exception, `ack` is called and the message is deleted by `message._sqs_message.delete()`.

I'm trying to figure out the best way to not ack the message and not call `.delete()`.
@januszm @chintan-synapse @Bogdanp I opened this PR on dramatiq. If it makes sense, I will change `_SQSConsumer` to:
```python
def ack(self, message: "_SQSMessage") -> None:
    message._sqs_message.delete()
    self.message_refc -= 1

def nack(self, message: "_SQSMessage") -> None:
    self.message_refc -= 1
```
With this implementation, the `Retries` middleware won't work with SQS :(
I keep thinking of a solution.
Hi guys, we use dramatiq with SQS and do see value in retries + DLQ: that way there is more certainty that the messages entering the DLQ are real bugs and not runtime problems. Do you know whether a solution can be provided?
We've got a workaround by forcing `max_receives` to be 1 and overriding `nack`:
```python
def nack(self, message: _SQSMessage) -> None:
    """
    Set the visibility timeout so SQS immediately tries to read the message.

    `max_receives` is assumed to be 1, so that the message will enter the
    DLQ after a single `nack`. Retries can now be handled by the `Retries`
    middleware as opposed to SQS.
    """
    message._sqs_message.change_visibility(VisibilityTimeout=0)
```
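One possible way to wire that workaround together (a sketch only: it assumes `SQSBroker` accepts a `max_receives` option and that `_SQSConsumer` can be subclassed as shown; hooking the custom consumer into the broker is omitted, so verify all of this against your installed dramatiq-sqs version):

```python
from dramatiq_sqs import SQSBroker
from dramatiq_sqs.broker import _SQSConsumer, _SQSMessage


class DLQNackConsumer(_SQSConsumer):
    def nack(self, message: _SQSMessage) -> None:
        # Release the message immediately instead of deleting it; with
        # max_receives=1 the redrive policy moves it to the DLQ on the
        # next failed receive, while Retries handles the retry logic.
        message._sqs_message.change_visibility(VisibilityTimeout=0)
        self.message_refc -= 1


broker = SQSBroker(
    namespace="my_namespace",
    dead_letter=True,
    max_receives=1,  # assumed option; check SQSBroker's signature
)
```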