
Periodic tasks on top of the per-second main loop runner

Open gencurrent opened this issue 1 year ago • 0 comments

Updates

  1. The main event loop now cycles once per second;
  2. A ScheduledTask.period field has been added;
  3. Periodic task execution has been added.
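
To illustrate how a period field can be evaluated on each one-second cycle, here is a minimal sketch. It is not the code from this PR: PeriodicEntry, is_due, tick and last_run are hypothetical names, and the period is assumed to be expressed in seconds, as the `"period": 2` example further down suggests.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class PeriodicEntry:
    """Hypothetical stand-in for a scheduled task with a period in seconds."""

    name: str
    period: float
    last_run: Optional[datetime] = None


def is_due(entry: PeriodicEntry, now: datetime) -> bool:
    """Return True if the entry has never run or its period has elapsed."""
    if entry.last_run is None:
        return True
    return now - entry.last_run >= timedelta(seconds=entry.period)


def tick(entries: List[PeriodicEntry], now: datetime) -> List[str]:
    """One pass of a per-second loop: collect due entries and mark them as run."""
    launched = []
    for entry in entries:
        if is_due(entry, now):
            entry.last_run = now
            launched.append(entry.name)
    return launched
```

Passing `now` in explicitly, rather than calling datetime.now() inside the helpers, keeps the check deterministic and easy to test.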

Design options

  1. The current approach: iterate over the scheduled tasks once per second.
  2. A two-stage approach:
     a. Iterate once per minute, as in the AS-IS solution.
     b. Add a second, nested cycle that is launched each minute and iterates once per second for 60 seconds.

The second approach looks like:
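import asyncio
from datetime import datetime, timedelta


async def main_loop() -> None:  # illustrative wrapper so that `await` is valid
    while True:
        # Stage a: once-per-minute work, as in the AS-IS solution.
        ...
        next_minute = datetime.now().replace(second=0, microsecond=0) + timedelta(minutes=1)
        # Stage b: once-per-second cycle until the next minute boundary.
        while datetime.now().replace(second=0, microsecond=0) != next_minute:
            ...
            next_second = datetime.now().replace(microsecond=0) + timedelta(seconds=1)
            await asyncio.sleep((next_second - datetime.now()).total_seconds())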
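
The boundary arithmetic used by the two-stage loop can be isolated into small pure functions, which makes it easier to check. The sketch below is only an illustration: seconds_until_next_second and next_minute_boundary are hypothetical helper names, not part of taskiq or of this PR.

```python
from datetime import datetime, timedelta


def seconds_until_next_second(now: datetime) -> float:
    """Time to sleep so that the loop wakes up on the next whole second."""
    next_second = now.replace(microsecond=0) + timedelta(seconds=1)
    return (next_second - now).total_seconds()


def next_minute_boundary(now: datetime) -> datetime:
    """Start of the minute that follows `now`."""
    return now.replace(second=0, microsecond=0) + timedelta(minutes=1)
```

Taking a single `now` value per computation also avoids reading the clock twice, so the two operands of the subtraction always refer to the same instant.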

Notes:

  • The documentation has to be updated after the PR is merged.

An example for trying out the feature is main_test.py in the root directory:

# broker.py
import asyncio
from taskiq.brokers.inmemory_broker import InMemoryBroker

from taskiq.schedule_sources import LabelScheduleSource
from taskiq import TaskiqScheduler

broker = InMemoryBroker()

scheduler = TaskiqScheduler(
    broker=broker,
    sources=[LabelScheduleSource(broker)],
)


@broker.task(schedule=[{"cron": "* * * * *", "args": [1]}])
async def each_minute_cron(value: int) -> int:
    print(f"The {each_minute_cron.__qualname__} task has been launched")
    await asyncio.sleep(0.5)
    return value + 1


@broker.task(schedule=[{"period": 2, "args": [1]}])
async def each_2_seconds_task(value: int) -> int:
    print(f"The {each_2_seconds_task.__qualname__} task has been launched")
    await asyncio.sleep(0.5)
    return value + 1

gencurrent • May 01 '24 15:05