No longer able to register methods in TaskIQ 0.11.16
After updating to 0.11.16, registering a method as a task produces an AttributeError:
bbot_server/applets/_base.py:234: in register_watchdog_tasks
task = broker.register_task(method, **kwargs)
../../../.cache/pypoetry/virtualenvs/bbot-server-DmHQwTaK-py3.13/lib/python3.13/site-packages/taskiq/abc/broker.py:377: in register_task
return self.task(task_name=task_name, **labels)(func)
../../../.cache/pypoetry/virtualenvs/bbot-server-DmHQwTaK-py3.13/lib/python3.13/site-packages/taskiq/abc/broker.py:331: in inner
self.decorator_class(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = AsyncTaskiqDecoratedTask(bbot_server.applets.events:archive_events_task), broker = <taskiq_nats.broker.NatsBroker object at 0x70a16466cec0>
task_name = 'bbot_server.applets.events:archive_events_task'
original_func = <bound method EventsApplet.archive_events_task of <bbot_server.applets.events.EventsApplet object at 0x70a16609c050>>, labels = {'schedule': [{'cron': '0 0 * * *'}]}
def __init__(
self,
broker: "AsyncBroker",
task_name: str,
original_func: Callable[_FuncParams, _ReturnType],
labels: Dict[str, Any],
) -> None:
self.broker = broker
self.task_name = task_name
self.original_func = original_func
self.labels = labels
# This is a hack to make ProcessPoolExecutor work
# with decorated functions.
#
# The problem is that when we decorate a function
# it becomes a new class. This class has the same
# name as the original function.
#
# When receiver sends original function to another
# process, it will have the same name as the decorated
# class. This will cause an error, because ProcessPoolExecutor
# uses `__name__` and `__qualname__` attributes to
# import functions from other processes and then it verifies
# that the function is the same as the original one.
#
# This hack renames the original function and injects
# it back to the module where it was defined.
# This way ProcessPoolExecutor will be able to import
# the function by it's name and verify its correctness.
new_name = f"{original_func.__name__}__taskiq_original"
> self.original_func.__name__ = new_name
E AttributeError: 'method' object has no attribute '__name__' and no __dict__ for setting new attributes. Did you mean: '__ne__'?
../../../.cache/pypoetry/virtualenvs/bbot-server-DmHQwTaK-py3.13/lib/python3.13/site-packages/taskiq/decor.py:79: AttributeError
Can you please show how you declare tasks?
It seems like you mark your class methods as tasks. I tried this and it worked.
import asyncio
import time
from taskiq_redis import RedisStreamBroker
broker = RedisStreamBroker("redis://localhost")
class TaskClass:
class Inner:
@broker.task()
@staticmethod
def extensive_cpu_task(n):
time.sleep(2)
s = sum(i for i in range(n))
print("Done")
return s
async def main():
await broker.startup()
task = await TaskClass.Inner.extensive_cpu_task.kiq(1000000)
await task.wait_result()
await broker.shutdown()
if __name__ == "__main__":
asyncio.run(main())
So I wonder: how do you declare your tasks?
Sure, here is the relevant code:
class BaseApplet:
    async def register_watchdog_tasks(self, broker):
        """Register every watchdog-marked method of this applet with ``broker``.

        A callable member is registered when it (or its ``original_func``, for
        members that were already replaced by a decorated task) carries a
        non-None ``_watchdog_task`` attribute. Its ``_kwargs`` attribute, if
        any, supplies the labels passed to ``broker.register_task``.

        Crontab handling:
          * ``cron_config_key`` set: the crontab is read from ``self.config``
            under that key, falling back to the ``cron`` kwarg as default.
          * only ``cron`` set: that value is used as-is.
        In both cases the result is passed as ``schedule=[{"cron": ...}]``.

        After registration the attribute on ``self`` is overwritten with the
        task object returned by the broker.

        Raises:
            ValueError: if ``cron_config_key`` is given without a default
                ``cron`` value.
        """
        # getmembers() returns a list built up front, so overwriting
        # attributes on `self` inside the loop does not disturb the iteration.
        for method_name, member in inspect.getmembers(self):
            if not callable(member):
                continue
            # handle case where tasks have already been registered
            method = getattr(member, "original_func", member)
            if getattr(method, "_watchdog_task", None) is None:
                continue

            # Work on a copy: `_kwargs` is stored on the method itself, so
            # popping from it directly would strip the cron settings and make
            # any later re-registration silently lose its schedule.
            kwargs = dict(getattr(method, "_kwargs", {}))

            # crontab handling
            cron_default = kwargs.pop("cron", None)
            cron_config_key = kwargs.pop("cron_config_key", None)
            if cron_config_key is not None:
                if cron_default is None:
                    raise ValueError(
                        f"{self.name}.{method_name}: When specifying a crontab config value, you must also give a default crontab value (kwarg: 'cron')"
                    )
                cron = OmegaConf.select(self.config, cron_config_key, default=cron_default)
                kwargs["schedule"] = [{"cron": cron}]
            elif cron_default is not None:
                kwargs["schedule"] = [{"cron": cron_default}]

            self.log.debug(f"Registering task: {method_name} {kwargs}")
            task = broker.register_task(method, **kwargs)
            # overwrite the original method with the decorated TaskIQ task
            setattr(self, method_name, task)
Basically, we are iterating over our class, and registering certain methods with the TaskIQ broker.
Can you please show me how you use it? Because it seems like something is purging out __name__ attribute of methods.
I tried this snippet and seems like all methods have __name__ by default.
for name, met in inspect.getmembers(TaskClass):
if not callable(met):
continue
print(name)
print(met.__name__)
This led me to the idea that something is removing the __name__ attribute of the function. What also caught my attention is the message saying that a 'method' object doesn't have __name__, even though methods normally expose the __name__ of their underlying function.
That is really strange. I'm not sure how they're losing their __name__ attribute. Yeah wtf. I'll look into this some more tomorrow.
Okay, I tried printing method.__name__ before it's passed into broker.register_task(), and at least at that point, it's still accessible. It seems to lose it somewhere between broker.register_task() and self.decorator_class().
Seems related, introduced in 0.11.15: I'm not able to add a callable object as task anymore
import asyncio
from dataclasses import dataclass
from taskiq_redis import RedisStreamBroker
broker = RedisStreamBroker("redis://localhost")
@dataclass(frozen=True, kw_only=True)
class MyTask:
async def __call__(self, *args, **kwargs):
await asyncio.sleep(2)
async def main():
broker.task(task_name="somthing")(MyTask())
if __name__ == "__main__":
asyncio.run(main())
This works with 0.11.14
starting 0.11.15, produces the same error AttributeError: 'MyTask' object has no attribute '__name__'. Did you mean: '__ne__'?
I don't think it should have worked in the first place. The tasks were intended to be functions, and currently, classes and objects are not officially supported.
We can add support for them, but it requires some considerations, such as how to handle self. Should we be able to mark methods as tasks?
Also, it's unclear to me how this code works because, on the worker side, the task isn't declared.
@lindycoder, can you please share a minimal reproducible example (MRE) of code that works on the previous version and stopped working in 0.11.16?
I'm asking because the code you sent doesn't work on version 0.11.15.
Here's what I got.
[2025-04-01 20:33:21,189][taskiq.receiver.receiver][WARNING][worker-0] task "somthing" is not found. Maybe you forgot to import it?
@s3rius maybe you misread
He wrote that his code snippet should work up to and including version 0.11.14.
@scott-boost, yeah, thanks for noticing. My bad. I was running it on 0.11.14. It doesn't work on 0.11.15 because of __name__ and doesn't work on 0.11.14 because tasks are not available to the worker.
@s3rius This is all part of a bigger framework we have, I just wanted to share that we currently have a use case where we pass in a "callable", even though it's not a function. I didn't expect my example to work at all, just to not raise.
@lindycoder, okay, clear point. Can you describe the framework you have a little bit so I can re-create something similar to test out my solutions?
Here's high level what happens. That script will print hello on 0.11.14 and fail to just create the task on 0.11.15
import asyncio
from collections.abc import AsyncGenerator
from dataclasses import dataclass
from taskiq import AsyncBroker
class MyBroker(AsyncBroker):
async def kick(self, message) -> None:
task = self.find_task(message.task_name)
await task.original_func()
async def listen(self) -> AsyncGenerator[bytes, None]:
if False:
yield b""
broker = MyBroker()
@dataclass(frozen=True, kw_only=True)
class MyTask:
async def __call__(self, *args, **kwargs):
print("Hello!")
async def main():
my_task = broker.task(task_name="something")(MyTask())
await broker.startup()
try:
t = await my_task.kiq()
await t.wait_result(timeout=2)
finally:
await broker.shutdown()
if __name__ == "__main__":
asyncio.run(main())
Hope this can help
Also experiencing this, it is blocking us from being able to upgrade TaskIQ.
@TheTechromancer I have written a class-based task implementation. It requires a custom receiver with a small change.
A worker with a custom receiver is started with the --receiver option. Example:
$ worker main:broker --receiver receiver:TaskReceiver
task.py
import dataclasses
import inspect
import sys
from typing import get_type_hints
from taskiq import AsyncBroker
from taskiq import AsyncTaskiqDecoratedTask
from taskiq import TaskiqMessage
class TaskDecorator(AsyncTaskiqDecoratedTask):
    """Decorated-task wrapper whose "function" is a ``Task`` subclass.

    The task name, labels and return type are all derived from the class:
    ``get_name()``, ``get_labels()`` and the return annotation of ``run``.
    """

    def __init__(self, task: type["Task"], broker: "AsyncBroker"):
        """Wrap the ``task`` class for ``broker``.

        Raises:
            NameError: if the module defining ``task`` already has a truthy
                attribute named ``<run.__name__>__taskiq_original``.
        """
        module_name_to_patch = task.__module__
        func_name_to_patch = f"{task.run.__name__}__taskiq_original"
        # Refuse to proceed when the patched name is already taken in the
        # task's module.
        already_present = getattr(sys.modules[module_name_to_patch], func_name_to_patch, None)
        if already_present:
            raise NameError(
                f"Taskiq needs to monkey-patch `{module_name_to_patch}.{func_name_to_patch}`. "
                "Please rename the function to avoid conflicts."
            )

        run_return_type = get_type_hints(task.run).get("return")
        super().__init__(
            broker=broker,
            task_name=task.get_name(),
            original_func=task,
            labels=task.get_labels(),
            return_type=run_return_type,
        )
        self.task = task

    def __call__(self, *args, **kwargs):
        """Run the task in-process.

        Builds a ``TaskiqMessage`` with an empty task id, instantiates the task
        class with it and returns whatever its ``run`` returns.
        """
        message_fields = {
            "task_id": "",
            "task_name": self.task_name,
            "labels": self.labels,
            "labels_types": None,
            "args": list(args),
            "kwargs": kwargs,
        }
        instance = self.task(self.broker, TaskiqMessage(**message_fields))
        return instance.run(*args, **kwargs)
class Task:
    """Base class for class-based tasks.

    Subclasses override ``run`` and may declare labels as plain class
    attributes on a nested ``Meta`` class. ``Meta`` classes found along the
    MRO are merged by ``make_meta``; the most-derived value of a label wins.
    """

    # Explicit task name; when empty, get_name() uses "<module>.<class name>".
    name: str = ""

    class Meta:
        # labels
        ...

    def __init__(self, broker: "AsyncBroker", message: "TaskiqMessage"):
        """Store the broker and message and build this instance's merged meta."""
        self.broker = broker
        self.message = message
        self.meta = self.make_meta()

    @classmethod
    def get_labels(cls) -> dict:
        """Return the merged ``Meta`` attributes as a plain dict."""
        return dataclasses.asdict(cls.make_meta())

    @classmethod
    def get_name(cls) -> str:
        """Return ``cls.name`` if set, else ``"<module>.<class name>"``."""
        return cls.name or f"{cls.__module__}.{cls.__name__}"

    @classmethod
    def make_meta(cls):
        """Build an instance of a dataclass merging every ``Meta`` in the MRO.

        The generated dataclass inherits from the collected ``Meta`` classes
        (most-derived first), so attribute lookup resolves each label to its
        most-derived value.
        """
        metas = []
        # A dict is used as an ordered set: a label overridden in a subclass
        # Meta must be declared only once, otherwise make_dataclass raises
        # TypeError for the duplicated field name.
        field_names = {}
        for klass in inspect.getmro(cls):
            if hasattr(klass, "Meta") and klass.Meta not in metas:
                metas.append(klass.Meta)
                field_names.update(
                    dict.fromkeys(attr for attr in vars(klass.Meta) if not attr.startswith("__"))
                )
        meta_cls = dataclasses.make_dataclass(
            cls_name="Meta",
            fields=list(field_names),
            bases=tuple(metas),
            init=False,
        )
        return meta_cls()

    @classmethod
    def register(cls, broker: "AsyncBroker"):
        """Wrap this class in a ``TaskDecorator``, register it and return it."""
        task_name = cls.get_name()
        task = TaskDecorator(cls, broker=broker)
        broker._register_task(task_name, task)
        return task

    def run(self, *args, **kwargs):
        """Task body; subclasses must override."""
        raise NotImplementedError()
receiver.py
from typing import Any
from typing import Callable
from typing import Union
from taskiq import TaskiqMessage
from taskiq import TaskiqResult
from taskiq.receiver import Receiver
from task import Task
class TaskReceiver(Receiver):
    """Receiver that also accepts ``Task`` subclasses as task handlers."""

    def _prepare_task(self, name: str, handler: Union[Callable[..., Any], type["Task"]]) -> None:
        """Prepare ``handler``, substituting ``handler.run`` for ``Task`` classes."""
        is_task_class = isinstance(handler, type) and issubclass(handler, Task)
        # NOTE: `run` is taken from the class, so `self` is still its first
        # parameter; per the original author's note it is skipped by
        # `taskiq.receiver.params_parser.parse_params` — TODO confirm.
        super()._prepare_task(name, handler.run if is_task_class else handler)

    async def run_task(  # noqa: C901, PLR0912, PLR0915
        self,
        target: Union[Callable[..., Any], type[Task]],
        message: TaskiqMessage,
    ) -> TaskiqResult[Any]:
        """Run ``target``; a ``Task`` class is instantiated and its bound ``run`` is used."""
        print("run_task")
        if not (isinstance(target, type) and issubclass(target, Task)):
            return await super().run_task(target, message)
        # Each execution gets a fresh Task instance built from broker + message.
        bound_run = target(self.broker, message).run
        return await super().run_task(bound_run, message)
main.py
import asyncio
from taskiq import InMemoryBroker
from taskiq import SimpleRetryMiddleware
from receiver import TaskReceiver
from task import Task
broker = InMemoryBroker().with_middlewares(
SimpleRetryMiddleware(default_retry_count=3),
)
class MyTask(Task):
class Meta:
retry_on_error = True
async def run(self, variable: int):
print(f"{variable=}")
my_task = MyTask.register(broker)
async def main():
await my_task(1)
task = await my_task.kiq(2)
await task.wait_result()
if __name__ == "__main__":
broker.receiver = TaskReceiver(
broker=broker,
executor=broker.executor,
validate_params=broker.receiver.validate_params,
max_async_tasks=broker.receiver.sem._value,
propagate_exceptions=broker.receiver.propagate_exceptions,
) # runtime patch due no any another options to change receiver
asyncio.run(main())
@s3rius, class-based tasks are awesome — but before making a PR, I think we should discuss two problems.
1. Class-based model for all tasks. What if all tasks were classes, with some backward compatibility to support the `@broker.task` decorator?
2. Labels handling. If all tasks are classes, and labels are set via a `Meta` (or `Labels`, as it's currently called), we might want an interface like `self.meta.retry_on_error`. That's fine as long as `retry_on_error` is explicitly configured. However, labels are just a dictionary — keys might not be valid Python identifiers, and any middleware can set arbitrary labels. It would be great to have a mechanism for middleware to read the class `Meta` and update the instance `Meta` before the target method is called.
I think this could be achieved with a small update to side libraries, for example:
class Middleware:
...
def set_task_meta(self, task: Task) -> None:
task.meta.middleware_property_value = task.message.labels.get(
"middleware_property_value",
self.middleware_property_value_default
)
...
If a middleware does not define a set_task_meta method, the meta is simply not updated by it, and that middleware's labels would not be available via the .meta interface. (The caller should check that the method exists before calling set_task_meta.)
I have the same problem with register_task() method.
Packages:
taskiq 0.11.20
taskiq-dependencies 1.5.7
taskiq-fastapi 0.3.5
taskiq-pipelines 0.1.4
taskiq-redis 1.1.2
Example:
@worker.task
async def disable_settings() -> None:
""" """
service: MetricSettingsTasksInterface = MetricSettingsTasksInterface()
async with service:
await service.metric_settings.taskiq_disable_ended_settings(end_date=date.today())
logger.info("Set status OLD successfully")
@worker.task
async def activate_settings() -> None:
""" """
service: MetricSettingsTasksInterface = MetricSettingsTasksInterface()
async with service:
await service.metric_settings.taskiq_activate_settings(start_date=date.today())
logger.info("Set status ACTIVE successfully")
pipe_settings: Pipeline = Pipeline(broker=worker, task=disable_settings).call_after(activate_settings)
worker.register_task(pipe_settings.kiq, task_name="settings_workflow", schedule=[{"cron": "1 0 * * *"}])
Error:
worker.register_task(pipe_settings.kiq, task_name="settings_workflow", schedule=[{"cron": "1 0 * * *"}])
~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/casper/.cache/pypoetry/virtualenvs/loser-service--bCb3ki4-py3.13/lib/python3.13/site-packages/taskiq/abc/broker.py", line 385, in register_task
return self.task(task_name=task_name, **labels)(func)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
File "/home/casper/.cache/pypoetry/virtualenvs/loser-service--bCb3ki4-py3.13/lib/python3.13/site-packages/taskiq/abc/broker.py", line 338, in inner
self.decorator_class(
~~~~~~~~~~~~~~~~~~~~^
broker=self,
^^^^^^^^^^^^
...<3 lines>...
return_type=return_type, # type: ignore
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
),
^
File "/home/casper/.cache/pypoetry/virtualenvs/loser-service--bCb3ki4-py3.13/lib/python3.13/site-packages/taskiq/decor.py", line 83, in __init__
self.original_func.__name__ = new_name
^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'method' object has no attribute '__name__' and no __dict__ for setting new attributes. Did you mean: '__ne__'?
But in taskiq 0.11.14 everything works fine.