
Run `pre_send` and `post_send` middlewares in `Context.requeue`

Open ACherryJam opened this issue 1 month ago • 0 comments

Currently, `Context.requeue` sends the message directly to the broker and doesn't trigger any middlewares. Is this intended behaviour? If not, I'd like to open a PR that uses `AsyncKicker`, similar to how `SimpleRetryMiddleware` does.

async def requeue(self) -> None:
    requeue_count = int(self.message.labels.get("X-Taskiq-requeue", 0))
    requeue_count += 1
    self.message.labels["X-Taskiq-requeue"] = str(requeue_count)
    await self.broker.kick(self.broker.formatter.dumps(self.message))
    raise NoResultError
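
To illustrate the difference in isolation, here is a minimal sketch that does not use taskiq at all. `ToyBroker`, `ToyMessage`, `kick_with_hooks` and `mark_seen` are hypothetical names made up for this example; the only point is that a direct `kick` never reaches the send hooks, while a kicker-style path runs them first.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class ToyMessage:
    task_name: str
    labels: dict = field(default_factory=dict)


class ToyBroker:
    """Hypothetical stand-in for a broker; not taskiq's real class."""

    def __init__(self):
        self.pre_send_hooks = []  # callables: message -> message
        self.sent = []            # messages that reached the "wire"

    async def kick(self, message):
        # Lowest-level send: no hooks run here.
        self.sent.append(message)

    async def kick_with_hooks(self, message):
        # Kicker-style send: run every pre_send hook, then kick.
        for hook in self.pre_send_hooks:
            message = hook(message)
        await self.kick(message)


def mark_seen(message):
    # Toy pre_send hook that records that it ran.
    message.labels["seen_by_pre_send"] = "1"
    return message


async def demo():
    broker = ToyBroker()
    broker.pre_send_hooks.append(mark_seen)
    await broker.kick(ToyMessage("direct"))             # hooks skipped
    await broker.kick_with_hooks(ToyMessage("hooked"))  # hooks applied
    return [(m.task_name, m.labels) for m in broker.sent]


result = asyncio.run(demo())
```

In this toy model only the message sent through `kick_with_hooks` carries the label set by the hook, which mirrors the behaviour described above for `Context.requeue`.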

The rationale for this change is to allow finalization of objects added to labels in the `pre_execute` hook. For example, dishka's TaskIQ integration breaks `Context.requeue` because the DI container is stored in the message's labels. Removing the container in a `pre_send` hook would solve the issue.

  File "C:\Users\Cherry\dishka\tests\integrations\taskiq\test_taskiq.py", line 104, in context_requeue_task
    await context.requeue()
  File "C:\Users\Cherry\dishka\.nox\taskiq_0110\Lib\site-packages\taskiq\context.py", line 33, in requeue
    await self.broker.kick(self.broker.formatter.dumps(self.message))
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
...
  File "C:\Users\Cherry\.pyenv\pyenv-win\versions\3.12.8\Lib\json\encoder.py", line 180, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type AsyncContainer is not JSON serializable
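
The failure can be reproduced with the standard library alone. The sketch below is only an illustration under assumptions: `FakeContainer`, the `di_container` label key and `strip_container` are hypothetical and do not reflect dishka's or taskiq's actual names. It shows that `json.dumps` rejects labels holding an arbitrary object, and that dropping that label before serialization (which is what a `pre_send` hook could do) avoids the error.

```python
import json


class FakeContainer:
    """Hypothetical placeholder for a DI container object."""


CONTAINER_LABEL = "di_container"  # hypothetical label key


def strip_container(labels):
    """Return a copy of the labels without the non-serializable entry."""
    return {k: v for k, v in labels.items() if k != CONTAINER_LABEL}


labels = {"X-Taskiq-requeue": "1", CONTAINER_LABEL: FakeContainer()}

# Serializing the labels as-is fails, as in the traceback above.
try:
    json.dumps(labels)
    failed = False
except TypeError:
    failed = True

# Removing the container first makes the labels serializable again.
cleaned = json.dumps(strip_container(labels))
```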

ACherryJam, Nov 01 '25 15:11