Feature: add a TailMessage middleware

Open jenstroeger opened this issue 2 years ago • 0 comments

This middleware is motivated by the following three observations:

  • The Dramatiq pipeline requires existing Message objects; however, there are cases where a Message object can’t be created before the pipeline runs.
  • An Actor needs to create one or more messages during its execution and send them only on success, ideally after the Actor’s message was ack’ed and other committing decorators/middlewares have exited.
  • Sometimes an Actor needs to re-run indefinitely; see also this discussion.[^1]

I’d like to propose a TailMessage middleware (similar to the CurrentMessage middleware) like so:

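import threading

import dramatiq


class TailMessage(dramatiq.Middleware):
    """Buffer messages while an Actor runs and enqueue them after the ack."""

    # One buffer per worker thread.
    STATE = threading.local()

    @classmethod
    def enqueue(cls, message, delay=None):
        # No buffer exists if this is called outside of message processing,
        # or if the middleware was never added to the broker.
        tail_message_list = getattr(cls.STATE, "tail_message_list", None)
        if tail_message_list is None:
            raise RuntimeError("TailMessage.enqueue() called outside of message processing.")
        tail_message_list.append((message, delay))

    def before_process_message(self, broker, message):
        # Start every message with an empty buffer.
        self.STATE.tail_message_list = []

    def after_ack(self, broker, message):
        # Fall back to an empty list in case no buffer was set up on this thread.
        tail_message_list = getattr(self.STATE, "tail_message_list", [])
        self.STATE.tail_message_list = []
        for msg, delay in tail_message_list:
            broker.enqueue(msg, delay=delay)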

~(Perhaps improve the middleware to fit more closely with the message_with_options() approach.)~ The middleware closely follows the existing Broker.enqueue() interface, so it should be simple enough to understand.

The idea is to give an Actor a way to buffer messages until the Actor’s message has been ack’ed, and only then send all buffered messages to the queue.
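To make the buffering behavior concrete, here is a minimal, stdlib-only sketch of the intended semantics. It does not use Dramatiq itself: `TailBuffer`, `RecordingBroker`, and `run_once` are hypothetical stand-ins for the proposed middleware, a broker, and one worker cycle, and plain strings stand in for Message objects. A real worker does more than this (for example, nack handling), so treat it as an illustration under those assumptions.

```python
import threading


class TailBuffer:
    """Stdlib-only stand-in for the proposed TailMessage middleware."""

    STATE = threading.local()

    @classmethod
    def enqueue(cls, message, delay=None):
        # Buffer the message instead of sending it right away.
        cls.STATE.tail_message_list.append((message, delay))

    def before_process_message(self, broker, message):
        self.STATE.tail_message_list = []

    def after_ack(self, broker, message):
        # Flush the buffer only once the current message has been ack'ed.
        for msg, delay in self.STATE.tail_message_list:
            broker.enqueue(msg, delay=delay)
        self.STATE.tail_message_list = []


class RecordingBroker:
    """Hypothetical broker that only records what was enqueued."""

    def __init__(self):
        self.sent = []

    def enqueue(self, message, delay=None):
        self.sent.append((message, delay))


def run_once(middleware, broker, message, actor):
    """Mimic one worker cycle: the ack hook fires only if the actor succeeds."""
    middleware.before_process_message(broker, message)
    try:
        actor(message)
    except Exception:
        return False  # failed message: no ack, so the buffer is never flushed
    middleware.after_ack(broker, message)
    return True


def succeeding_actor(message):
    TailBuffer.enqueue("follow-up", delay=1000)
    TailBuffer.enqueue(message)  # re-run the same message


def failing_actor(message):
    TailBuffer.enqueue("never-sent")
    raise ValueError("boom")


broker = RecordingBroker()
middleware = TailBuffer()
ok = run_once(middleware, broker, "job", succeeding_actor)
failed = run_once(middleware, broker, "job", failing_actor)
print(ok, failed, broker.sent)
# prints: True False [('follow-up', 1000), ('job', None)]
```

The failing run leaves nothing in `broker.sent`, which is the point of the proposal: buffered messages are sent only after a successful ack, and the next `before_process_message` call discards anything left over from a failed run.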

@Bogdanp — what do you think? Happy to provide a feature PR if this looks like a useful, general middleware for Dramatiq.

[^1]: Alternatively, one might be able to refactor the pipeline class such that it’s able to handle true generators and then pass itertools.cycle() to produce the same message infinitely.

jenstroeger · Aug 19 '23 22:08