taskiq
kiq jobs with unique names, so they won't be created twice
I can't seem to find anything about this in the docs. I'd like to assign a job (not a task) a unique name, so that if I were to try to kiq a job with the same parameters twice, the broker would ignore the duplicate job. Something like:
```python
import asyncio

@broker.task
async def foo(param):
    print(param)
    await asyncio.sleep(1)

async def main():
    params = ['a', 'a', 'b', 'c']  # the second "a" should be skipped
    for p in params:
        await foo.kiq(p, job_id=f"job_{p}")

if __name__ == "__main__":
    asyncio.run(main())
```

```
>>> "a"
>>> "b"
>>> "c"
```
Is this possible?
Clearly the AsyncBroker can take an alternative task_id_generator, but there doesn't seem to be a way to pass a custom task_id anywhere, nor any uniqueness handling, as far as I can tell?
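Since `task_id_generator` takes no arguments, it can't see the call parameters. One workaround (building on `kicker().with_task_id(...)`, which appears in the answer below) is to derive an explicit task id from the parameters yourself, so identical calls collide on the same id. A minimal sketch of such a helper; the `deterministic_task_id` name is mine, not taskiq API:

```python
import hashlib

def deterministic_task_id(task_name: str, *args) -> str:
    """Derive a stable id from the task name and its arguments,
    so identical calls always map to the same task id."""
    payload = ":".join([task_name, *map(repr, args)])
    return hashlib.sha256(payload.encode()).hexdigest()

# Identical parameters yield identical ids; different parameters differ.
assert deterministic_task_id("foo", "a") == deterministic_task_id("foo", "a")
assert deterministic_task_id("foo", "a") != deterministic_task_id("foo", "b")
```

You would then kick with something like `foo.kicker().with_task_id(deterministic_task_id("foo", p)).kiq(p)`, letting a dedup layer key off the id.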
Hi @gegnew, and thanks for your interest in the project. You're right: we don't handle uniqueness of IDs. IDs are only meant to be unique during task execution, so that a calculation result isn't saved twice.
But here's what you can do: you can create a middleware that tracks IDs so they stay unique among tasks with a specific name.
```python
from redis.asyncio import Redis

from taskiq import TaskiqMiddleware
from taskiq.message import TaskiqMessage
from taskiq_redis import ListQueueBroker


class UniqueIdMiddleware(TaskiqMiddleware):
    def __init__(self, redis_url: str) -> None:
        self.pool = Redis.from_url(redis_url)

    async def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
        # Reject the message if a task with the same name and id
        # was already seen within the last 20 seconds.
        key = f"unique:{message.task_name}:{message.task_id}"
        if await self.pool.get(key):
            raise ValueError("Task is already running")
        await self.pool.set(key, 1, ex=20)
        return message


broker = ListQueueBroker("redis://localhost:6379").with_middlewares(
    UniqueIdMiddleware("redis://localhost:6379")
)


@broker.task
async def my_task():
    print("I'm a task")


async def main():
    await my_task.kicker().with_task_id("2").kiq()
    await my_task.kicker().with_task_id("2").kiq()


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
```
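One caveat with the `pre_execute` hook above: it checks with `GET` and then writes with `SET`, which leaves a small window where two workers can both pass the check. Redis's `SET` with `nx=True` does the check-and-write atomically (in redis-py: `await self.pool.set(key, 1, ex=20, nx=True)` returns a truthy value only for the first caller). A minimal in-memory sketch of that gate, with a set standing in for Redis and expiry omitted; the class name is illustrative:

```python
class InMemoryUniqueGate:
    """Mimics the atomic first-writer-wins semantics of Redis SET NX."""

    def __init__(self) -> None:
        self._seen = set()

    def set_nx(self, key: str) -> bool:
        """Return True only for the first caller of a given key."""
        if key in self._seen:
            return False
        self._seen.add(key)
        return True

gate = InMemoryUniqueGate()
assert gate.set_nx("unique:my_task:2") is True   # first kick passes
assert gate.set_nx("unique:my_task:2") is False  # duplicate is rejected
```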
I won't implement this functionality on the broker's side, because it isn't meant to work like this. If you need it, you can easily implement it on your own.
Hej,
When I tried this method, every time the ValueError is thrown it crashes the task runner, and the message in the queue never gets acknowledged. So when the worker restarts, I receive the same task message again.
What would be the proper way to drop the message from the queue?
Okay, I will take a look at it a bit later. Will try to come up with the fix or a new way of calling middlewares.
Hej s3rius!
Any news on how to skip tasks from running from the middleware? I'm thinking of changing/adding methods to the middleware that allow messages to be skipped, or raising a specific Skip exception that gets handled differently where the middleware is called?
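To make the second idea concrete, here is a sketch of how a dedicated exception could let a middleware drop a message while still acknowledging it. Everything here is hypothetical, not taskiq API: the `SkipMessage` exception, the dispatch function, and the callbacks are stand-ins for what the receiver would do internally:

```python
class SkipMessage(Exception):
    """Hypothetical signal: drop this message, but still ack it."""

def run_middlewares_and_execute(message, middlewares, execute, ack):
    """Sketch of a receiver loop that treats SkipMessage as 'ack and drop'."""
    try:
        for mw in middlewares:
            message = mw(message)  # a middleware may raise SkipMessage
    except SkipMessage:
        ack(message)  # acknowledge so the broker won't redeliver it
        return None
    result = execute(message)
    ack(message)
    return result

# Usage: a middleware that skips duplicates instead of crashing the worker.
seen = set()

def unique_mw(message):
    if message in seen:
        raise SkipMessage()
    seen.add(message)
    return message

acked = []
out1 = run_middlewares_and_execute("job_a", [unique_mw], str.upper, acked.append)
out2 = run_middlewares_and_execute("job_a", [unique_mw], str.upper, acked.append)
# out1 == "JOB_A"; out2 is None; both deliveries were acknowledged.
```

The point of the design is that the worker never sees an unhandled exception for a duplicate, so the message leaves the queue cleanly instead of being redelivered on restart.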