taskiq
taskiq copied to clipboard
How do I schedule tasks while using shared_task?
I got an error: Broker for core.controllers.tasks:best_task_ever <taskiq.brokers.shared_broker.AsyncSharedBroker object at 0x777fc1365bb0> doesn't match scheduler's broker <taskiq_redis.redis_broker.RedisStreamBroker object at 0x777fbf9dd700>
worker.py
import sys
from src.common.config import BASE_DIR
sys.path.append(f"{BASE_DIR}/src")
from taskiq import TaskiqScheduler, async_shared_broker
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_redis import RedisStreamBroker
from common.config.main import Config
from core.controllers.tasks import best_task_ever # noqa
def create_broker() -> RedisStreamBroker:
    """Build a Redis stream broker from the worker section of ``Config``.

    The connection URL is ``BROKER_HOST`` and ``BROKER_PORT`` joined by a
    colon.

    NOTE(review): no scheme is prepended here, so ``BROKER_HOST`` presumably
    already contains something like ``redis://`` -- confirm against the config.

    Returns:
        A new ``RedisStreamBroker`` pointed at the configured URL.
    """
    worker_settings = Config().worker
    return RedisStreamBroker(
        url=f"{worker_settings.BROKER_HOST}:{worker_settings.BROKER_PORT}",
    )
def create_scheduler(broker: RedisStreamBroker) -> TaskiqScheduler:
    """Create a scheduler that reads schedules from task labels.

    Args:
        broker: Broker handed both to the scheduler and to its single
            ``LabelScheduleSource``.

    Returns:
        A ``TaskiqScheduler`` bound to ``broker``.
    """
    label_source = LabelScheduleSource(broker)
    return TaskiqScheduler(broker=broker, sources=[label_source])
# Build the Redis-backed broker once, when this module is imported.
broker = create_broker()
# Hand that broker to taskiq's shared broker as its default broker.
async_shared_broker.default_broker(broker)
tasks.py
# Module-level scheduler built on the same `broker` object.
# NOTE(review): the reported error says the shared task's broker
# (AsyncSharedBroker) does not match this scheduler's broker -- confirm
# which broker object should be passed here.
scheduler = create_scheduler(broker)
# The schedule entry carries no "args": the task accepts no parameters, so a
# positional argument supplied by the scheduler would make every scheduled
# invocation fail with a TypeError.
@shared_task(schedule=[{"cron": "*/1 * * * *"}])
async def best_task_ever() -> None:
    """Sleep for 5.5 seconds, then print a confirmation message.

    Scheduled through the ``schedule`` label with the cron expression
    ``*/1 * * * *``.
    """
    await asyncio.sleep(5.5)
    print("All problems are solved!")