taskiq
taskiq copied to clipboard
Scheduler can send task twice
Hello
In run_scheduler_loop function in next_minute calculation if datetime.now() = 17:22:59.55555 and we replace seconds and microseconds with 0 and add timedelta(minutes=1) we get next_minute=17:23:00 and when we calculate delay with delay = next_minute - datetime.now(), current time can be > or just a bit lower than next_minute, so we get negative delay like -0.000299 or positive like 0.000373
Don't you think that we should check delay like:
delay = (next_minute - datetime.now()).total_seconds()
if int(delay) <= 0:
delay = 60.0
await asyncio.sleep(delay)
Logs:
Next minute: 2024-02-22 14:13:00 Now: 2024-02-22 14:13:00.000299 Delay: -1 day, 23:59:59.999701 Delay total seconds: -0.000299 [2024-02-22 14:13:00,000][INFO ][run:delayed_send:130] Sending task billing:sync-payments.
Next minute: 2024-02-22 14:14:00 Now: 2024-02-22 14:13:00.001998 Delay: 0:00:59.998002 Delay total seconds: 59.998002 [2024-02-22 14:13:00,002][INFO ][run:delayed_send:130] Sending task billing:sync-payments.
Actually you're right, but I think it can be resolved by calculating next_minute right after sending all tasks.
diff --git a/taskiq/cli/scheduler/run.py b/taskiq/cli/scheduler/run.py
index 6a17a11..73f06f8 100644
--- a/taskiq/cli/scheduler/run.py
+++ b/taskiq/cli/scheduler/run.py
@@ -144,9 +144,6 @@ async def run_scheduler_loop(scheduler: TaskiqScheduler) -> None:
running_schedules = set()
while True:
# We use this method to correctly sleep for one minute.
- next_minute = datetime.now().replace(second=0, microsecond=0) + timedelta(
- minutes=1,
- )
scheduled_tasks = await get_all_schedules(scheduler)
for source, task_list in scheduled_tasks.items():
for task in task_list:
@@ -165,7 +162,9 @@ async def run_scheduler_loop(scheduler: TaskiqScheduler) -> None:
)
running_schedules.add(send_task)
send_task.add_done_callback(running_schedules.discard)
-
+ next_minute = datetime.now().replace(second=0, microsecond=0) + timedelta(
+ minutes=1,
+ )
delay = next_minute - datetime.now()
await asyncio.sleep(delay.total_seconds())
In this setup it would be impossible to get into the situation with negative number of seconds. What do you think?
Hello I have the same problem with sending scheduled task twice. Are there any solutions to resolve this problem?
What version of taskiq are you on? And can you please give some information what is happening?
taskiq==0.11.7
tasks file (main.py)
import datetime
import os
from taskiq import Context, TaskiqDepends
from taskiq.serializers import JSONSerializer
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend
broker = (
ListQueueBroker("redis://localhost:6379/0")
.with_result_backend(
RedisAsyncResultBackend(
"redis://localhost:6379/1",
result_ex_time=int(datetime.timedelta(minutes=10).total_seconds()),
serializer=JSONSerializer()
)
)
)
@broker.task(schedule=[{"cron": "*/1 * * * *"}])
async def foo(context: Context = TaskiqDepends()):
print(f"\n--- {os.getpid()} ---")
print("Task ID:", context.message.task_id)
print(datetime.datetime.now())
print(f"---------------------\n")
scheduler file (schedule.py)
from taskiq.cli.scheduler.args import SchedulerArgs
from taskiq.cli.scheduler.run import get_all_schedules
from taskiq.schedule_sources import LabelScheduleSource
from taskiq import TaskiqScheduler
from task.main import broker
from task.lib import run_scheduler
scheduler = TaskiqScheduler(
broker=broker,
sources=[LabelScheduleSource(broker)],
)
async def main() -> None:
print(await get_all_schedules(scheduler))
await run_scheduler(SchedulerArgs(scheduler=scheduler, modules=[], skip_first_run=True))
if __name__ == "__main__":
import asyncio
asyncio.run(main())
taskiq runner fork file with added prints (lib.py)
import asyncio
import sys
from datetime import datetime, timedelta
from logging import basicConfig, getLevelName, getLogger
from typing import Dict, List, Optional
import pytz
from pycron import is_now
from taskiq.abc.schedule_source import ScheduleSource
from taskiq.cli.scheduler.args import SchedulerArgs
from taskiq.cli.scheduler.run import get_all_schedules, get_task_delay, delayed_send
from taskiq.cli.utils import import_object, import_tasks
from taskiq.scheduler.scheduled_task import ScheduledTask
from taskiq.scheduler.scheduler import TaskiqScheduler
logger = getLogger(__name__)
async def run_scheduler_loop(scheduler: TaskiqScheduler) -> None:
loop = asyncio.get_event_loop()
running_schedules = set()
while True:
# We use this method to correctly sleep for one minute.
scheduled_tasks = await get_all_schedules(scheduler)
for source, task_list in scheduled_tasks.items():
for task in task_list:
try:
task_delay = get_task_delay(task)
except ValueError:
logger.warning(
"Cannot parse cron: %s for task: %s, schedule_id: %s",
task.cron,
task.task_name,
task.schedule_id,
)
continue
if task_delay is not None:
send_task = loop.create_task(
delayed_send(scheduler, source, task, task_delay),
)
running_schedules.add(send_task)
send_task.add_done_callback(running_schedules.discard)
print("\n=== Running schedules ===")
dt1 = datetime.now()
print(f"{dt1=}")
next_minute = dt1.replace(second=0, microsecond=0) + timedelta(
minutes=1,
)
print(f"{next_minute=}")
delay = next_minute - (dt2 := datetime.now())
print(f"{dt2=}")
print(f"{delay=}")
print("================\n")
await asyncio.sleep(delay.total_seconds())
async def run_scheduler(args: SchedulerArgs) -> None:
if args.configure_logging:
basicConfig(
level=getLevelName(args.log_level),
format=(
"[%(asctime)s][%(levelname)-7s]"
"[%(module)s:%(funcName)s:%(lineno)d]"
" %(message)s"
),
)
getLogger("taskiq").setLevel(level=getLevelName(args.log_level))
if isinstance(args.scheduler, str):
scheduler = import_object(args.scheduler)
else:
scheduler = args.scheduler
if not isinstance(scheduler, TaskiqScheduler):
logger.error(
"Imported scheduler is not a subclass of TaskiqScheduler.",
)
sys.exit(1)
scheduler.broker.is_scheduler_process = True
import_tasks(args.modules, args.tasks_pattern, args.fs_discover)
for source in scheduler.sources:
await source.startup()
logger.info("Starting scheduler.")
await scheduler.startup()
logger.info("Startup completed.")
if args.skip_first_run:
next_minute = datetime.utcnow().replace(second=0, microsecond=0) + timedelta(
minutes=1,
)
delay = next_minute - datetime.utcnow()
delay_secs = int(delay.total_seconds())
logger.info(f"Skipping first run. Waiting {delay_secs} seconds.")
await asyncio.sleep(delay.total_seconds())
logger.info("First run skipped. The scheduler is now running.")
try:
await run_scheduler_loop(scheduler)
except asyncio.CancelledError:
logger.warning("Shutting down scheduler.")
await scheduler.shutdown()
for source in scheduler.sources:
await source.shutdown()
logger.info("Scheduler shut down. Good bye!")
Results