taskiq
taskiq copied to clipboard
`AsyncTaskiqDecoratedTask().kicker().with_labels()` mutates task labels
Hello and thank you very much for your work on this project.
I currently have an issue where tasks submitted with custom labels mutates the underlying decorated task instance
@pytest.mark.anyio
async def test_with_label() -> None:
trace_ids = {}
class Tracer(TaskiqMiddleware):
def pre_send(self, message: TaskiqMessage) -> TaskiqMessage | Coroutine[Any, Any, TaskiqMessage]:
nonlocal trace_ids
trace_ids[tuple(message.args)] = message.labels.get("trace_id")
return super().pre_send(message)
broker = InMemoryBroker().with_middlewares(Tracer())
@broker.task()
def run_task(a) -> int:
return a
task1 = await run_task.kicker().with_labels(trace_id="11111").kiq(1)
task2 = await run_task.kicker().with_labels(trace_id="22222").kiq(2)
task3 = await run_task.kiq(3)
await task1.wait_result(timeout=1)
await task2.wait_result(timeout=1)
await task3.wait_result(timeout=1)
assert trace_ids == {(1,): "11111", (2,): "22222", (3,): None}
For some context on what we were trying to achieve;
We were building a custom Sentry Integration for TaskIq that would add tracing meta-data as labels to queued messages, to achieve this we patched AsyncKicker.kiq and Receiver.run_task methods to add tracing info to message labels which would track tasks from when there were submitted to when a worker picks up and processes the message.
The above test case fails because after the second task is submitted all other subsequent tasks will have a trcae_id of 22222.
Our current workaround is to use a custom task class which doesn't mutate the original task labels
class CustomTask(AsyncTaskiqDecoratedTask):
def kicker(self) -> AsyncKicker[_FuncParams, _ReturnType]:
"""
This function returns kicker object.
Kicker is a object that can modify kiq request
before sending it.
:return: AsyncKicker instance.
"""
return AsyncKicker(
task_name=self.task_name,
broker=self.broker,
labels=deepcopy(self.labels),
)
broker.decorator_class = CustomTask
You're completely right. If you want to become a contributor, you can create a PR that fixes it. I haven't experienced this problem yet, but I think it's a possible bug.
Thanks for noticing. If you don't want to create a PR, I can create a patch myself.
Will make PR for the fix.