taskiq
taskiq copied to clipboard
Add intervals for scheduled task
Hi!
Following up on PR #399, which I left unfinished half a year ago:
In this PR, I want to add intervals for scheduled tasks.
List of changes:
- Added a new
intervalfield toScheduledTask. - Implemented the
SchedulerLoopclass—an abstraction over the scheduler loop. - Added a new
loop_intervalargument toSchedulerArgs. - Moved the
skip_first_runmechanism intoSchedulerLoop.run(). - The
teskiq.api.scheduler.run_scheduler_tasksfunction now usesSchedulerLoop. - Implemented the
schedule_by_intervalmethod in theAsyncTaskiqDecoratedTaskandAsyncKickerclasses. - Wrote tests for the new logic.
- Updated the documentation.
The main idea is to iterate through all tasks every loop_interval and check if they need to be executed.
Example:
@broker.task(schedule=[{"interval": 5, "args": [1]}])
async def add_one(value: int) -> int:
await asyncio.sleep(0.5)
return value + 1
Logs:
[2025-08-18 11:07:50,547][INFO ][run:run_scheduler:396] Starting scheduler.
[2025-08-18 11:07:50,551][INFO ][run:run_scheduler:398] Startup completed.
[2025-08-18 11:07:51,001][INFO ][run:send:160] Sending task test:add_one with schedule_id 5b32c74a07d447d68f506b089616d526.
[2025-08-18 11:07:56,002][INFO ][run:send:160] Sending task test:add_one with schedule_id 5b32c74a07d447d68f506b089616d526.
[2025-08-18 11:08:01,002][INFO ][run:send:160] Sending task test:add_one with schedule_id 5b32c74a07d447d68f506b089616d526.
[2025-08-18 11:08:06,002][INFO ][run:send:160] Sending task test:add_one with schedule_id 5b32c74a07d447d68f506b089616d526.
[2025-08-18 11:08:11,001][INFO ][run:send:160] Sending task test:add_one with schedule_id 5b32c74a07d447d68f506b089616d526.
[2025-08-18 11:08:16,001][INFO ][run:send:160] Sending task test:add_one with schedule_id 5b32c74a07d447d68f506b089616d526.
[2025-08-18 11:08:21,001][INFO ][run:send:160] Sending task test:add_one with schedule_id 5b32c74a07d447d68f506b089616d526.
I saw that several tests failed on older versions of Pydantic and Python, so I pushed fixes.
I have resolved the merge conflicts and fixed the failing tests.