
Stateful jobs

Open agronholm opened this issue 7 years ago • 5 comments

Certain kinds of jobs need to persist state to be passed to the next run. This obviously limits the concurrency of the job to 1 instance at a time.

agronholm avatar Dec 23 '17 11:12 agronholm

I wanted to help out, and I needed this for a project I was working on as well. If this were to be undertaken, a few issues crossed my mind about getting at the job's state from the invoked function, and all the solutions that came to mind felt ugly:

  • Require the invoked function to have a parameter named state that we could populate with the job state. Here the user is forced to add a parameter with a specific name to their function if they want to leverage this functionality (a rough sketch of this option follows below).
  • Define some global functions that can retrieve the state, but then we need to get the job id inside the function so they can be invoked from there.
  • Dynamically insert some functions into the user-defined function before its execution. I am not even sure how this would be done, or whether it is possible.
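
For illustration, here is a rough sketch of what the first option could look like. Everything in it (run_job, my_job, the reserved state parameter) is hypothetical, not actual APScheduler API:

import inspect

# Hypothetical dispatcher: if the target function declares a reserved
# "state" parameter, inject the job's persisted state into the call.
# None of these names exist in APScheduler.
def run_job(func, job_state, args, kwargs):
    if "state" in inspect.signature(func).parameters:
        kwargs = {**kwargs, "state": job_state}
    return func(*args, **kwargs)

# A user-defined job function opting in by naming a parameter "state"
def my_job(state):
    state["count"] = state.get("count", 0) + 1
    print(state["count"])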

fredthomsen avatar Apr 02 '18 05:04 fredthomsen

So how can the previous state be passed to the next scheduled run?

alexanderhawl avatar Dec 20 '20 14:12 alexanderhawl

This is an open issue, which means the feature is not done yet. It has also been marked for the 4.0 milestone, which is when you can expect it to be done.

agronholm avatar Dec 20 '20 17:12 agronholm

I'm stuck on an issue that I thought I was somehow causing with the wrong scheduler setup, but it could be related to this one and may be partially answered here.

I guess my use case is very simple: keeping the state of the arguments passed to the job's target function. I'm trying to create a very simple job like the one below, which, using the default memory job store, successfully prints the argument's id and contents (with the count value incremented by 1 on each run).

from typing import Dict

from apscheduler.job import Job
from apscheduler.schedulers.background import BackgroundScheduler

# Job's target function
def inc(_obj: Dict):
    print(f"id={id(_obj)}, value={_obj}")
    _obj["count"] += 1

# Scheduler setup with defaults (in-memory job store)
scheduler = BackgroundScheduler(daemon=True)

# Object argument to pass to the target function
task_args = {"count": 0}

# Create the job, running every 3 seconds
job: Job = scheduler.add_job(
    inc,
    trigger="interval",
    args=[task_args],
    name="test_job",
    seconds=3,
)

scheduler.start()

By just switching the job store to MongoDB, I lose the state, which I thought would be kept by means of the mongodb job store.
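
For reference, a minimal sketch of that job store switch, assuming a local MongoDB instance (the connection details are illustrative; the original setup wasn't shown):

from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.schedulers.background import BackgroundScheduler

# Same scheduler as above, but persisting jobs to MongoDB instead of memory
scheduler = BackgroundScheduler(
    jobstores={"default": MongoDBJobStore(host="localhost", port=27017)},
    daemon=True,
)

And the store's update_job method does pickle the whole job: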

   def update_job(self, job):
        changes = {
            'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
            'job_state': Binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
        }
        result = self.collection.update_one({'_id': job.id}, {'$set': changes})

Maybe this is by design, and the stateful jobs planned for v4, as mentioned above, will cover this use case. But the MongoDB job store does implement job updates in the database. If updates were restricted to next_run_time, that would explain the job "statelessness" I'm experiencing in my tests, yet update_job pickles and saves the entire job state, including the job arguments.

dariosm avatar Aug 23 '21 02:08 dariosm

I've implemented stateful jobs by simply installing a listener on EVENT_JOB_EXECUTED that reschedules the job with a new parameter value (a sketch follows below). There's also a modify method on the Job class; that should work too. All you need is a protocol to pass the last result back to the function. I just use a parameter last_result: Optional[T]
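
A minimal sketch of that listener approach against the APScheduler 3.x API; the task function, the feed_result_back listener, and the last_result parameter name are illustrative choices, not library conventions:

from typing import Optional

from apscheduler.events import EVENT_JOB_EXECUTED
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()

def task(last_result: Optional[int]):
    # Uses the previous run's return value; None on the first run
    return (last_result or 0) + 1

def feed_result_back(event):
    # EVENT_JOB_EXECUTED events carry the job id and the return value,
    # so the listener can feed the result into the next run's kwargs
    job = scheduler.get_job(event.job_id)
    if job is not None:
        job.modify(kwargs={"last_result": event.retval})

scheduler.add_listener(feed_result_back, EVENT_JOB_EXECUTED)
scheduler.add_job(task, "interval", seconds=3, kwargs={"last_result": None})
scheduler.start()

Here modify only swaps the stored kwargs, so the job keeps its schedule while the next invocation sees the previous result.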

rafalkrupinski avatar Jun 09 '23 12:06 rafalkrupinski