dramatiq
dramatiq copied to clipboard
Actors with at-most-once delivery
I need at-most-once delivery for my project, which is not supported by Dramatiq by default. Even with retries set to zero, a message will be retried if a worker goes offline before acking/nacking it.
I implemented this simple Middleware to ack messages before work on them starts. In the shape it is now it causes a double ack/nack on finished messages which is harmless (apart from a warning when using RabbitMQ) and doesn't generate dead-letters.
I'm posting it here in case you want to add it to the cookbook docs, should someone else need at-most-once delivery, If you are interested in merging something similar I could work on a more robust solution.
from contextlib import suppress
from time import sleep
from dramatiq import Middleware, ActorNotFound, get_broker, actor
from dramatiq.worker import _WorkerMiddleware
class AtMostOnce(Middleware):
def __init__(self):
self.worker = None
@property
def actor_options(self):
return {"at_most_once"}
def before_process_message(self, broker, message):
with suppress(ActorNotFound):
actor = broker.get_actor(message.actor_name)
if actor.options.get("at_most_once", False):
self.worker.consumers[message.queue_name].post_process_message(message)
def before_worker_boot(self, broker, worker):
self.worker = worker
get_broker().add_middleware(AtMostOnce())
@actor(max_retries=0, at_most_once=True)
def stall():
print(f"stalling...")
sleep(30)
print(f"done stalling")
stall.send()