Doesn't match scheduler's broker

Open vahidzhe opened this issue 1 month ago • 0 comments

Hello, I have the configuration below. My scheduled tasks are registered only on the scheduler's broker; the other broker has only my regular tasks. Why am I getting the following warning?

[2025-10-25 10:35:40,889][WARNING][label_based:startup:40] Broker for taskiq_pipelines.shared.filter_tasks <taskiq.brokers.shared_broker.AsyncSharedBroker object at 0x7f3aec248210> doesn't match scheduler's broker <taskiq_redis.redis_broker.ListQueueBroker object at 0x7f3ae807de50> 2025-10-25 14:35:40

broker = (
    ListQueueBroker(url=settings.REDIS_URL, queue_name='general_queue')
    .with_result_backend(
        RedisAsyncResultBackend(
            redis_url=settings.REDIS_URL,
            keep_results=True,
            result_ex_time=3600,
        )
    )
    .with_middlewares(
        TaskiqAdminMiddleware(
            url=settings.TASKIQ_ADMIN_URL,
            api_token=settings.TASKIQ_ADMIN_API_TOKEN,
            taskiq_broker_name='Fromfolio broker',
        ),
        SmartRetryMiddleware(
            default_retry_count=5,
            default_delay=10,
            use_jitter=True,
            use_delay_exponent=True,
            max_delay_exponent=120,
        ),
    )
)


OTHER_REDIS_URL = settings.REDIS_URL.rsplit('/', 1)[0] + '/1'
other_broker = (
    ListQueueBroker(
        url=OTHER_REDIS_URL,
        queue_name='other_queue',
        max_connection_pool_size=50,
    )
    .with_result_backend(
        RedisAsyncResultBackend(
            redis_url=OTHER_REDIS_URL,
            result_ex_time=3600,
        )
    )
    .with_middlewares(
        TaskiqAdminMiddleware(
            url=settings.TASKIQ_ADMIN_URL,
            api_token=settings.TASKIQ_ADMIN_API_TOKEN,
            taskiq_broker_name='other broker',
        ),
        SmartRetryMiddleware(
            default_retry_count=5,
            default_delay=10,
            use_jitter=True,
            use_delay_exponent=True,
            max_delay_exponent=120,
        ),
        PipelineMiddleware(),
    )
)


redis_source = RedisScheduleSource(settings.REDIS_URL)

scheduler = TaskiqScheduler(
    broker,
    [
        redis_source,
        LabelScheduleSource(broker),
        LabelScheduleSource(other_broker),
    ],
)


task = Pipeline(other_broker, check_user).call_after(
    calc_user_rating,
    apply_id=apply.id,
)
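
One possible reading of the warning, offered as a sketch and not as a confirmed diagnosis: if a label-based schedule source walks a task registry that is shared between broker instances, it will also see tasks owned by a different broker (here, the shared broker that taskiq_pipelines uses for filter_tasks) and log a mismatch for each one before skipping it. The snippet below models that idea with stand-in classes only. FakeBroker, its global_registry, and mismatched_tasks are hypothetical names invented for illustration and are not taskiq's real classes or API.

```python
import logging

logger = logging.getLogger("label_based_sketch")


class FakeBroker:
    """Hypothetical stand-in for a broker; NOT taskiq's implementation."""

    # Assumption: every broker instance shares one task registry.
    global_registry = {}

    def __init__(self, name):
        self.name = name

    def register(self, task_name):
        # Record which broker instance owns the task.
        FakeBroker.global_registry[task_name] = self

    def get_all_tasks(self):
        # Return every known task, including tasks owned by other brokers.
        return dict(FakeBroker.global_registry)


def mismatched_tasks(source_broker):
    """Return the names of tasks whose owner is not source_broker."""
    skipped = []
    for task_name, owner in source_broker.get_all_tasks().items():
        if owner is not source_broker:
            logger.warning(
                "Broker for %s %s doesn't match scheduler's broker %s",
                task_name,
                owner.name,
                source_broker.name,
            )
            skipped.append(task_name)
    return skipped


# Stand-ins for the shared pipelines broker and the scheduler's broker.
shared = FakeBroker("shared")
general = FakeBroker("general")
shared.register("taskiq_pipelines.shared.filter_tasks")
general.register("my_app.scheduled_task")

skipped = mismatched_tasks(general)
```

Under these assumptions the shared task is reported and skipped while the task registered on the scheduler's broker is not, so the warning would be informational for tasks that carry no schedule labels. Whether this matches the real behaviour should be checked against the label_based source code of the installed taskiq version.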

vahidzhe, Oct 26 '25 17:10