dramatiq icon indicating copy to clipboard operation
dramatiq copied to clipboard

Actors with at-most-once delivery

Open dp-alvarez opened this issue 1 year ago • 0 comments

I need at-most-once delivery for my project, which is not supported by Dramatiq by default. Even with retries set to zero, a message will be retried if a worker goes offline before acking/nacking it.

I implemented this simple Middleware to ack messages before work on them starts. In the shape it is now it causes a double ack/nack on finished messages which is harmless (apart from a warning when using RabbitMQ) and doesn't generate dead-letters.

I'm posting it here in case you want to add it to the cookbook docs, should someone else need at-most-once delivery, If you are interested in merging something similar I could work on a more robust solution.

from contextlib import suppress
from time import sleep
from dramatiq import Middleware, ActorNotFound, get_broker, actor
from dramatiq.worker import _WorkerMiddleware

class AtMostOnce(Middleware):
    def __init__(self):
        self.worker = None

    @property
    def actor_options(self):
        return {"at_most_once"}

    def before_process_message(self, broker, message):
        with suppress(ActorNotFound):
            actor = broker.get_actor(message.actor_name)
            if actor.options.get("at_most_once", False):
                self.worker.consumers[message.queue_name].post_process_message(message)

    def before_worker_boot(self, broker, worker):
        self.worker = worker


get_broker().add_middleware(AtMostOnce())

@actor(max_retries=0, at_most_once=True)
def stall():
    print(f"stalling...")
    sleep(30)
    print(f"done stalling")

stall.send()

dp-alvarez avatar Jul 08 '22 11:07 dp-alvarez