Result middleware should read message options first

Open huwylphimet opened this issue 7 months ago • 1 comments

Checklist

  • [x] Does your title concisely summarize the problem?
  • [x] Did you include a minimal, reproducible example?
  • [x] What OS are you using?
  • [x] What version of Dramatiq are you using?
  • [x] What did you do?
  • [x] What did you expect would happen?
  • [x] What happened?

What OS are you using?

macOS 13.5.2

What version of Dramatiq are you using?

1.15.0

What did you do?

Implemented an actor with the Results middleware and passed the actor options (store_results and result_ttl) within the message sent.

What did you expect would happen?

  1. I would expect that the result_ttl equals the value from the option stored within the message
  2. I would expect that the warning created in after_process_message() uses the store_results option from the message

What happened?

  1. The result_ttl in the message options was ignored; only the value from the actor options, the Results middleware instance, or DEFAULT_RESULT_TTL = 600000 was used.
  2. The following warning was logged even when store_results was set to False in the message options:

"Actor '%s' returned a value that is not None, but you haven't set its `store_results' option to `True' so the value has been discarded." % message.actor_name

Example

import dramatiq

from dramatiq.brokers.redis import RedisBroker
from dramatiq.results.backends import RedisBackend
from dramatiq.results import Results, ResultTimeout, ResultMissing
from dramatiq.middleware import CurrentMessage

broker = RedisBroker()
result_backend = RedisBackend(client=broker.client)
broker.add_middleware(Results(backend=result_backend))
broker.add_middleware(CurrentMessage())
dramatiq.set_broker(broker)


@dramatiq.actor(store_results=True)
def add(x, y):
    message = CurrentMessage.get_current_message()
    store_results = message.options.get("store_results", True)
    print(f"message 'store_results' option is '{store_results}'")
    return x + y if store_results else None


if __name__ == "__main__":
    message = add.send_with_options(
        args=(1, 2),
        store_results=False)
    try:
        print(message.get_result(backend=result_backend, block=True, timeout=5000))
    except (ResultTimeout, ResultMissing):
        print("result not stored")

Fix proposition

A simple fix could be to slightly modify _lookup_options() in dramatiq/results/middleware.py on lines 86-87:

def _lookup_options(self, broker, message):
    try:
        actor = broker.get_actor(message.actor_name)
        store_results = actor.options.get("store_results", self.store_results)  #line 86
        result_ttl = actor.options.get("result_ttl", self.result_ttl)  #line 87
        return store_results, result_ttl
    except ActorNotFound:
        return False, 0

Replace lines 86-87 with:

store_results = message.options.get("store_results", actor.options.get("store_results", self.store_results))
result_ttl = message.options.get("result_ttl", actor.options.get("result_ttl", self.result_ttl))
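
The precedence this replacement aims for (message options first, then actor options, then the middleware defaults) can be sketched without a broker. This is only an illustration under stated assumptions: plain dicts stand in for message.options and actor.options, and the function name lookup_options and its keyword parameters are hypothetical, not part of Dramatiq.

```python
DEFAULT_RESULT_TTL = 600000  # value quoted in the issue above


def lookup_options(message_options, actor_options,
                   middleware_store_results=False,
                   middleware_result_ttl=DEFAULT_RESULT_TTL):
    """Resolve (store_results, result_ttl): message, then actor, then middleware.

    Hypothetical stand-in for the proposed lookup; it takes plain dicts
    instead of real Dramatiq message and actor objects.
    """
    store_results = message_options.get(
        "store_results",
        actor_options.get("store_results", middleware_store_results),
    )
    result_ttl = message_options.get(
        "result_ttl",
        actor_options.get("result_ttl", middleware_result_ttl),
    )
    return store_results, result_ttl


actor_options = {"store_results": True}

# The message overrides the actor's store_results; the TTL falls back to the default.
overridden = lookup_options({"store_results": False}, actor_options)
# The message overrides only the TTL; store_results still comes from the actor.
custom_ttl = lookup_options({"result_ttl": 1000}, actor_options)
# No message-level options: the outcome matches the actor-only lookup.
unchanged = lookup_options({}, actor_options)
```

With no message-level options the result is the same as with the existing lookup, so actors that never use send_with_options() for these two settings would presumably be unaffected.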

huwylphimet, Nov 21 '23 10:11

A new version has been published since this issue was created, but I have not yet received any feedback on my proposal. I have therefore created PR #612 accordingly. Please consider merging the PR or commenting on this issue. Thanks.

huwylphimet, Feb 21 '24 08:02

This issue was fixed by merging PR #612, and the fix was released in v1.17.0.

huwylphimet, May 13 '24 13:05