
EVENT_JOB_SUBMITTED fires when job has not executed due to ThreadPoolExecutor(max_workers=1)

Open · ecbftw opened this issue 6 years ago • 1 comment

I'm finding that the EVENT_JOB_SUBMITTED event fires for jobs that have not actually started executing when I configure a ThreadPoolExecutor with max_workers=1.

What I'm really trying to do is track job execution state. I want to know when a job is in one of the following states (my own nomenclature): "scheduled", "queued", "running", "completed" or "error". Since apscheduler does not provide a direct way to do this (see #332), I'm trying to use event handlers to track the state of each job. This is turning out to be remarkably difficult.
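
To make this concrete, the mapping I'm after looks roughly like this (just a sketch, not my actual code; `scheduler` is the configured scheduler shown below and `current_jobs` stands in for my tracking dict):

    from apscheduler.events import (EVENT_JOB_ADDED, EVENT_JOB_SUBMITTED,
                                    EVENT_JOB_EXECUTED, EVENT_JOB_ERROR)

    current_jobs = {}  # job_id -> {'status': ...}

    def _track(state):
        def listener(event):
            current_jobs.setdefault(event.job_id, {})['status'] = state
        return listener

    # The mapping I would like; the trouble is that EVENT_JOB_SUBMITTED fires
    # even when the single worker is still busy, so 'queued' and 'running'
    # cannot be told apart from the events alone.
    scheduler.add_listener(_track('scheduled'), EVENT_JOB_ADDED)
    scheduler.add_listener(_track('running'), EVENT_JOB_SUBMITTED)
    scheduler.add_listener(_track('completed'), EVENT_JOB_EXECUTED)
    scheduler.add_listener(_track('error'), EVENT_JOB_ERROR)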

By design, my application may run one of 4 different job types (job_ids), but I want only one of these jobs to run at a time. (In the future, I may expand the number/types of jobs that can run in parallel.) For now, I have set ThreadPoolExecutor's max_workers=1, and I have also set the job_defaults coalesce=True and max_instances=1. In order to queue up misfired jobs for the future, I set misfire_grace_time to a large value. This has the desired effect of queuing up all jobs behind other jobs of different types/job_ids.

Expected Behavior

Given this simple, effectively single-threaded design, I expect to be able to track state of jobs without race conditions or other issues.

Current Behavior

Unfortunately, apscheduler does not check whether the ThreadPoolExecutor's max_workers setting will cause a job to misfire, so EVENT_JOB_SUBMITTED fires even for a job that subsequently misfires and is re-evaluated for execution later.

Steps to Reproduce

Here are a few snippets of code that I'm able to share. Scheduler setup:

        jobstores = {
            'default': MemoryJobStore()
        }
        executors = {
            'default': ThreadPoolExecutor(max_workers=1)
        }
        job_defaults = {
            'coalesce': True,
            'max_instances': 1,
            'misfire_grace_time': 60*60*4
        }
        self.scheduler = TornadoScheduler()
        self.scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
        self.scheduler.add_listener(self.on_job_start, apscheduler.events.EVENT_JOB_SUBMITTED)
        self.scheduler.start()

This method is fired prematurely, causing my job to be listed as 'running' when it isn't:

    def on_job_start(self, event):
        logging.debug('*** starting job: ' + event.job_id)
        self.current_jobs[event.job_id]['status'] = 'running'
        self.current_jobs[event.job_id]['started'] = time.time()

When I add a job for immediate (non-scheduled) execution, I initialize its status to 'queued', but this is always overridden immediately by the event handler above:

        self.current_jobs[job_id] = {'job_id':job_id, 'task':task,..., 'status':'queued'}
        self.scheduler.add_job(runner,
                               id=job_id,
                               replace_existing=True,
                               kwargs=kwargs)

Context (Environment)

I'm unable to properly track the state of my jobs without more hacks. I'm likely going to need to add a wrapper function to each of my jobs that simply updates its state to 'running', and abandon the EVENT_JOB_SUBMITTED event handler. If apscheduler provided a more complete API for tracking job state, that would actually be the best situation of all.
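
Concretely, the wrapper I have in mind would be something like this (a rough sketch; `current_jobs` is my tracking dict and the job's entry is assumed to exist already):

    import time

    def wrap_for_tracking(current_jobs, job_id, func):
        def wrapped(*args, **kwargs):
            # Mark 'running' only once the executor actually invokes the callable,
            # instead of trusting EVENT_JOB_SUBMITTED.
            current_jobs[job_id]['status'] = 'running'
            current_jobs[job_id]['started'] = time.time()
            try:
                result = func(*args, **kwargs)
            except Exception:
                current_jobs[job_id]['status'] = 'error'
                raise
            current_jobs[job_id]['status'] = 'completed'
            return result
        return wrapped

The wrapped callable would then be passed to add_job() in place of the original function.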

Detailed Description

In the short term, it would be nice if EVENT_JOB_SUBMITTED were not fired when the ThreadPoolExecutor's max_workers limit causes a misfire. Longer term, I'd love to see a solution for #332.

ecbftw · Jan 28 '19

    import logging
    import time

    from apscheduler.schedulers.tornado import TornadoScheduler
    from apscheduler.jobstores.memory import MemoryJobStore
    from apscheduler.events import EVENT_JOB_SUBMITTED
    # Use apscheduler's own pool executor here; scheduler.configure() expects an
    # apscheduler executor instance, not a bare concurrent.futures pool.
    from apscheduler.executors.pool import ThreadPoolExecutor

    class JobManager:
        def __init__(self):
            self.current_jobs = {}
            self._setup_scheduler()

        def _setup_scheduler(self):
            jobstores = {'default': MemoryJobStore()}
            executors = {'default': ThreadPoolExecutor(max_workers=1)}
            job_defaults = {'coalesce': True, 'max_instances': 1, 'misfire_grace_time': 60*60*4}

            self.scheduler = TornadoScheduler()
            self.scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
            self.scheduler.add_listener(self.on_job_start, EVENT_JOB_SUBMITTED)
            self.scheduler.start()

        def on_job_start(self, event):
            # Submission only means the job was handed to the executor, so record
            # 'queued' here; job_wrapper() flips the status to 'running' when the
            # callable actually starts.
            logging.debug('*** Job submitted: ' + event.job_id)
            self.current_jobs[event.job_id]['status'] = 'queued'

        def job_wrapper(self, func, job_id, *args, **kwargs):
            try:
                self.current_jobs[job_id]['status'] = 'running'
                self.current_jobs[job_id]['started'] = time.time()
                result = func(*args, **kwargs)
                self.current_jobs[job_id]['status'] = 'completed'
                return result
            except Exception as e:
                self.current_jobs[job_id]['status'] = 'error'
                logging.error(f"Error in job {job_id}: {e}")
                # Handle the exception as needed

        def add_job(self, func, job_id, *args, **kwargs):
            # The original comment was cut off here; the rest of this method is an
            # assumed completion: register the job as 'queued', then schedule the
            # wrapped callable so the status transitions happen inside the executor.
            self.current_jobs[job_id] = {'job_id': job_id, 'status': 'queued'}
            wrapped_func = lambda: self.job_wrapper(func, job_id, *args, **kwargs)
            self.scheduler.add_job(wrapped_func, id=job_id, replace_existing=True)
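
A quick usage sketch, continuing the listing above (the task and job ids are placeholders, and TornadoScheduler needs a running Tornado IOLoop):

    from tornado.ioloop import IOLoop

    def my_task(seconds):
        time.sleep(seconds)

    manager = JobManager()
    manager.add_job(my_task, 'job-1', 5)
    manager.add_job(my_task, 'job-2', 5)  # waits behind job-1 with max_workers=1
    IOLoop.current().start()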

ljluestc · Nov 23 '23