taskiq icon indicating copy to clipboard operation
taskiq copied to clipboard

No longer able to register methods in TaskIQ 0.11.16

Open TheTechromancer opened this issue 8 months ago • 19 comments

After updating to 0.11.16, registering a method as a task produces an AttributeError:

bbot_server/applets/_base.py:234: in register_watchdog_tasks
    task = broker.register_task(method, **kwargs)
../../../.cache/pypoetry/virtualenvs/bbot-server-DmHQwTaK-py3.13/lib/python3.13/site-packages/taskiq/abc/broker.py:377: in register_task
    return self.task(task_name=task_name, **labels)(func)
../../../.cache/pypoetry/virtualenvs/bbot-server-DmHQwTaK-py3.13/lib/python3.13/site-packages/taskiq/abc/broker.py:331: in inner
    self.decorator_class(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = AsyncTaskiqDecoratedTask(bbot_server.applets.events:archive_events_task), broker = <taskiq_nats.broker.NatsBroker object at 0x70a16466cec0>
task_name = 'bbot_server.applets.events:archive_events_task'
original_func = <bound method EventsApplet.archive_events_task of <bbot_server.applets.events.EventsApplet object at 0x70a16609c050>>, labels = {'schedule': [{'cron': '0 0 * * *'}]}

    def __init__(
        self,
        broker: "AsyncBroker",
        task_name: str,
        original_func: Callable[_FuncParams, _ReturnType],
        labels: Dict[str, Any],
    ) -> None:
        self.broker = broker
        self.task_name = task_name
        self.original_func = original_func
        self.labels = labels
    
        # This is a hack to make ProcessPoolExecutor work
        # with decorated functions.
        #
        # The problem is that when we decorate a function
        # it becomes a new class. This class has the same
        # name as the original function.
        #
        # When receiver sends original function to another
        # process, it will have the same name as the decorated
        # class. This will cause an error, because ProcessPoolExecutor
        # uses `__name__` and `__qualname__` attributes to
        # import functions from other processes and then it verifies
        # that the function is the same as the original one.
        #
        # This hack renames the original function and injects
        # it back to the module where it was defined.
        # This way ProcessPoolExecutor will be able to import
        # the function by it's name and verify its correctness.
        new_name = f"{original_func.__name__}__taskiq_original"
>       self.original_func.__name__ = new_name
E       AttributeError: 'method' object has no attribute '__name__' and no __dict__ for setting new attributes. Did you mean: '__ne__'?

../../../.cache/pypoetry/virtualenvs/bbot-server-DmHQwTaK-py3.13/lib/python3.13/site-packages/taskiq/decor.py:79: AttributeError

TheTechromancer avatar Mar 31 '25 17:03 TheTechromancer

Can you pelase show how you declare tasks?

s3rius avatar Mar 31 '25 17:03 s3rius

It seems like you mark your class methods as tasks. I tried this and it worked.

import asyncio
import time

from taskiq_redis import RedisStreamBroker

broker = RedisStreamBroker("redis://localhost")


class TaskClass:
    class Inner:
        @broker.task()
        @staticmethod
        def extensive_cpu_task(n):
            time.sleep(2)
            s = sum(i for i in range(n))
            print("Done")
            return s


async def main():
    await broker.startup()

    task = await TaskClass.Inner.extensive_cpu_task.kiq(1000000)
    await task.wait_result()

    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

I wonder how do you declare tasks?

s3rius avatar Mar 31 '25 17:03 s3rius

Sure, here is the relevant code:


class BaseApplet:
    async def register_watchdog_tasks(self, broker):
        # register watchdog tasks
        methods = {name: member for name, member in inspect.getmembers(self) if callable(member)}
        for method_name, method in methods.items():
            # handle case where tasks have already been registered
            method = getattr(method, "original_func", method)

            _watchdog_task = getattr(method, "_watchdog_task", None)
            if _watchdog_task is None:
                continue
            kwargs = getattr(method, "_kwargs", {})
            # crontab handling
            cron_default = kwargs.pop("cron", None)
            cron_config_key = kwargs.pop("cron_config_key", None)
            if cron_config_key is not None:
                if cron_default is None:
                    raise ValueError(
                        f"{self.name}.{method_name}: When specifying a crontab config value, you must also give a default crontab value (kwarg: 'cron')"
                    )
                cron = OmegaConf.select(self.config, cron_config_key, default=cron_default)
                kwargs["schedule"] = [{"cron": cron}]
            elif cron_default is not None:
                kwargs["schedule"] = [{"cron": cron_default}]
            self.log.debug(f"Registering task: {method_name} {kwargs}")
            task = broker.register_task(method, **kwargs)
            # overwrite the original method with the decorated TaskIQ task
            setattr(self, method_name, task)

Basically, we are iterating over our class, and registering certain methods with the TaskIQ broker.

TheTechromancer avatar Mar 31 '25 17:03 TheTechromancer

Can you please show me how you use it? Because it seems like something is purging out __name__ attribute of methods.

s3rius avatar Mar 31 '25 18:03 s3rius

I tried this snippet and seems like all methods have __name__ by default.

    for name, met in inspect.getmembers(TaskClass):
        if not callable(met):
            continue
        print(name)
        print(met.__name__)

Which led me to an idea that something is removing __name__ attribute of a function. Also, what got my attention is message that some method doesn't have __name__. But by default methods are typed as functions.

s3rius avatar Mar 31 '25 18:03 s3rius

That is really strange. I'm not sure how they're losing their __name__ attribute. Yeah wtf. I'll look into this some more tomorrow.

TheTechromancer avatar Mar 31 '25 22:03 TheTechromancer

Okay, I tried printing method.__name__ before it's passed into broker.register_task(), and at least at that point, it's still accessible. It seems to lose it somewhere between broker.register_task() and self.decorator_class().

TheTechromancer avatar Apr 01 '25 13:04 TheTechromancer

Seems related, introduced in 0.11.15: I'm not able to add a callable object as task anymore

import asyncio
from dataclasses import dataclass

from taskiq_redis import RedisStreamBroker

broker = RedisStreamBroker("redis://localhost")

@dataclass(frozen=True, kw_only=True)
class MyTask:
    async def __call__(self, *args, **kwargs):
        await asyncio.sleep(2)


async def main():
    broker.task(task_name="somthing")(MyTask())

if __name__ == "__main__":
    asyncio.run(main())

This works with 0.11.14 starting 0.11.15, produces the same error AttributeError: 'MyTask' object has no attribute '__name__'. Did you mean: '__ne__'?

lindycoder avatar Apr 01 '25 17:04 lindycoder

I don't think it should have worked in the first place. The tasks were intended to be functions, and currently, classes and objects are not officially supported.

We can add support for them, but it requires some considerations, such as how to handle self. Should we be able to mark methods as tasks?

Also, it's unclear to me how this code works because, on the worker side, the task isn't declared.

s3rius avatar Apr 01 '25 18:04 s3rius

@lindycoder, can you please share with me MRE of the codebase that works on previous version and doesn't work since the 0.11.16.

Because the code you sent doesn't work on verison 0.11.15.

Here's what I got.

[2025-04-01 20:33:21,189][taskiq.receiver.receiver][WARNING][worker-0] task "somthing" is not found. Maybe you forgot to import it?

s3rius avatar Apr 01 '25 18:04 s3rius

@s3rius maybe you misread

he wrote his code snippet should work upto and including version 0.11.14

scott-boost avatar Apr 01 '25 18:04 scott-boost

@scott-boost, yeah, thanks for noticing. My bad. I was running it on 0.11.14. It doesn't work on 0.11.15 because of __name__ and doesn't work on 0.11.14 because tasks are not available to the worker.

s3rius avatar Apr 01 '25 18:04 s3rius

@s3rius This is all part of a bigger framework we have, I just wanted to share that we currently have a use case where we pass in a "callable", even though it's not a function. I didn't expect my example to work at all, just to not raise.

lindycoder avatar Apr 01 '25 19:04 lindycoder

@lindycoder, okay, clear point. Can you describe the framework you have a little bit so I can re-create something similar to test out my solutions?

s3rius avatar Apr 01 '25 19:04 s3rius

Here's high level what happens. That script will print hello on 0.11.14 and fail to just create the task on 0.11.15

import asyncio
from collections.abc import AsyncGenerator
from dataclasses import dataclass

from taskiq import AsyncBroker

class MyBroker(AsyncBroker):
    async def kick(self, message) -> None:
        task = self.find_task(message.task_name)
        await task.original_func()

    async def listen(self) -> AsyncGenerator[bytes, None]:
        if False:
            yield b""

broker = MyBroker()

@dataclass(frozen=True, kw_only=True)
class MyTask:
    async def __call__(self, *args, **kwargs):
        print("Hello!")

async def main():
    my_task = broker.task(task_name="something")(MyTask())
    await broker.startup()
    try:
        t = await my_task.kiq()
        await t.wait_result(timeout=2)
    finally:
        await broker.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

Hope this can help

lindycoder avatar Apr 01 '25 20:04 lindycoder

Also experiencing this, it is blocking us from being able to upgrade TaskIQ.

josiah-lunit avatar Jul 04 '25 10:07 josiah-lunit

@TheTechromancer i have wrote class-based task. It require custom receiver with little change.

Wirker with custom receiver starts with --receiver. Example: $ worker main:broker --receiver receiver:TaskReceiver

task.py

import dataclasses
import inspect
import sys
from typing import get_type_hints

from taskiq import AsyncBroker
from taskiq import AsyncTaskiqDecoratedTask
from taskiq import TaskiqMessage


class TaskDecorator(AsyncTaskiqDecoratedTask):
    def __init__(self, task: type["Task"], broker: "AsyncBroker"):
        module_name_to_patch = task.__module__
        func_name_to_patch = f"{task.run.__name__}__taskiq_original"
        if getattr(sys.modules[module_name_to_patch], func_name_to_patch, None):
            raise NameError(
                f"Taskiq needs to monkey-patch `{module_name_to_patch}.{func_name_to_patch}`. "
                "Please rename the function to avoid conflicts."
            )

        super().__init__(
            broker=broker,
            task_name=task.get_name(),
            original_func=task,
            labels=task.get_labels(),
            return_type=get_type_hints(task.run).get("return"),
        )

        self.task = task

    def __call__(self, *args, **kwargs):
        message = TaskiqMessage(
            task_id="",
            task_name=self.task_name,
            labels=self.labels,
            labels_types=None,
            args=list(args),
            kwargs=kwargs,
        )

        return self.task(self.broker, message).run(*args, **kwargs)


class Task:
    name: str = ""

    class Meta:
        # labels
        ...

    def __init__(self, broker: "AsyncBroker", message: "TaskiqMessage"):
        self.broker = broker
        self.message = message

        self.meta = self.make_meta()

    @classmethod
    def get_labels(cls) -> dict:
        return dataclasses.asdict(cls.make_meta())

    @classmethod
    def get_name(cls) -> str:
        return cls.name or f"{cls.__module__}.{cls.__name__}"

    @classmethod
    def make_meta(cls):
        metas = []
        fields = []
        for task in inspect.getmro(cls):
            if hasattr(task, "Meta") and task.Meta not in metas:
                metas.append(task.Meta)
                fields.extend(field for field in task.Meta.__dict__ if not field.startswith("__"))

        return dataclasses.make_dataclass(cls_name="Meta", fields=fields, bases=tuple(metas), init=False)()

    @classmethod
    def register(cls, broker: "AsyncBroker"):
        task_name = cls.get_name()
        task = TaskDecorator(cls, broker=broker)
        broker._register_task(task_name, task)
        return task

    def run(self, *args, **kwargs):
        raise NotImplementedError()

receiver.py

from typing import Any
from typing import Callable
from typing import Union

from taskiq import TaskiqMessage
from taskiq import TaskiqResult
from taskiq.receiver import Receiver

from task import Task


class TaskReceiver(Receiver):
    def _prepare_task(self, name: str, handler: Union[Callable[..., Any], type["Task"]]) -> None:
        if isinstance(handler, type) and issubclass(handler, Task):
            # warn: self is first params, but in `taskiq.receiver.params_parser.parse_params` will be skipped
            handler = handler.run

        super()._prepare_task(name, handler)

    async def run_task(  # noqa: C901, PLR0912, PLR0915
        self,
        target: Union[Callable[..., Any], type[Task]],
        message: TaskiqMessage,
    ) -> TaskiqResult[Any]:
        print("run_task")
        if isinstance(target, type) and issubclass(target, Task):
            target = target(self.broker, message).run
        return await super().run_task(target, message)

main.py

import asyncio

from taskiq import InMemoryBroker
from taskiq import SimpleRetryMiddleware

from receiver import TaskReceiver
from task import Task

broker = InMemoryBroker().with_middlewares(
    SimpleRetryMiddleware(default_retry_count=3),
)


class MyTask(Task):
    class Meta:
        retry_on_error = True

    async def run(self, variable: int):
        print(f"{variable=}")


my_task = MyTask.register(broker)


async def main():
    await my_task(1)
    task = await my_task.kiq(2)
    await task.wait_result()


if __name__ == "__main__":
    broker.receiver = TaskReceiver(
        broker=broker,
        executor=broker.executor,
        validate_params=broker.receiver.validate_params,
        max_async_tasks=broker.receiver.sem._value,
        propagate_exceptions=broker.receiver.propagate_exceptions,
    )  # runtime patch due no any another options to change receiver
    asyncio.run(main())

kai-nashi avatar Aug 13 '25 13:08 kai-nashi

@s3rius, class-based tasks are awesome — but before making a PR, I think we should discuss two problems.

  1. Class-based model for all tasks What if all tasks were classes, with some backward compatibility to support the @broker.task decorator?

  2. Labels handling If all tasks are classes, and labels are set via a Meta (or Labels, as it’s currently called), we might want an interface like self.meta.retry_on_error. That’s fine as long as retry_on_error is explicitly configured.

    However, labels are just a dictionary — keys might not be valid Python identifiers, and any middleware can set arbitrary labels. It would be great to have a mechanism for middleware to read class Meta and update instance Meta before the target method is called.

I think this could be achieved with a small update to side libraries, for example:

class Middleware:
    ...
    def set_task_meta(self, task: Task) -> None:
        task.meta.middleware_property_value = task.message.labels.get(
            "middleware_property_value",
            self.middleware_property_value_default
        )
    ...

If the meta is not updated by middleware due method set_task_meta not exists, that middleware labels simply wouldn’t be available via the .meta interface. (check method exists before call set_task_meta)

kai-nashi avatar Aug 13 '25 13:08 kai-nashi

I have the same problem with register_task() method.

Packages:

taskiq                  0.11.20    
taskiq-dependencies     1.5.7 
taskiq-fastapi          0.3.5 
taskiq-pipelines        0.1.4  
taskiq-redis            1.1.2 

Example:

@worker.task
async def disable_settings() -> None:
    """ """

    service: MetricSettingsTasksInterface = MetricSettingsTasksInterface()

    async with service:
        await service.metric_settings.taskiq_disable_ended_settings(end_date=date.today())

    logger.info("Set status OLD successfully")


@worker.task
async def activate_settings() -> None:
    """ """

    service: MetricSettingsTasksInterface = MetricSettingsTasksInterface()

    async with service:
        await service.metric_settings.taskiq_activate_settings(start_date=date.today())

    logger.info("Set status ACTIVE successfully")


pipe_settings: Pipeline = Pipeline(broker=worker, task=disable_settings).call_after(activate_settings)

worker.register_task(pipe_settings.kiq, task_name="settings_workflow", schedule=[{"cron": "1 0 * * *"}])

Error:

worker.register_task(pipe_settings.kiq, task_name="settings_workflow", schedule=[{"cron": "1 0 * * *"}])
    ~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/casper/.cache/pypoetry/virtualenvs/loser-service--bCb3ki4-py3.13/lib/python3.13/site-packages/taskiq/abc/broker.py", line 385, in register_task
    return self.task(task_name=task_name, **labels)(func)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "/home/casper/.cache/pypoetry/virtualenvs/loser-service--bCb3ki4-py3.13/lib/python3.13/site-packages/taskiq/abc/broker.py", line 338, in inner
    self.decorator_class(
    ~~~~~~~~~~~~~~~~~~~~^
        broker=self,
        ^^^^^^^^^^^^
    ...<3 lines>...
        return_type=return_type,  # type: ignore
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    ),
    ^
  File "/home/casper/.cache/pypoetry/virtualenvs/loser-service--bCb3ki4-py3.13/lib/python3.13/site-packages/taskiq/decor.py", line 83, in __init__
    self.original_func.__name__ = new_name
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'method' object has no attribute '__name__' and no __dict__ for setting new attributes. Did you mean: '__ne__'?

But in taskiq 0.11.14 everything works fine.

casper-71 avatar Nov 06 '25 17:11 casper-71