taskiq icon indicating copy to clipboard operation
taskiq copied to clipboard

how to schedule while using shared_task

Open tuktamyshev opened this issue 7 months ago • 1 comments

I got an error: Broker for core.controllers.tasks:best_task_ever <taskiq.brokers.shared_broker.AsyncSharedBroker object at 0x777fc1365bb0> doesn't match scheduler's broker <taskiq_redis.redis_broker.RedisStreamBroker object at 0x777fbf9dd700>

worker.py

import sys

from src.common.config import BASE_DIR

sys.path.append(f"{BASE_DIR}/src")

from taskiq import TaskiqScheduler, async_shared_broker
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_redis import RedisStreamBroker

from common.config.main import Config
from core.controllers.tasks import best_task_ever  # noqa


def create_broker() -> RedisStreamBroker:
    config = Config()
    broker = RedisStreamBroker(
        url=f"{config.worker.BROKER_HOST}:{config.worker.BROKER_PORT}",
    )
    return broker


def create_scheduler(broker: RedisStreamBroker) -> TaskiqScheduler:
    scheduler = TaskiqScheduler(
        broker=broker,
        sources=[LabelScheduleSource(broker)],
    )
    return scheduler


broker = create_broker()
async_shared_broker.default_broker(broker)

tasks.py

scheduler = create_scheduler(broker)
@shared_task(schedule=[{"cron": "*/1 * * * *", "args": [1]}])
async def best_task_ever() -> None:
    await asyncio.sleep(5.5)
    print("All problems are solved!")

tuktamyshev avatar Apr 22 '25 17:04 tuktamyshev