taskiq icon indicating copy to clipboard operation
taskiq copied to clipboard

`AsyncTaskiqDecoratedTask().kicker().with_labels()` mutates task labels

Open donc310 opened this issue 1 year ago • 2 comments
trafficstars

Hello and thank you very much for your work on this project.

I currently have an issue where tasks submitted with custom labels mutates the underlying decorated task instance

@pytest.mark.anyio
async def test_with_label() -> None:
    trace_ids = {}

    class Tracer(TaskiqMiddleware):
        def pre_send(self, message: TaskiqMessage) -> TaskiqMessage | Coroutine[Any, Any, TaskiqMessage]:
            nonlocal trace_ids
            trace_ids[tuple(message.args)] = message.labels.get("trace_id")
            return super().pre_send(message)

    broker = InMemoryBroker().with_middlewares(Tracer())

    @broker.task()
    def run_task(a) -> int:
        return a

    task1 = await run_task.kicker().with_labels(trace_id="11111").kiq(1)
    task2 = await run_task.kicker().with_labels(trace_id="22222").kiq(2)
    task3 = await run_task.kiq(3)

    await task1.wait_result(timeout=1)
    await task2.wait_result(timeout=1)
    await task3.wait_result(timeout=1)

    assert trace_ids == {(1,): "11111", (2,): "22222", (3,): None}

For some context on what we were trying to achieve;

We were building a custom Sentry Integration for TaskIq that would add tracing meta-data as labels to queued messages, to achieve this we patched AsyncKicker.kiq and Receiver.run_task methods to add tracing info to message labels which would track tasks from when there were submitted to when a worker picks up and processes the message.

The above test case fails because after the second task is submitted all other subsequent tasks will have a trcae_id of 22222.

Our current workaround is to use a custom task class which doesn't mutate the original task labels

class CustomTask(AsyncTaskiqDecoratedTask):

    def kicker(self) -> AsyncKicker[_FuncParams, _ReturnType]:
        """
        This function returns kicker object.

        Kicker is a object that can modify kiq request
        before sending it.

        :return: AsyncKicker instance.
        """
        return AsyncKicker(
            task_name=self.task_name,
            broker=self.broker,
            labels=deepcopy(self.labels),
        )


broker.decorator_class = CustomTask

donc310 avatar Mar 01 '24 20:03 donc310

You're completely right. If you want to become a contributor, you can create a PR that fixes it. I haven't experienced this problem yet, but I think it's a possible bug.

Thanks for noticing. If you don't want to create a PR, I can create a patch myself.

s3rius avatar Mar 08 '24 20:03 s3rius

Will make PR for the fix.

donc310 avatar Mar 08 '24 20:03 donc310