APScheduler 4.0.0 Bug

Open doluk opened this issue 1 year ago • 45 comments

Things to check first

  • [X] I have checked that my issue does not already have a solution in the FAQ

  • [X] I have searched the existing issues and didn't find my bug already reported there

  • [X] I have checked that my bug is still present in the latest release

Version

4.0.0a2.post72

What happened?

I know it is an alpha, but I wanted to report this because I didn't observe it with a lower number of jobs. I use APScheduler with a PostgreSQL job store and have around 5000 schedules, all for the same task but with different arguments. They are scheduled with a DateTrigger and reschedule themselves at the end of the task for the next execution.

After some runtime some of the schedules are simply gone, the task itself had no errors and ensures in theory always a rescheduling. Meaning the number of schedules drops constantly. I observed this behavior with the thread executor but also with the normal async executor. With the thread executor the complete scheduler would also crash occasionally; with the async one, at least the scheduler stays alive. In both cases the jobs table has an enormous number of rows. Right now I have around 100k rows in there (I can provide a dump if needed). The logs also show the following error multiple times (same error, just a different scope value and UUID):

  + Exception Group Traceback (most recent call last):
  |   File "/root/ccn_tracker/scheduler_main.py", line 90, in main
  |     await scheduler.run_until_stopped()
  |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/apscheduler/schedulers/async_.py", line 514, in run_until_stopped
  |     async with create_task_group() as task_group:
  |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 664, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/anyio/streams/memory.py", line 217, in send
    |     self.send_nowait(item)
    |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/anyio/streams/memory.py", line 200, in send_nowait
    |     raise WouldBlock
    | anyio.WouldBlock
    | 
    | During handling of the above exception, another exception occurred:
    | 
    | Traceback (most recent call last):
    |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/apscheduler/schedulers/async_.py", line 700, in _process_jobs
    |     jobs = await self.data_store.acquire_jobs(self.identity, limit)
    |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/apscheduler/datastores/sqlalchemy.py", line 759, in acquire_jobs
    |     await self._event_broker.publish(
    |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/apscheduler/eventbrokers/asyncpg.py", line 179, in publish
    |     await self._send.send(notification)
    |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/anyio/streams/memory.py", line 223, in send
    |     await send_event.wait()
    |   File "/usr/local/lib/python3.10/asyncio/locks.py", line 214, in wait
    |     await fut
    |   File "/usr/local/lib/python3.10/asyncio/futures.py", line 285, in __await__
    |     yield self  # This tells Task to wait for completion.
    |   File "/usr/local/lib/python3.10/asyncio/tasks.py", line 304, in __wakeup
    |     future.result()
    |   File "/usr/local/lib/python3.10/asyncio/futures.py", line 196, in result
    |     raise exc
    | asyncio.exceptions.CancelledError: Cancelled by cancel scope 7f6a84b10610
    | 
    | During handling of the above exception, another exception occurred:
    | 
    | Exception Group Traceback (most recent call last):
    |   File "/usr/local/lib/python3.10/asyncio/tasks.py", line 232, in __step
    |     result = coro.send(None)
    |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/apscheduler/schedulers/async_.py", line 683, in _process_jobs
    |     async with AsyncExitStack() as exit_stack:
    |   File "/usr/local/lib/python3.10/contextlib.py", line 714, in __aexit__
    |     raise exc_details[1]
    |   File "/usr/local/lib/python3.10/contextlib.py", line 697, in __aexit__
    |     cb_suppress = await cb(*exc_details)
    |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 664, in __aexit__
    |     raise BaseExceptionGroup(
    | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (82 sub-exceptions)
    +-+---------------- 1 ----------------
      | Traceback (most recent call last):
      |   File "/usr/local/lib/python3.10/asyncio/tasks.py", line 232, in __step
      |     result = coro.send(None)
      |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/apscheduler/schedulers/async_.py", line 785, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('7baf47d5-46f3-4c56-b4ca-9fcdc4189250')
      +---------------- 2 ----------------
      | Traceback (most recent call last):
      |   File "/usr/local/lib/python3.10/asyncio/tasks.py", line 232, in __step
      |     result = coro.send(None)
      |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/apscheduler/schedulers/async_.py", line 785, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('7baf47d5-46f3-4c56-b4ca-9fcdc4189250')
      +---------------- 3 ----------------
      | Traceback (most recent call last):
      |   File "/usr/local/lib/python3.10/asyncio/tasks.py", line 232, in __step
      |     result = coro.send(None)
      |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/apscheduler/schedulers/async_.py", line 785, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('7baf47d5-46f3-4c56-b4ca-9fcdc4189250')
      +---------------- 4 ----------------
      | Traceback (most recent call last):
      |   File "/usr/local/lib/python3.10/asyncio/tasks.py", line 232, in __step
      |     result = coro.send(None)
      |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/apscheduler/schedulers/async_.py", line 785, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('7baf47d5-46f3-4c56-b4ca-9fcdc4189250')
      +---------------- 5 ----------------
      | Traceback (most recent call last):
      |   File "/usr/local/lib/python3.10/asyncio/tasks.py", line 232, in __step
      |     result = coro.send(None)
      |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/apscheduler/schedulers/async_.py", line 785, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('7baf47d5-46f3-4c56-b4ca-9fcdc4189250')
      +---------------- 6 ----------------
      | Traceback (most recent call last):
      |   File "/usr/local/lib/python3.10/asyncio/tasks.py", line 232, in __step
      |     result = coro.send(None)
      |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/apscheduler/schedulers/async_.py", line 785, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('7baf47d5-46f3-4c56-b4ca-9fcdc4189250')
      +---------------- 7 ----------------
      | Traceback (most recent call last):
      |   File "/usr/local/lib/python3.10/asyncio/tasks.py", line 232, in __step
      |     result = coro.send(None)
      |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/apscheduler/schedulers/async_.py", line 785, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('7baf47d5-46f3-4c56-b4ca-9fcdc4189250')
      +---------------- 8 ----------------
      | Traceback (most recent call last):
      |   File "/usr/local/lib/python3.10/asyncio/tasks.py", line 232, in __step
      |     result = coro.send(None)
      |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/apscheduler/schedulers/async_.py", line 785, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('7baf47d5-46f3-4c56-b4ca-9fcdc4189250')
      +---------------- 9 ----------------
      | Traceback (most recent call last):
      |   File "/usr/local/lib/python3.10/asyncio/tasks.py", line 232, in __step
      |     result = coro.send(None)
      |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/apscheduler/schedulers/async_.py", line 785, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('7baf47d5-46f3-4c56-b4ca-9fcdc4189250')
      +---------------- 10 ----------------
      | Traceback (most recent call last):
      |   File "/usr/local/lib/python3.10/asyncio/tasks.py", line 232, in __step
      |     result = coro.send(None)
      |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/apscheduler/schedulers/async_.py", line 785, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('7baf47d5-46f3-4c56-b4ca-9fcdc4189250')
      +---------------- 11 ----------------
      | Traceback (most recent call last):
      |   File "/usr/local/lib/python3.10/asyncio/tasks.py", line 232, in __step
      |     result = coro.send(None)
      |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/apscheduler/schedulers/async_.py", line 785, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('7baf47d5-46f3-4c56-b4ca-9fcdc4189250')
      +---------------- 12 ----------------
      | Traceback (most recent call last):
      |   File "/usr/local/lib/python3.10/asyncio/tasks.py", line 232, in __step
      |     result = coro.send(None)
      |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/apscheduler/schedulers/async_.py", line 785, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('7baf47d5-46f3-4c56-b4ca-9fcdc4189250')
      +---------------- 13 ----------------
      | Traceback (most recent call last):
      |   File "/usr/local/lib/python3.10/asyncio/tasks.py", line 232, in __step
      |     result = coro.send(None)
      |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/apscheduler/schedulers/async_.py", line 785, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('7baf47d5-46f3-4c56-b4ca-9fcdc4189250')
      +---------------- 14 ----------------
      | Traceback (most recent call last):
      |   File "/usr/local/lib/python3.10/asyncio/tasks.py", line 232, in __step
      |     result = coro.send(None)
      |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/apscheduler/schedulers/async_.py", line 785, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('7baf47d5-46f3-4c56-b4ca-9fcdc4189250')
      +---------------- 15 ----------------
      | Traceback (most recent call last):
      |   File "/usr/local/lib/python3.10/asyncio/tasks.py", line 232, in __step
      |     result = coro.send(None)
      |   File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/apscheduler/schedulers/async_.py", line 785, in _run_job
      |     self._running_jobs.remove(job.id)
      | KeyError: UUID('7baf47d5-46f3-4c56-b4ca-9fcdc4189250')
      +---------------- ... ----------------
      | and 67 more exceptions
      +------------------------------------

How can we reproduce the bug?

from __future__ import annotations

import asyncio
import asyncpg
import sys
import logging
import traceback
from datetime import datetime, timedelta, timezone
from logging import handlers
from typing import Optional


if sys.version_info >= (3, 11):
	from typing import Self
else:
	from typing_extensions import Self

import apscheduler
from apscheduler._schedulers.async_ import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.serializers.json import JSONSerializer
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy.ext.asyncio import create_async_engine


# logging
logger = logging.getLogger('bug_demo')
file_handler = handlers.TimedRotatingFileHandler(filename=f"bug_demo.log",
                                                 when='MIDNIGHT', encoding='utf-8',
                                                 backupCount=0, utc=False)
file_handler.setFormatter(logging.Formatter('%(asctime)s:%(levelname)s-%(name)s:\n'
                                            '%(module)s - %(funcName)s - %(lineno)d:\n'
                                            '%(message)s\n\n'))
file_handler.namer = lambda name: name.replace(".log.", "_") + ".log"
logger.addHandler(file_handler)
logger.setLevel(logging.ERROR)

# dedicated logger for apscheduler
logger2 = logging.getLogger("apscheduler")
file_handler2 = handlers.TimedRotatingFileHandler(filename=f"bug_demo_appscheduler.log",
                                                 when='MIDNIGHT', encoding='utf-8',
                                                 backupCount=0, utc=False)
file_handler2.setFormatter(logging.Formatter('%(asctime)s:%(levelname)s-%(name)s:\n'
                                            '%(module)s - %(funcName)s - %(lineno)d:\n'
                                            '%(message)s\n\n'))
file_handler2.namer = lambda name: name.replace(".log.", "_") + ".log"
logger2.addHandler(file_handler2)
logger2.setLevel(logging.INFO)


cfg = {'user'    : 'ENTER USER HERE',
       'password': 'ENTER PASSWORD HERE',
       'host'    : 'ENTER HOST HERE',
       'database': 'ENTER DATABASE HERE',
       'port': 'ENTER PORT HERE'
       }


class ExtendedAsyncScheduler(AsyncScheduler):
	async def __aenter__(self: Self) -> Self:
		await super().__aenter__()
		self.db = await asyncpg.create_pool(**cfg)
		self.logger = logger
		return self


async def regular_check(check_id: str, scheduler: Optional[ExtendedAsyncScheduler] = None):
	if not scheduler:
		try:
			scheduler: ExtendedAsyncScheduler = apscheduler.current_async_scheduler.get()
		except LookupError as e:
			raise e
	try:
		run_date = datetime.now(tz=timezone.utc) + timedelta(minutes=5)
		await scheduler.add_schedule(regular_check,
		                             DateTrigger(run_time=run_date),
		                             id=f"regular_check-{check_id}",
		                             misfire_grace_time=300,
		                             conflict_policy=apscheduler.ConflictPolicy.replace,
		                             args=(check_id,),
		                             max_jitter=30)
		logger.info(f"Added regular check {check_id} at {run_date}")
	except Exception as e:
		logger.error(f"Error in regular check {check_id}\n{traceback.format_exc()}")


async def check_self(scheduler: Optional[ExtendedAsyncScheduler] = None):
	if not scheduler:
		try:
			scheduler: ExtendedAsyncScheduler = apscheduler.current_async_scheduler.get()
		except LookupError as e:
			raise e
	db: asyncpg.Pool = scheduler.db
	try:
		async with db.acquire() as conn:
			jobs = await conn.fetchrow("SELECT count(*) as total, count(*) filter ( where jobs.task_id is not null and "
			                         "task_id ilike '__main__:regular_check' ) as reg "
			                         "from apscheduler.jobs")
			if not jobs:
				jobs = (0, 0)
			schedules = await conn.fetchrow("SELECT count(*) as total, count(*) filter ( where "
			                         "id ilike 'regular_check%' ) as req from apscheduler.schedules")
			if not schedules:
				schedules = (0, 0)
		now = datetime.utcnow()
		scheduler.logger.error(f"{now}\njobs: {jobs[0]}, schedules: {schedules[0]}, "
		                       f"regular_check schedules: {schedules[1]}, regular_check jobs: {jobs[1]}\n"
		                       f"regular_check jobs+schedules: {schedules[1] + jobs[1]}")
	except Exception as e:
		logger.error(f"Error in check_self\n{traceback.format_exc()}")

async def create_schedules(scheduler: Optional[ExtendedAsyncScheduler] = None):
	if not scheduler:
		try:
			scheduler: ExtendedAsyncScheduler = apscheduler.current_async_scheduler.get()
		except LookupError as e:
			raise e
	db: asyncpg.Pool = scheduler.db
	try:
		async with db.acquire() as conn:
			for i in range(2000):
				try:
					schedule = await conn.fetchrow("SELECT * from apscheduler.schedules where id ilike $1",
					                              f"regular_check-{i}")
				except Exception as e:
					traceback.print_exc()
					raise e
				if not schedule:
					await scheduler.add_job(regular_check, args=(f'{i}',))
		logger.error("Finished creating schedules")
	except Exception as e:
		logger.error(f"Error in create_schedules\n{traceback.format_exc()}")

async def main():
	engine = create_async_engine(
			f"postgresql+asyncpg://{cfg['user']}:{cfg['password']}@{cfg['host']}/{cfg['database']}")
	serializer = JSONSerializer()
	data_store = SQLAlchemyDataStore(engine, serializer=serializer, schema='apscheduler')
	async with ExtendedAsyncScheduler(data_store=data_store,
	                                  role=apscheduler.SchedulerRole.both) as scheduler:
		await check_self(scheduler)
		await scheduler.add_schedule(check_self,
		                             IntervalTrigger(minutes=1),
		                             id="check_self",
		                             misfire_grace_time=120)
		# await scheduler.add_job(create_schedules) # creates bug directly before scheduler even starts
		await create_schedules(scheduler) # works fine for the first few minutes and then the bug occurs again
		await check_self(scheduler)
		print('starting scheduler')
		await scheduler.run_until_stopped()


if __name__ == '__main__':
	asyncio.run(main())

doluk avatar Oct 18 '23 11:10 doluk

After some runtime some of the schedules are simply gone, the task itself had no errors and ensures in theory always a rescheduling. Meaning the number of schedules drops constantly.

Schedules are supposed to be deleted when they've run their course. Is this what you're seeing? Without a reproducing example I can't say much more.

agronholm avatar Oct 22 '23 10:10 agronholm

Yes, sure. In my case they should add a new schedule with the same name at the end of the task. I was referring to the absence of those new schedules.

doluk avatar Oct 22 '23 14:10 doluk

I was also able to reproduce this bug with 4.0.0a3. Posting my example now. Just a quick comment: if you use await scheduler.add_job(create_schedules), you directly get the error with the traceback above. Using await create_schedules() instead raises no exception and you don't see anything in the apscheduler logs, but the other log (bug_demo.log) shows that the number of schedules is decreasing over time.

doluk avatar Oct 26 '23 10:10 doluk

Could you try with v4.0.0a4?

agronholm avatar Nov 13 '23 00:11 agronholm

I upgraded apscheduler, deleted the old tables in the database and tried the same code as before, but I am now getting this error:


2023-11-13 08:17:24,598:ERROR-bug_demo:hon3.10/site-packages/apscheduler/_schedulers/async_.py", line 737, in _process_schedules
async_ - _process_schedules - 739:
Error computing next fire time for schedule 'check_self' of task '__main__:check_self' – removing schedulen next
    self._last_fire_time += self._interval
Traceback (most recent call last):r (not "datetime.timedelta") to str
20File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/apscheduler/_schedulers/async_.py", line 737, in _process_schedules
bug_fire_time = calculate_next()
20File "/root/.venv/ccn_tracker/lib/python3.10/site-packages/apscheduler/triggers/interval.py", line 67, in next
jobsself._last_fire_time += self._intervalules: 0, regular_check jobs: 0
TypeError: can only concatenate str (not "datetime.timedelta") to str

Sorry, my phone messes with the formatting in the terminal; I can get you a properly formatted one later.

doluk avatar Nov 13 '23 07:11 doluk

That looks like there are strings leaking to where only datetimes should go. Can you provide a simpler way to reproduce the problem that you're seeing?

agronholm avatar Nov 13 '23 13:11 agronholm

The error is from the interval trigger; just create a schedule with an IntervalTrigger. It executes once and then raises this error.

doluk avatar Nov 13 '23 14:11 doluk

It works with the memory store, but with the sqlalchemy store I'm getting a bizarre (but different) error about the function reference which I'm investigating.

agronholm avatar Nov 13 '23 14:11 agronholm

That was a false alarm, it was caused by a leftover schedule from another run. When I cleared the database, it ran perfectly. Is the PostgreSQL standalone example not working for you?

agronholm avatar Nov 13 '23 15:11 agronholm

I created a whole new database and ran the standalone example, which works fine for me! I then cleared the whole database and used my code again. Still getting the same error. So I started slowly changing the code of the PostgreSQL standalone example towards my code. With the addition of the JSON serializer it breaks and starts throwing this error: TypeError: can only concatenate str (not "datetime.timedelta") to str. I looked a bit into the code and found that if I change interval.py:45 to _last_fire_time: datetime | None = attrs.field(init=False, eq=False, default=None, converter=as_aware_datetime), it works without the previously mentioned TypeError. With this change I also tested the CBOR and pickle serializers, and both work fine.
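
The failure mode described above can be reproduced without APScheduler at all. The following is only a minimal sketch, assuming the trigger state is stored as an ISO 8601 string after a JSON round trip; the `state` dictionary and its key are hypothetical stand-ins, not APScheduler's actual serialized format.

```python
import json
from datetime import datetime, timedelta, timezone

# Hypothetical stand-in for a trigger's serialized state (not APScheduler's real format).
state = {"last_fire_time": datetime(2023, 11, 13, 8, 17, tzinfo=timezone.utc)}

# JSON has no datetime type, so the value has to be written out as a string.
encoded = json.dumps({key: value.isoformat() for key, value in state.items()})
restored = json.loads(encoded)

interval = timedelta(minutes=1)
error_message = None
try:
    # The restored value is a str, so this is str + timedelta.
    restored["last_fire_time"] + interval
except TypeError as exc:
    error_message = str(exc)

# Converting the string back to an aware datetime makes the arithmetic work again.
fixed = datetime.fromisoformat(restored["last_fire_time"]) + interval
```

A converter on the field, as in the change to interval.py mentioned above, would perform this string-to-datetime step when the trigger is restored.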

doluk avatar Nov 14 '23 07:11 doluk

Ok, I missed your use of JSONSerializer there. I've actually just started a discussion on this, and would like to know your reasons for a serializer other than pickle.

agronholm avatar Nov 14 '23 08:11 agronholm

Replied there as well. For me it was mostly for debugging, to understand what arguments are going in. By the way, I re-checked my initial report with the pickle serializer and the problem still persists.

doluk avatar Nov 14 '23 09:11 doluk

And I think I found the problem causing the disappearing schedules. My example code creates the same schedule 2000 times, numbering the copies. It works fine at first but stops working after around 20 minutes. During its execution, every schedule schedules another schedule with the same id regular_check-{number}. I enabled APScheduler's logging and counted the invocations and the completed jobs; there were always 2000 of each for every round of schedules. To understand further what is happening, I modified the code of regular_check: it tries to get the old schedule, which is currently being executed, and the new one after it has been scheduled. To distinguish them, I added a counter to the id that counts the executions. Now schedules no longer disappear (the count stays at 2000 regular_check schedules plus 1 for check_self). Curious about that, I looked into the logs. It appears that sometimes during the execution the old schedule is still present and retrievable with scheduler.get_schedule(), and sometimes not. I suspect that the newly added schedule is somehow getting deleted at the end of the job execution because both have the same id.

async def regular_check(check_id: str, scheduler: Optional[ExtendedAsyncScheduler] = None):
	if not scheduler:
		try:
			scheduler: ExtendedAsyncScheduler = apscheduler.current_async_scheduler.get()
		except LookupError as e:
			raise e
	check_id = check_id.replace("regular_check-", "")
	[check_no, iteration] = check_id.split("+")
	iteration = int(iteration)
	try:
		old_schedule = await scheduler.get_schedule(f"regular_check-{check_id}")
	except ScheduleLookupError:
		old_schedule = None
	try:
		run_date = datetime.now(tz=timezone.utc) + timedelta(minutes=5)
		await scheduler.add_schedule(regular_check,
		                             DateTrigger(run_time=run_date),
		                             id=f"regular_check-{check_no}+{iteration + 1}",
		                             misfire_grace_time=300,
		                             conflict_policy=apscheduler.ConflictPolicy.exception,
		                             args=(f"{check_no}+{iteration + 1}",),
		                             max_jitter=30)
		try:
			new_schedule = await scheduler.get_schedule(f"regular_check-{check_no}+{iteration + 1}")
		except ScheduleLookupError:
			new_schedule = None
		scheduler.logger.info(f"Old schedule: {old_schedule=}\nNew schedule: {new_schedule=}" +
		                      f"\nAdded regular check {check_id} {iteration+1} at {run_date}")
	except Exception as e:
		scheduler.logger.error(f"Error in regular check {check_id}\n{traceback.format_exc()}")

async def create_schedules(scheduler: Optional[ExtendedAsyncScheduler] = None):
	if not scheduler:
		try:
			scheduler: ExtendedAsyncScheduler = apscheduler.current_async_scheduler.get()
		except LookupError as e:
			raise e
	db: asyncpg.Pool = scheduler.db
	try:
		async with db.acquire() as conn:
			for i in range(2000):
				x = str(i)
				while len(x) < 4:
					x = "0" + x
				try:
					schedule = await conn.fetchrow("SELECT * from jsonserializer.schedules where id ilike $1",
					                               f"regular_check-{x}%")
				except Exception as e:
					traceback.print_exc()
					raise e
				if not schedule:
					await scheduler.add_job(regular_check, args=(f'{x}+0',))
		scheduler.logger.error("Finished creating schedules")
	except Exception as e:
		scheduler.logger.error(f"Error in create_schedules\n{traceback.format_exc()}")

doluk avatar Nov 14 '23 10:11 doluk

Schedules are always deleted when their triggers don't produce any more fire times, that is normal. Is that what you mean?

agronholm avatar Nov 14 '23 12:11 agronholm

Schedule 1 with a datetrigger is executed, during the execution a new schedule 2 with the same id is created. Sometimes during the execution of the job of schedule 1, the schedule 1 is still accessible by the id, sometimes not. If it is present, somehow the schedule 2 disappears after the job of schedule 1 finished.

By using truly unique ids this can be avoided. Sadly, the part with the KeyError still happens.

doluk avatar Nov 14 '23 12:11 doluk

Sometimes during the execution of the job of schedule 1, the schedule 1 is still accessible by the id, sometimes not

Is your expectation that the schedule is available as long as there are any jobs active associated with it?

agronholm avatar Nov 14 '23 12:11 agronholm

As long as it is consistent, I am okay with either one. I would prefer it not being available unless it produces another run time, because the schedule is exhausted and therefore the schedule id should be free to use again.

doluk avatar Nov 14 '23 13:11 doluk

That sounds reasonable. The reason why it works the way it does now is that _process_schedules() creates the jobs, and only after creating the necessary jobs does it remove any finished schedules.
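
To illustrate why that ordering matters when a job re-adds a schedule under the same id, here is a toy in-memory model. It is not APScheduler code: the `simulate` function, the dictionary layout and the run labels are all hypothetical, and it only shows the suspected race under the assumption that finished schedules are deleted by id.

```python
def simulate(job_runs_before_cleanup: bool) -> dict:
    """Toy model of a one-shot schedule whose job replaces it under the same id."""
    schedules = {"regular_check-1": "run #1"}

    # The one-shot trigger is exhausted once its job has been created,
    # so the schedule id is remembered for deletion.
    finished = list(schedules)

    def job() -> None:
        # The job adds a new schedule with the same id (replace semantics).
        schedules["regular_check-1"] = "run #2"

    if job_runs_before_cleanup:
        job()

    # Cleanup deletes by id and cannot tell the old schedule from its replacement.
    for schedule_id in finished:
        schedules.pop(schedule_id, None)

    if not job_runs_before_cleanup:
        job()

    return schedules


lost = simulate(job_runs_before_cleanup=True)
kept = simulate(job_runs_before_cleanup=False)
```

In this model the replacement survives only when the job runs after the cleanup, which would match the observation that the schedule is sometimes still retrievable during the job and sometimes not.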

agronholm avatar Nov 14 '23 13:11 agronholm

This out of the way, the initial reported error still occurs 🙈 During that I also stumbled across another thing: How does apscheduler handle many pending jobs in combination with jobs from schedules? When I add a lot of jobs before the scheduler is started and have a schedule with an interval trigger, I noticed that after starting up the interval schedule/jobs are not executed in that interval. My expectation would be here, that jobs from schedules have priority compared to jobs which are not connected to a schedule

doluk avatar Nov 15 '23 08:11 doluk

This out of the way, the initial reported error still occurs

Can you clarify what you mean?

My expectation would be here, that jobs from schedules have priority compared to jobs which are not connected to a schedule

The schedulers fetch jobs in the FIFO order, by created_at. There is no other prioritization occurring.
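
As a rough illustration of what FIFO ordering by created_at implies for the situation described above, the sketch below queues 2000 ad-hoc jobs and one later job from an interval schedule. The job dictionaries and timestamps are made up for the example and do not reflect APScheduler's data model.

```python
from datetime import datetime, timedelta, timezone

start = datetime(2023, 11, 15, tzinfo=timezone.utc)

# 2000 ad-hoc jobs added before the scheduler starts (hypothetical timestamps).
jobs = [
    {"task": "regular_check", "created_at": start + timedelta(milliseconds=i)}
    for i in range(2000)
]
# One job spawned a minute later by an interval schedule.
jobs.append({"task": "check_self", "created_at": start + timedelta(seconds=60)})

# FIFO: oldest created_at first, with no extra priority for schedule-spawned jobs.
queue = sorted(jobs, key=lambda job: job["created_at"])
position = next(i for i, job in enumerate(queue) if job["task"] == "check_self")
```

Here the interval job ends up behind all 2000 pending jobs, so it runs only once they have been picked up.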

agronholm avatar Nov 15 '23 09:11 agronholm

The initial error was a KeyError when removing job.id from self._running_jobs.

doluk avatar Nov 29 '23 08:11 doluk

The initial error was a KeyError when removing job.id from self._running_jobs.

I "fixed" that by doing self._running_jobs.discard(job.id) instead so it doesn't fail if the key is missing. Not a real fix but at least it's not crashing anymore.
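
For reference, the difference between the two set methods can be shown in isolation. This is a plain Python sketch that assumes the running jobs are tracked in a set of UUIDs; the variable names are illustrative only.

```python
from uuid import UUID

running_jobs = set()
job_id = UUID("7baf47d5-46f3-4c56-b4ca-9fcdc4189250")

# set.remove() raises KeyError when the element is not in the set.
try:
    running_jobs.remove(job_id)
    remove_failed = False
except KeyError:
    remove_failed = True

# set.discard() silently does nothing in the same situation.
running_jobs.discard(job_id)
```

This hides the symptom but not the reason why the id was missing from the set in the first place.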

Falydoor avatar Feb 05 '24 23:02 Falydoor

Can you reproduce this with the latest master? I just ran that originally posted script for half an hour without seeing any errors. The only console output I got was starting scheduler, and then some logging present in bug_demo.log, nothing else.

agronholm avatar May 11 '24 10:05 agronholm

Yes, will have it running over the next few hours

doluk avatar May 13 '24 05:05 doluk

I get no error messages, but the number of jobs is still decaying. After 15 minutes the number of regular_checks dropped from 2000 to 911.

doluk avatar May 13 '24 08:05 doluk

So let me just make sure we're on the same page here. Why are you replacing the schedule from a job belonging to that schedule?

agronholm avatar May 13 '24 08:05 agronholm

I have an API that I need to call for a list of identifiers over and over again. The time of the next call depends on the API response for the identifier. The function making the API calls is regular_check in the example above. Currently the schedule id is regular_check_{identifier}, and the check_id of the example is also the identifier.

With APScheduler v3 I used an SQLite job store and never had any issues with this way of doing it. I can see that using the same id again causes issues, but I would argue that the resulting behavior is not intuitive (especially considering the conflict policy used).

doluk avatar May 13 '24 09:05 doluk

If you're using DateTrigger, then would it not be possible to create a totally new schedule from the job?

agronholm avatar May 13 '24 14:05 agronholm

Isn't that what I am doing in regular_check?

doluk avatar May 13 '24 14:05 doluk

Ok, I think I didn't read everything thoroughly, so the later snippet is your solution/workaround for the original problem, correct? That would indeed work. The scheduler deletes a schedule when its trigger can no longer generate any new fire times, and this may happen while the job that was spawned from said schedule is still running. I'm wondering if perhaps it shouldn't do that. Perhaps finished schedules should instead be cleaned up by the data store cleanup procedure, which would ensure that only schedules with no running jobs are deleted.
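
The idea in the last sentence could look roughly like the following sketch. It is purely hypothetical: `deletable_schedules`, the `exhausted` flag and the `schedule_id` key are names invented for illustration, not part of APScheduler's data store API.

```python
def deletable_schedules(schedules: dict, running_jobs: list) -> list:
    """Return ids of exhausted schedules that no running job still refers to."""
    busy = {job["schedule_id"] for job in running_jobs}
    return [
        schedule_id
        for schedule_id, schedule in schedules.items()
        if schedule["exhausted"] and schedule_id not in busy
    ]


# Hypothetical state: "a" is exhausted but its job is still running,
# "b" is exhausted and idle, "c" still has fire times left.
schedules = {
    "a": {"exhausted": True},
    "b": {"exhausted": True},
    "c": {"exhausted": False},
}
running_jobs = [{"schedule_id": "a"}]

deletable = deletable_schedules(schedules, running_jobs)
```

Under this rule a schedule that a running job has just replaced under the same id would be left alone until that job finishes.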

agronholm avatar May 13 '24 14:05 agronholm