EVENT_JOB_SUBMITTED fires when job has not executed due to ThreadPoolExecutor(max_workers=1)
I'm finding that the EVENT_JOB_SUBMITTED event fires when I configure a ThreadPoolExecutor with max_workers=1, even though the job has not actually started executing.
What I'm really trying to do is track job execution state. I want to know when a job is in one of the following states (my own nomenclature): "scheduled", "queued", "running", "completed" or "error". Since apscheduler does not provide a direct way to do this (see #332), I'm trying to use event handlers to track state of each job. This is turning out to be remarkably difficult.
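For illustration, the event-to-state mapping I'm attempting looks roughly like this (a sketch only; current_jobs and the state names are mine, scheduler is assumed to be the already-configured TornadoScheduler shown below, and note there is no event for the "running" transition):

import logging
from apscheduler.events import (EVENT_JOB_ADDED, EVENT_JOB_SUBMITTED,
                                EVENT_JOB_EXECUTED, EVENT_JOB_ERROR)

current_jobs = {}

def on_added(event):
    # job stored in the jobstore, waiting for its trigger
    current_jobs.setdefault(event.job_id, {})['status'] = 'scheduled'

def on_submitted(event):
    # job handed to the executor; this is the event that fires too early for my purposes
    current_jobs.setdefault(event.job_id, {})['status'] = 'queued'

def on_executed(event):
    current_jobs.setdefault(event.job_id, {})['status'] = 'completed'

def on_error(event):
    current_jobs.setdefault(event.job_id, {})['status'] = 'error'

scheduler.add_listener(on_added, EVENT_JOB_ADDED)
scheduler.add_listener(on_submitted, EVENT_JOB_SUBMITTED)
scheduler.add_listener(on_executed, EVENT_JOB_EXECUTED)
scheduler.add_listener(on_error, EVENT_JOB_ERROR)

There is no event between "submitted" and "executed", so "running" has no obvious hook; that is essentially what #332 asks for.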
By design, my application may run one of 4 different job types (job_ids), but I want only one of these jobs to run at a time. (In the future, I may expand the number/types of jobs that can run in parallel.) For now, I have set ThreadPoolExecutor's max_workers=1, and I have also set the job_defaults coalesce=True and max_instances=1. In order to queue up misfired jobs for the future, I set misfire_grace_time to a large value. This has the desired effect of queuing up all jobs behind other jobs of different types/job_ids.
Expected Behavior
Given this simple, effectively single-threaded design, I expect to be able to track state of jobs without race conditions or other issues.
Current Behavior
Unfortunately, apscheduler fails to check whether the ThreadPoolExecutor's max_workers setting will cause a job to misfire, and EVENT_JOB_SUBMITTED fires even if the job subsequently misfires and is re-evaluated for execution later.
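For completeness, the misfire itself can be observed after the fact by also listening for EVENT_JOB_MISSED (a sketch, reusing the scheduler from above; whether this event actually fires in this scenario depends on the misfire_grace_time, and it arrives only after the premature EVENT_JOB_SUBMITTED):

import logging
from apscheduler.events import EVENT_JOB_MISSED

def on_missed(event):
    # the scheduler decided a run time was missed; the job was already reported as submitted
    logging.warning('job %s missed its run time %s', event.job_id, event.scheduled_run_time)

scheduler.add_listener(on_missed, EVENT_JOB_MISSED)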
Steps to Reproduce
Here are a few snippets of code that I'm able to share. Scheduler setup:
jobstores = {
    'default': MemoryJobStore()
}
executors = {
    'default': ThreadPoolExecutor(max_workers=1)
}
job_defaults = {
    'coalesce': True,
    'max_instances': 1,
    'misfire_grace_time': 60*60*4  # 4 hours, so late jobs still run instead of being treated as missed
}
self.scheduler = TornadoScheduler()
self.scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
self.scheduler.add_listener(self.on_job_start, apscheduler.events.EVENT_JOB_SUBMITTED)
self.scheduler.start()
This method is fired prematurely, causing my job to be listed as 'running' when it isn't:
def on_job_start(self, event):
    logging.debug('*** starting job: ' + event.job_id)
    self.current_jobs[event.job_id]['status'] = 'running'
    self.current_jobs[event.job_id]['started'] = time.time()
When I add a non-scheduled job, I set its status to 'queued', but this is always immediately overridden by the event handler above.
self.current_jobs[job_id] = {'job_id': job_id, 'task': task, ..., 'status': 'queued'}
self.scheduler.add_job(runner,
                       id=job_id,
                       replace_existing=True,
                       kwargs=kwargs)
Context (Environment)
I'm unable to properly track state on my jobs without more hacks. I'm likely going to need to add a wrapper function to each of my jobs that simply updates its state to 'running', and abandon the EVENT_JOB_SUBMITTED event handler. If apscheduler provided a more complete API for tracking job state, that would be the best situation of all.
Detailed Description
In the short term, it would be nice if EVENT_JOB_SUBMITTED was not fired when the ThreadPoolExecutor's max_workers limit causes a misfire. Longer term, I'd love to see a solution for #332.
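For reference, here is roughly what the wrapper-based workaround described above looks like; JobManager, job_wrapper and the status strings are my own code, not part of the apscheduler API: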
import logging
import time

from apscheduler.schedulers.tornado import TornadoScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.events import EVENT_JOB_SUBMITTED
from apscheduler.executors.pool import ThreadPoolExecutor

class JobManager:
    def __init__(self):
        self.current_jobs = {}
        self._setup_scheduler()

    def _setup_scheduler(self):
        jobstores = {'default': MemoryJobStore()}
        executors = {'default': ThreadPoolExecutor(max_workers=1)}
        job_defaults = {'coalesce': True, 'max_instances': 1, 'misfire_grace_time': 60*60*4}
        self.scheduler = TornadoScheduler()
        self.scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
        self.scheduler.add_listener(self.on_job_start, EVENT_JOB_SUBMITTED)
        self.scheduler.start()

    def on_job_start(self, event):
        # EVENT_JOB_SUBMITTED: the job was handed to the executor, not necessarily started
        logging.debug('*** Job submitted: ' + event.job_id)
        self.current_jobs[event.job_id]['status'] = 'queued'

    def job_wrapper(self, func, job_id, *args, **kwargs):
        # Wrapper that marks the job 'running' only when it actually starts executing
        try:
            self.current_jobs[job_id]['status'] = 'running'
            self.current_jobs[job_id]['started'] = time.time()
            result = func(*args, **kwargs)
            self.current_jobs[job_id]['status'] = 'completed'
            return result
        except Exception as e:
            self.current_jobs[job_id]['status'] = 'error'
            logging.error(f"Error in job {job_id}: {e}")
            # Handle the exception as needed

    def add_job(self, func, job_id, *args, **kwargs):
        # Create the tracking entry before scheduling so the listener can update it
        self.current_jobs[job_id] = {'job_id': job_id, 'status': 'queued'}
        wrapped_func = lambda: self.job_wrapper(func, job_id, *args, **kwargs)
        self.scheduler.add_job(wrapped_func, id=job_id, replace_existing=True)
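A minimal usage sketch, assuming a hypothetical do_backup function and placeholder job ids; TornadoScheduler needs a running Tornado IOLoop before any job actually executes:

import time
from tornado.ioloop import IOLoop

def do_backup():
    time.sleep(5)  # stand-in for one of my real job types

manager = JobManager()
manager.add_job(do_backup, 'backup')   # picked up as soon as the single worker is free
manager.add_job(do_backup, 'cleanup')  # waits behind 'backup' because max_workers=1
IOLoop.current().start()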