dramatiq
dramatiq copied to clipboard
Result middleware should read message options first
Issues
Checklist
- [x] Does your title concisely summarize the problem?
- [x] Did you include a minimal, reproducible example?
- [x] What OS are you using?
- [x] What version of Dramatiq are you using?
- [x] What did you do?
- [x] What did you expect would happen?
- [x] What happened?
What OS are you using?
macOS 13.5.2
What version of Dramatiq are you using?
1.15.0
What did you do?
implemented an actor with the Result middleware and passing the actor options (store_results and result_ttl) within the message sent
What did you expect would happen?
- I would expect that the result_ttl equals the value from the option stored within the message
- I would expect that the warning created in after_process_message() uses the store_results option from the message
What happened?
- the result_ttl within the message option was ignored and only the option value from the actor and Result middleware instance or DEFAULT_RESULT_TTL = 600000 was used
- the following warning was created even if store_results was set to False within the message options
Actor '%s' returned a value that is not None, but you haven't set its
store_results' option to
True' so the value has been discarded." % message.actor_name
Example
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results.backends import RedisBackend
from dramatiq.results import Results, ResultTimeout, ResultMissing
from dramatiq.middleware import CurrentMessage
broker = RedisBroker()
result_backend = RedisBackend(client=broker.client)
broker.add_middleware(Results(backend=result_backend))
broker.add_middleware(CurrentMessage())
dramatiq.set_broker(broker)
@dramatiq.actor(store_results=True)
def add(x, y):
message = CurrentMessage.get_current_message()
store_results = message.options.get("store_results", True)
print(f"message 'store_results' option is '{store_results}'")
return x + y if store_results else None
if __name__ == "__main__":
message = add.send_with_options(
args=(1, 2),
store_results=False)
try:
print(message.get_result(backend=result_backend, block=True, timeout=5000))
except (ResultTimeout, ResultMissing):
print("result not stored")
Fix proposition
a simple fix could be to slightly modify _lookup_options() in dramatiq/results/middleware.py on lines 86-87
def _lookup_options(self, broker, message):
try:
actor = broker.get_actor(message.actor_name)
store_results = actor.options.get("store_results", self.store_results) #line 86
result_ttl = actor.options.get("result_ttl", self.result_ttl) #line 87
return store_results, result_ttl
except ActorNotFound:
return False, 0
replace lines 86-87 with
store_results = message.options.get("store_results", actor.options.get("store_results", self.store_results))
result_ttl = message.options.get("result_ttl", actor.options.get("result_ttl", self.result_ttl))
A new version has been published since the creation of this issue but I have not yet received any feedback regarding my proposal. Therefore I've created the following PR accordingly: #612 Please consider merging my PR or commenting on this issue. Thanks
This issue has been fixed by merging the PR #612 and has been released with v1.17.0