Feature: add a TailMessage middleware
This middleware is motivated by the following three observations:
- The Dramatiq pipeline requires existing Message objects; however, there are cases where a Message object can’t be created before the pipeline runs.
- An Actor may need to create one or more messages during its execution and send them only on success, ideally after the Actor’s own message has been ack’ed and other committing decorators/middlewares have exited.
- Sometimes an Actor needs to re-run indefinitely; see also this discussion.[^1]
I’d like to propose a TailMessage middleware (similar to the CurrentMessage middleware) like so:

    import threading

    import dramatiq


    class TailMessage(dramatiq.Middleware):
        # Thread-local state, so each worker thread buffers its own messages.
        STATE = threading.local()

        @classmethod
        def enqueue(cls, message, delay=None):
            # Check: can this fail in a valid setup?
            tail_message_list = getattr(cls.STATE, "tail_message_list")
            tail_message_list.append((message, delay))

        def before_process_message(self, broker, message):
            # Start every message with an empty buffer.
            setattr(self.STATE, "tail_message_list", [])

        def after_ack(self, broker, message):
            # Check: this shouldn't fail.
            tail_message_list = getattr(self.STATE, "tail_message_list")
            for msg, delay in tail_message_list:
                broker.enqueue(msg, delay=delay)

~(Perhaps improve the middleware to fit more closely with the message_with_options() approach.)~ The middleware closely follows the existing Broker.enqueue() interface, so it should be simple enough to understand.
The idea is to give an Actor a way to buffer messages until the Actor’s message has been ack’ed, and only then send all buffered messages to the queue.
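To illustrate the intended buffering behaviour, here is a minimal, dependency-free sketch. It does not import Dramatiq: `FakeBroker`, `TailMessageSketch`, and `run_once` are hypothetical stand-ins that only imitate the hook order described above. It also makes two simplifying assumptions that are not part of the proposal: a failed Actor run is never ack'ed (in a real worker this depends on the retry and failure handling in place), and calling `enqueue()` outside of message processing raises a `RuntimeError` rather than an `AttributeError`.

```python
import threading


class FakeBroker:
    """Hypothetical stand-in for a broker; it only records enqueued messages."""

    def __init__(self):
        self.sent = []

    def enqueue(self, message, *, delay=None):
        self.sent.append((message, delay))


class TailMessageSketch:
    """Dependency-free copy of the proposed hooks (not a dramatiq.Middleware)."""

    STATE = threading.local()

    @classmethod
    def enqueue(cls, message, delay=None):
        buffer = getattr(cls.STATE, "tail_message_list", None)
        if buffer is None:
            # Assumption: buffering outside of message processing is an error.
            raise RuntimeError("enqueue() called outside of message processing")
        buffer.append((message, delay))

    def before_process_message(self, broker, message):
        # Start every message with an empty buffer.
        self.STATE.tail_message_list = []

    def after_ack(self, broker, message):
        # Flush the buffer, then clear it so nothing is sent twice.
        buffered = getattr(self.STATE, "tail_message_list", None) or []
        self.STATE.tail_message_list = None
        for msg, delay in buffered:
            broker.enqueue(msg, delay=delay)


def run_once(middleware, broker, message, actor_fn):
    """Hypothetical driver that imitates the hook order for one message."""
    middleware.before_process_message(broker, message)
    try:
        actor_fn()
    except Exception:
        # Simplifying assumption: a failure means no ack, so nothing is flushed.
        return False
    middleware.after_ack(broker, message)
    return True


def succeeding_actor():
    TailMessageSketch.enqueue("follow-up", delay=1000)


def failing_actor():
    TailMessageSketch.enqueue("never-sent")
    raise ValueError("actor failed")


broker = FakeBroker()
middleware = TailMessageSketch()
ok = run_once(middleware, broker, "msg-1", succeeding_actor)
failed = run_once(middleware, broker, "msg-2", failing_actor)
print(ok, failed, broker.sent)
```

In this sketch only the message buffered by the succeeding run reaches the broker; the one buffered by the failing run is dropped when the next `before_process_message()` resets the buffer.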
@Bogdanp — what do you think? Happy to provide a feature PR if this looks like a useful, general middleware for Dramatiq.
[^1]: Alternatively, one might be able to refactor the pipeline class so that it can handle true generators, and then pass it itertools.cycle() to produce the same message indefinitely.