dramatiq_sqs

DLQ - Help wanted

Open jairhenrique opened this issue 4 years ago • 12 comments

@Bogdanp I've created an actor like this.

@dramatiq.actor(
    max_retries=10,
    min_backoff=(1 * MINUTES),
    max_backoff=(10 * MINUTES),
    throws=(FooBar, BarFoo),
    queue_name="my_queue",
)
def process_finish(reseller_id: str):
    ...

I have an error in this process and the worker exceeded the number of retries ([dramatiq.middleware.retries.Retries] [WARNING] Retries exceeded for message 'd3c95e20-40b6-47ce-877b-fff43bcd5a57'.), but the message did not go to the DLQ 🤔.

The default MAX_RECEIVES in the DLQ setup is 5.
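For context, SQS itself enforces the receive limit through the source queue's RedrivePolicy attribute, a JSON string naming the dead-letter queue and the maximum receive count. A minimal sketch of that attribute's shape follows; the helper name and the ARN are made-up placeholders, and this is not necessarily how dramatiq_sqs builds it internally.

```python
import json


def redrive_attributes(dlq_arn: str, max_receives: int = 5) -> dict:
    """Build the queue attribute that tells SQS when to move a message to the DLQ.

    Hypothetical helper for illustration only.
    """
    policy = {
        "deadLetterTargetArn": dlq_arn,
        # SQS counts receives per message; past this limit the message is redriven.
        "maxReceiveCount": str(max_receives),
    }
    return {"RedrivePolicy": json.dumps(policy)}


# Placeholder ARN, not a real queue.
attributes = redrive_attributes("arn:aws:sqs:us-east-1:123456789012:my_queue_dlq")
```

The resulting dict has the form expected for the `Attributes` argument when creating or updating a queue. The key point for this issue is that the count is tracked per SQS message, not per dramatiq message ID.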

Environment

python = 3.8.5
django-dramatiq = 0.9.1
dramatiq = 1.9.0
dramatiq-sqs = 0.0.11

Configs

DRAMATIQ_BROKER = {
    "BROKER": "dramatiq_sqs.broker.SQSBroker",
    "OPTIONS": {
        "namespace": "my_namespace",
        "dead_letter": True,
        "region_name": os.getenv("AWS_REGION_NAME", "fake"),
        "aws_access_key_id": os.getenv("AWS_ACCESS_KEY_ID", "fake"),
        "aws_secret_access_key": os.getenv("AWS_SECRET_ACCESS_KEY", "fake"),
    },
    "MIDDLEWARE": [
        "dramatiq.middleware.AgeLimit",
        "dramatiq.middleware.TimeLimit",
        "dramatiq.middleware.Callbacks",
        "dramatiq.middleware.Retries",
        "my_project.middleware.dramatiq.log_message.LogMessageMiddleware",
    ],
}

jairhenrique avatar Aug 14 '20 17:08 jairhenrique

Hi @pjsier! I saw that you did the implementation of dlq. Is this working for you?

jairhenrique avatar Aug 19 '20 17:08 jairhenrique

I think I found out what happens. The Retries middleware adds more information to the message, including the error traceback. SQS probably does not count it as the same message because the message body is different.

jairhenrique avatar Aug 19 '20 17:08 jairhenrique

@Bogdanp @pjsier

Whether processing a message succeeds or fails, the message is dropped from the queue.

    def ack(self, message: "_SQSMessage") -> None:
        message._sqs_message.delete()
        self.message_refc -= 1

    #: Messages are added to DLQ by SQS redrive policy, so no actions are necessary
    nack = ack

So the receive count that SQS uses for the DLQ can never reach its limit.

SQS has three states of message (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html):

An Amazon SQS message has three basic states:
   1. Sent to a queue by a producer.
   2. Received from the queue by a consumer. (the receive count increases each time the message is received again because it was not deleted within the visibility timeout)
   3. Deleted from the queue.

So, when I delete a message on nack and enqueue a new message with an incremented retry count, SQS sees a new message with a fresh receive count.

Maybe the solution is for the nack method to check whether the message's queue has a DLQ configured and, if so, not delete the message. Or maybe the Retries middleware simply can't be used together with a DLQ.
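To make the difference concrete, here is a toy in-memory model of a queue with a redrive policy. It is only a sketch: `ToyQueue`, `Msg` and both helper functions are hypothetical names, the visibility timeout is not modeled, and none of this is the real SQS or dramatiq_sqs API.

```python
from dataclasses import dataclass, field
from itertools import count
from typing import List, Optional

_ids = count(1)


@dataclass
class Msg:
    body: str
    msg_id: int = field(default_factory=lambda: next(_ids))
    receive_count: int = 0


class ToyQueue:
    """Toy stand-in for an SQS queue with a redrive policy."""

    def __init__(self, max_receives: int) -> None:
        self.max_receives = max_receives
        self.messages: List[Msg] = []
        self.dlq: List[Msg] = []

    def send(self, body: str) -> Msg:
        msg = Msg(body)  # every send creates a new message with receive_count 0
        self.messages.append(msg)
        return msg

    def receive(self) -> Optional[Msg]:
        while self.messages:
            msg = self.messages[0]
            if msg.receive_count >= self.max_receives:
                # Redrive: one more receive would exceed the limit.
                self.dlq.append(self.messages.pop(0))
                continue
            msg.receive_count += 1
            return msg
        return None

    def delete(self, msg: Msg) -> None:
        self.messages.remove(msg)


def fail_with_delete_and_resend(queue: ToyQueue, max_retries: int) -> None:
    """Current behavior: nack deletes, and the retry is sent as a fresh message."""
    queue.send("job")
    for attempt in range(max_retries + 1):
        msg = queue.receive()
        assert msg is not None
        queue.delete(msg)
        if attempt < max_retries:
            queue.send("job")


def fail_without_delete(queue: ToyQueue) -> None:
    """Proposed behavior: a failed message stays in the queue and is received again."""
    queue.send("job")
    while queue.receive() is not None:
        pass  # the handler fails every time and nothing is deleted
```

With `max_receives=5`, calling `fail_with_delete_and_resend(queue, max_retries=10)` leaves `queue.dlq` empty because no single message is ever received more than once, while `fail_without_delete(queue)` ends with the message in `queue.dlq` after five receives.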

What is your opinion?

jairhenrique avatar Aug 19 '20 19:08 jairhenrique

Thanks for digging into this! Unfortunately, I haven't had time to look into this, but I'll try to take a look this weekend.

Bogdanp avatar Aug 20 '20 09:08 Bogdanp

Checking in on this. I'm looking to implement the SQS broker, but we use the Retry middleware pretty heavily

liveFreeOrCode avatar Mar 03 '21 22:03 liveFreeOrCode

@jairhenrique were you able to solve this issue? Have you considered not using Retries and instead not deleting the message, letting it regain visibility and be retrieved by a worker again? I haven't tried retries with Dramatiq yet, but my impression is that since, as you write, the Retries mechanism in practice creates a new message, the SQS DLQ makes no sense if Retries is turned on at all. So we have a situation here: either we use the DLQ or Retries. Am I getting it right?

januszm avatar Jul 11 '21 17:07 januszm

@januszm I tried using Retries along with the following nack update:

    def nack(self, message: "_SQSMessage") -> None:
        self.message_refc -= 1

which does the following:

  • If I set max_retries > 0, Retries keeps creating new messages and deleting the old ones, and stops creating new messages once max_retries is hit (I kept it at 0 and let the visibility timeout expire).
  • The consumer keeps consuming the last retried message up to max_receives times (SQS redrive policy).
  • The redrive policy kicks in and the message is moved to the DLQ.

Note: the default MAX_VISIBILITY_TIMEOUT applied to subsequent receive_message() calls on SQS is set quite high (7200 sec); you might want to reconfigure that as well.
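A rough back-of-the-envelope sketch of why the timeout matters when nack no longer deletes: each failed receive hides the message for one visibility timeout before it can be received again. The function name is hypothetical, and the estimate ignores processing and polling time.

```python
def seconds_until_dlq(visibility_timeout_s: int, max_receives: int) -> int:
    """Approximate wait before a always-failing message is redriven to the DLQ.

    The message must be received max_receives times, and after each failed
    receive it stays invisible for one full visibility timeout.
    """
    return visibility_timeout_s * max_receives


default_wait = seconds_until_dlq(7200, 5)   # defaults mentioned in this thread
shorter_wait = seconds_until_dlq(30, 5)     # with a 30 s visibility timeout
```

With the defaults quoted in this thread (7200 seconds and 5 receives), that is 36000 seconds, or about 10 hours, before a poisoned message shows up in the DLQ.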

chintan-synapse avatar Mar 03 '22 23:03 chintan-synapse

@januszm @chintan-synapse I tried it:

import argparse
import dramatiq
import random
import sys

from dramatiq_sqs import SQSBroker

broker = SQSBroker(
    namespace="dramatiq_sqs_tests",
    middleware=[],
    dead_letter=True,
    aws_access_key_id="xxxx",
    aws_secret_access_key="xxx",
    region_name="us-east-1",
)
dramatiq.set_broker(broker)

@dramatiq.actor
def add(x, y):
    raise Exception('fake')

And rewrote _SQSConsumer with:

    def ack(self, message: "_SQSMessage") -> None:
        message._sqs_message.delete()
        self.message_refc -= 1

    #: Messages are added to DLQ by SQS redrive policy, so no actions are necessary
    def nack(self, message: "_SQSMessage") -> None:
        print('NACK')

When I raise an Exception, ack is called and the message is deleted by message._sqs_message.delete().
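This observation is consistent with a worker that chooses between ack and nack based on a "failed" flag on the message rather than on the exception itself, so with middleware=[] nothing ever sets that flag. A simplified model under that assumption (all names here are hypothetical and are not dramatiq's actual classes):

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class ToyMessage:
    failed: bool = False

    def fail(self) -> None:
        self.failed = True


@dataclass
class RecordingConsumer:
    calls: List[str] = field(default_factory=list)

    def ack(self, message: ToyMessage) -> None:
        self.calls.append("ack")

    def nack(self, message: ToyMessage) -> None:
        self.calls.append("nack")


def run_once(
    actor: Callable[[], None],
    consumer: RecordingConsumer,
    on_error: Optional[Callable[[ToyMessage], None]] = None,
) -> None:
    message = ToyMessage()
    try:
        actor()
    except Exception:
        # Only a middleware-like hook can mark the message as failed.
        if on_error is not None:
            on_error(message)
    # Assumption: the outcome depends on the flag, not on the exception.
    if message.failed:
        consumer.nack(message)
    else:
        consumer.ack(message)


def failing_actor() -> None:
    raise Exception("fake")
```

In this model, `run_once(failing_actor, consumer)` records an ack even though the actor raised, whereas passing `on_error=lambda m: m.fail()` (standing in for a middleware that gives up on the message) records a nack.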

I'm trying to figure out the best way to not ack the message and not call .delete()

jairhenrique avatar Apr 10 '23 20:04 jairhenrique

@januszm @chintan-synapse @Bogdanp I opened this PR on dramatiq.

If it makes sense, I will change _SQSConsumer to:

    def ack(self, message: "_SQSMessage") -> None:        
        message._sqs_message.delete()
        self.message_refc -= 1

    def nack(self, message: "_SQSMessage") -> None:
        self.message_refc -= 1

jairhenrique avatar Apr 10 '23 22:04 jairhenrique

With this implementation, the Retries middleware won't work with SQS :(

    def ack(self, message: "_SQSMessage") -> None:        
        message._sqs_message.delete()
        self.message_refc -= 1

    def nack(self, message: "_SQSMessage") -> None:
        self.message_refc -= 1

I'll keep thinking about a solution.

jairhenrique avatar Apr 11 '23 10:04 jairhenrique

Hi guys, we use dramatiq with SQS and do see value in retries + DLQ: that way there is more certainty that the messages entering the DLQ are real bugs and not runtime problems... Do you know whether a solution can be provided?

shayts7 avatar Oct 29 '23 14:10 shayts7

We've got a workaround: forcing max_receives to be 1 and overriding nack.

    def nack(self, message: _SQSMessage) -> None:
        """
        Set the visibility timeout to 0 so SQS immediately tries to read the message again.
        `max_receives` is assumed to be 1, so that the message will
        enter the DLQ after one single `nack`.

        Retries can now be handled by the `Retries` middleware as opposed to SQS.
        """

        message._sqs_message.change_visibility(VisibilityTimeout=0)

jamie-chang-globality avatar Feb 05 '24 12:02 jamie-chang-globality