taskiq
Periodic tasks on top of the per-second main loop runner
Updates
- Per-second main event loop cycle;
- ScheduledTask.period field is added;
- Periodic tasks execution feature has been added.
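To make the new field concrete, here is a minimal sketch of how a `period` value might be checked against the last run time. It assumes `period` is a number of seconds, as the `each_2_seconds_task` example suggests. `PeriodicTaskSketch` and `is_due` are hypothetical names used only for illustration; they are not the actual taskiq classes or the code in this PR.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, List, Optional


@dataclass
class PeriodicTaskSketch:
    """Simplified stand-in for a scheduled task; NOT the real taskiq class."""

    task_name: str
    args: List[Any] = field(default_factory=list)
    cron: Optional[str] = None
    # Assumption: the period is the number of seconds between two launches.
    period: Optional[float] = None


def is_due(
    task: PeriodicTaskSketch,
    last_run: Optional[datetime],
    now: datetime,
) -> bool:
    """Return True if a periodic task should be launched at `now`."""
    if task.period is None:
        # Cron-only tasks are out of scope for this sketch.
        return False
    if last_run is None:
        # The task has never been launched, so launch it right away.
        return True
    return now - last_run >= timedelta(seconds=task.period)
```

With a period of 2, such a check would skip the tick one second after a launch and fire again on the tick two seconds after it.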
Design options
- The current approach: iterate over the scheduled tasks every second.
- The two-stage approach:
  a. Iterate once per minute, as in the as-is solution.
  b. Inside each minute, add a nested 60-second cycle that ticks every second and iterates through the tasks to be launched during that minute.
The second approach looks like:
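while True:
    # Outer cycle: per-minute work, as in the as-is solution.
    ...
    next_minute = datetime.now().replace(second=0, microsecond=0) + timedelta(minutes=1)
    # Inner cycle: tick every second until the minute is over.
    # "<" instead of "!=" so the loop still exits if a whole minute is skipped.
    while datetime.now() < next_minute:
        ...
        next_second = datetime.now().replace(microsecond=0) + timedelta(seconds=1)
        await asyncio.sleep((next_second - datetime.now()).total_seconds())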
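For comparison, the first option (a single per-second loop) can be sketched as follows. This is only an illustration under assumptions, not the scheduler code from this PR: `next_tick` and `run_per_second` are hypothetical helpers, and `max_ticks` exists only so that the example terminates.

```python
import asyncio
from datetime import datetime, timedelta
from typing import Awaitable, Callable


def next_tick(now: datetime) -> datetime:
    """Return the start of the next whole second after `now`."""
    return now.replace(microsecond=0) + timedelta(seconds=1)


async def run_per_second(
    on_tick: Callable[[datetime], Awaitable[None]],
    max_ticks: int,
) -> None:
    """Call `on_tick` once per second boundary, `max_ticks` times."""
    for _ in range(max_ticks):
        now = datetime.now()
        target = next_tick(now)
        # Sleep only for the remainder of the current second, so that slow
        # callbacks do not make the ticks drift further and further.
        await asyncio.sleep((target - now).total_seconds())
        # Pass the computed boundary rather than a fresh datetime.now(),
        # so the callback always sees a whole-second timestamp.
        await on_tick(target)
```

A callback passed as `on_tick` would be the place to iterate over the scheduled tasks and launch those that are due for the given second.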
Notes:
- The documentation has to be updated after the PR is merged.
The example to run the feature against is main_test.py in the root directory:
# broker.py
import asyncio

from taskiq.brokers.inmemory_broker import InMemoryBroker
from taskiq.schedule_sources import LabelScheduleSource
from taskiq import TaskiqScheduler

broker = InMemoryBroker()
scheduler = TaskiqScheduler(
    broker=broker,
    sources=[LabelScheduleSource(broker)],
)


@broker.task(schedule=[{"cron": "* * * * *", "args": [1]}])
async def each_minute_cron(value: int) -> int:
    print(f"The {each_minute_cron.__qualname__} task has been launched")
    await asyncio.sleep(0.5)
    return value + 1


@broker.task(schedule=[{"period": 2, "args": [1]}])
async def each_2_seconds_task(value: int) -> int:
    print(f"The {each_2_seconds_task.__qualname__} task has been launched")
    await asyncio.sleep(0.5)
    return value + 1