taskiq-faststream

schedule_by_time not working with params

Open theobouwman opened this issue 3 months ago • 3 comments

I can only use schedule_by_time with a task when the subscriber takes no additional parameters.


@broker.subscriber(
    list="scheduled.task.delayed_bunny_media_status",
)
@inject
async def task_delayed_bunny_media_status(
    # msg: DelayedContentItemTranslationMediaStatusRequest,
    content_item_service: Annotated[ContentItemService, FromDishka()]
):
    pass


# In another part of the app
schedule = await scheduled_task_delayed_bunny_media_status.schedule_by_time(
    source,
    datetime.datetime.now(datetime.UTC) + datetime.timedelta(minutes=1),
    msg=DelayedContentItemTranslationMediaStatusRequest(
        organisation_id="f0bff69b-8e7b-4cc5-9bc7-dc731cd91f14",
        content_item_id="6b892a59-0be0-4b9e-96e3-876830d240af",
        content_item_translation_id="0b137e4c-f5ad-47b1-b189-ce1cefb6c6c8",
    ),
)

The scheduled task only runs when the msg parameter of task_delayed_bunny_media_status is commented out and msg is not passed to schedule_by_time.

In other words, it does not seem possible to pass additional params.

However, I can see that the task is stored in Redis and eventually removed...
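For reference, the run time passed to schedule_by_time above is a timezone-aware UTC timestamp one minute in the future. A minimal standard-library sketch of that calculation is below; the helper name run_at is hypothetical and not part of taskiq or FastStream. It only shows how the timestamp can be checked on its own, separately from the msg parameter.

```python
import datetime


def run_at(delay: datetime.timedelta) -> datetime.datetime:
    """Return a timezone-aware UTC timestamp `delay` from now (hypothetical helper)."""
    # datetime.timezone.utc is the same object as datetime.UTC (alias added in Python 3.11).
    return datetime.datetime.now(datetime.timezone.utc) + delay


# Same value as the second argument of schedule_by_time in the snippet above.
when = run_at(datetime.timedelta(minutes=1))
print(when.isoformat())
```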

I am using FastStream with Taskiq.

This is my configuration code:

container, broker = make_container()

app = AsgiFastStream(
    broker,
    asgi_routes=[
        ("/health", make_ping_asgi(broker, timeout=10.0)),
    ]
)

taskiq_broker = AppWrapper(app)
source = RedisScheduleSource(f"{get_config().REDIS_SERVER_URL}/2")
scheduler = StreamScheduler(
    broker=taskiq_broker,
    sources=[source],
)

#
# FastStream setup
#
setup_dishka(
    container,
    app,
    finalize_container=False,
)

init_observability("momo-worker")
firebase_initialization()


#
# Cron tasks
#
taskiq_broker.task(list="scheduled.task.every_minute_fetch_discover_fill_cache", schedule=[{"cron": "* * * * *"}])
taskiq_broker.task(list="scheduled.task.delete_expired_user_locations", schedule=[{"cron": "*/20 * * * *"}])
taskiq_broker.task(list="scheduled.task.morning_interactions_summary_notifications", schedule=[{"cron": "0 9 * * *"}])
scheduled_task_organisation_weekly_admin_events_reminder = taskiq_broker.task(list="scheduled.task.organisation_weekly_admin_events_reminder", schedule=[{"cron": "0 10 * * MON"}])
taskiq_broker.task(list="scheduled.task.organisation_weekly_events_reminder", schedule=[{"cron": "0 7 * * MON"}])
taskiq_broker.task(list="scheduled.task.organisation_daily_events_reminder", schedule=[{"cron": "0 7 * * *"}])
taskiq_broker.task(list="scheduled.task.event_starting_reminder_few_hours", schedule=[{"cron": "*/5 * * * *"}])
taskiq_broker.task(list="scheduled.task.event_starting_reminder_three_days", schedule=[{"cron": "*/5 * * * *"}])
taskiq_broker.task(list="scheduled.task.event_starting_reminder_seven_days", schedule=[{"cron": "*/5 * * * *"}])
taskiq_broker.task(list="scheduled.task.bi_monthly_organisation_email_summary", schedule=[{"cron": "10 10 5 * *"}])
taskiq_broker.task(list="scheduled.task.bi_monthly_discover_email_update_first", schedule=[{"cron": "0 8 10 * *"}])
taskiq_broker.task(list="scheduled.task.bi_monthly_discover_email_update_second", schedule=[{"cron": "0 8 25 * *"}])
taskiq_broker.task(list="scheduled.task.hourly_sync_all_content_serie_podcasts", schedule=[{"cron": "0 * * * *"}])
taskiq_broker.task(list="scheduled.task.engagement_user_content_items_push_notification", schedule=[{"cron": "*/8 * * * *"}])
scheduled_task_delayed_bunny_media_status = taskiq_broker.task(list="scheduled.task.delayed_bunny_media_status", schedule=[])
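The cron registrations above all follow the same call pattern, task(list=..., schedule=[{"cron": ...}]). As a sketch only, the same registrations could be driven from a table, which makes it easier to see which task handles are kept for later use with schedule_by_time. The names CRON_TASKS, register_cron_tasks, and fake_task are hypothetical; fake_task is a stand-in that just records its keyword arguments so the example runs without taskiq, and only the first three entries are copied here.

```python
from typing import Any, Callable

# (list name, cron expression) pairs copied from the configuration above.
CRON_TASKS: list[tuple[str, str]] = [
    ("scheduled.task.every_minute_fetch_discover_fill_cache", "* * * * *"),
    ("scheduled.task.delete_expired_user_locations", "*/20 * * * *"),
    ("scheduled.task.morning_interactions_summary_notifications", "0 9 * * *"),
]


def register_cron_tasks(
    task_factory: Callable[..., Any],
    tasks: list[tuple[str, str]],
) -> dict[str, Any]:
    """Call task_factory once per entry and keep every returned handle by list name."""
    registered: dict[str, Any] = {}
    for list_name, cron in tasks:
        # Same keyword arguments as the taskiq_broker.task(...) calls above.
        registered[list_name] = task_factory(list=list_name, schedule=[{"cron": cron}])
    return registered


def fake_task(**kwargs: Any) -> dict[str, Any]:
    """Stand-in for taskiq_broker.task (assumption): returns the kwargs it received."""
    return kwargs


registered = register_cron_tasks(fake_task, CRON_TASKS)
print(sorted(registered))
```

In the real application, taskiq_broker.task would be passed instead of fake_task, under the assumption that it accepts the same keyword arguments as in the configuration above.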

theobouwman, Aug 30 '25 15:08

It seems this issue is more related to the taskiq-faststream integration.

s3rius, Aug 30 '25 19:08

Does it work on vanilla taskiq?

s3rius, Aug 30 '25 19:08

@s3rius I did not test it with vanilla taskiq. Since we use the FastStream integration, I also think this problem is more related to the taskiq-faststream integration.

theobouwman, Sep 01 '25 12:09