[SPIKE] implement job scheduler in flask app
User Story
In order to schedule harvest jobs, datagov wants to integrate a task scheduler into our existing flask app
Acceptance Criteria
[ACs should be clearly demoable/verifiable whenever possible. Try specifying them using BDD.]
Manual
- [ ] GIVEN our current flask app
      AND the integration of the flask scheduler library
      WHEN an HTTP request occurs on the scheduling route
      THEN a job will be added to the job table
      AND a harvest task will be started in CF
Automated
- [ ] GIVEN our current flask app
      AND the integration of the flask scheduler library
      WHEN a certain date/time period has been reached
      THEN a job will be added to the job table
      AND a harvest task will be started in CF
Background
[Any helpful contextual notes or links to artifacts/evidence, if needed]
Security Considerations (required)
[Any security concerns that might be implicated in the change. "None" is OK, just be explicit here!]
Sketch
[Notes or a checklist reflecting our understanding of the selected approach]
https://pypi.org/project/Flask-APScheduler/ should be what we use to implement this...
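For context, a minimal sketch of what wiring Flask-APScheduler into an app factory could look like (the factory and job names here are illustrative, not our actual app, which uses a create_scheduler helper in the mockups below):
```python
# minimal sketch, assuming an app factory pattern like our existing flask app
from flask import Flask
from flask_apscheduler import APScheduler

scheduler = APScheduler()

def create_app():
    app = Flask(__name__)
    app.config["SCHEDULER_API_ENABLED"] = True  # exposes flask-apscheduler's /scheduler routes
    scheduler.init_app(app)
    scheduler.start()
    return app

# jobs can then be registered with the decorator used in the mockups below
@scheduler.task("interval", id="heartbeat", seconds=60)
def heartbeat():
    print("scheduler is alive")
```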
SQLAlchemy job store for the apscheduler tables. Seems redundant to also keep our existing job table. Looks like custom metadata for jobs and tasks will be supported in v4.0 (see the v4.0 progress tracker); they have a couple of v4 pre-releases out.
maybe we can store our job results in the return value here?
```python
import datetime

# a way to start a job now: bump the job's next run time to the present
scheduler_object.get_job(job_id="my_job_id").modify(next_run_time=datetime.datetime.now())
```
We're anticipating running the harvests as tasks in Cloud Foundry. How does flask-apscheduler know when a job is complete if CF is doing the work? Do we stream the CF task logs inside the job until the CF task status changes, telling flask-apscheduler the task is complete? The executor isn't actually doing the work, so what exactly is being multiprocessed/multithreaded?
here's a mockup of a flask-apscheduler job...
```python
from time import sleep

cf_handler = CFHandler()  # cf interface
app = create_app()  # flask app
scheduler = create_scheduler(app)

@scheduler.task("interval", id="do_job_1", seconds=60)
def job1(source_data):
    app_guid = "0192301923"
    cf_handler.start_task(app_guid, f"python harvest.py {source_data}", "test-job")
    # ^ this will return the submitted task when it's been registered by cf.
    # this job would most likely complete unless told to wait until something happens...
    # keep the job open until the task status has changed to "SUCCEEDED". basic polling:

@scheduler.task("interval", id="do_job_2", seconds=60)
def job2(source_data):
    app_guid = "0192301923"
    task = cf_handler.start_task(app_guid, f"python harvest.py {source_data}", "test-job")
    while task["status"] != "SUCCEEDED":
        task = cf_handler.get_task(task["guid"])
        sleep(15)
```
would we want to implement our own scheduling event?
implementing a custom executor
> We're anticipating running the harvests as tasks in Cloud Foundry. How does flask-apscheduler know when a job is complete if CF is doing the work? Do we stream the CF task logs inside the job until the CF task status changes, telling flask-apscheduler the task is complete? The executor isn't actually doing the work, so what exactly is being multiprocessed/multithreaded?
It would be on the "job" task processor to send a POST request back to the flask API, letting it know that it was done processing a job. After accepting this, other things might occur: email notification, kicking off new jobs, etc. That does require an API route being defined; that should be considered a follow-up addition/feature. Tagging @btylerburton for awareness or to weigh in...
Routes and order subject to change, but it's defined in Steps 23 & 24 here
https://raw.githubusercontent.com/GSA/datagov-harvesting-logic/main/docs/diagrams/mermaid/dest/etl_pipeline-1.svg
job processing workflow in apscheduler
- after the scheduler has determined that a job needs to be run, the job is submitted to the thread/process pool (source)
- `run_job` runs the callable assigned to the job (source) and returns events to be dispatched afterwards.
- assuming no exception occurs during job processing, after the job has completed a `JobExecutionEvent(EVENT_JOB_EXECUTED, ...)` is added to the return event list (source). the job is done WOOHOO!
- within the context of the thread/process pool, this job is a `future`. when the callable of the job is done (which by this time is true) the future callback is run. this is when the events are dispatched to the scheduler (source and source)
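To make that concrete, here's a hedged sketch of listening for those dispatched events (plain apscheduler shown; flask-apscheduler wraps the same scheduler, so the listener API should be equivalent, and the listener body is illustrative):
```python
# sketch: subscribe to the events dispatched after run_job completes
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR

scheduler = BackgroundScheduler()

def job_done_listener(event):
    # event.job_id and event.retval come from the JobExecutionEvent described above
    if event.exception:
        print(f"job {event.job_id} failed: {event.exception}")
    else:
        print(f"job {event.job_id} finished, return value: {event.retval}")

scheduler.add_listener(job_done_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler.start()
```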
what we want to do
- run harvest.py in cloud foundry as a task.
- when that task completes we want to post to a route in our flask app indicating the job is done. what does this look like?
```python
harvest_source = HarvestSource(data...)
harvest_source.get_records_changes()
harvest_source.synchronize_records()
harvest_source.submit_job_complete()  # as an example

# ...
def submit_job_complete(self, data...):
    requests.post(url, json=data)  # post to our flask app

# ...meanwhile, in our flask app
@mod.route('/harvest_jobs/result', methods=['POST'])
def harvest_job_result():
    # here's where the job processing workflow from before comes into play...
    # we need to ensure a JobExecutionEvent(EVENT_JOB_EXECUTED, ...) event is dispatched to the proper listener(s).
    # this is where I have pause and things start to break down in a way where we're starting to call the things apscheduler is supposed to handle
    ...
```
My conclusion: I want apscheduler to be responsible for its own things, and I don't want to call things we're not supposed to call. I could be thinking of this wrong though! Any thoughts would be appreciated.
> this is where I have pause and things start to break down in a way where we're starting to call the things apscheduler is supposed to handle
If I'm following your question, the flask app would call the DB here for info that the task has logged. Steps 27 & 28 above.
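If that's the direction, a rough sketch of the flask side reading job state from the harvest DB (the HarvestJob model and its columns are placeholders, not our actual schema):
```python
# hypothetical sketch: the flask app reads what the task has logged to the harvest DB,
# instead of reaching into apscheduler internals. model/columns are illustrative.
from flask import Blueprint, jsonify
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()
mod = Blueprint("harvest", __name__)

class HarvestJob(db.Model):
    id = db.Column(db.String, primary_key=True)
    status = db.Column(db.String)  # e.g. "RUNNING", "SUCCEEDED", "FAILED"

@mod.route("/harvest_jobs/<job_id>", methods=["GET"])
def harvest_job_status(job_id):
    job = db.session.get(HarvestJob, job_id)
    if job is None:
        return jsonify({"error": "not found"}), 404
    return jsonify({"id": job.id, "status": job.status})
```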
apscheduler has a datastore to keep track of tasks, jobs, and schedules. would this not replace our job table in the harvest db?
IMO the Job Table needs to exist as persistent storage for historical metrics.
Given the above questions, it feels like a good time to ask exactly what we're planning to use the scheduler for.
I have:
- task scheduling queue
- cron functions to trigger insertion into the scheduling queue
We can delegate simple queuing to a redis instance, but it's worth listing all the things we want apscheduler to be responsible for first, before we write it off.
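For comparison, "simple queuing to a redis instance" could be as small as this (queue name and payload shape are made up for illustration):
```python
# illustrative only: push harvest job requests onto a redis list and pop them in a worker
import json
import redis

r = redis.Redis(host="localhost", port=6379)

def enqueue_harvest(source_id):
    r.lpush("harvest:queue", json.dumps({"source_id": source_id}))

def worker_loop():
    while True:
        _, raw = r.brpop("harvest:queue")  # blocks until a job is available
        job = json.loads(raw)
        print(f"processing harvest for source {job['source_id']}")
```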
apscheduler supports SQLAlchemy for persistent storage in postgres (as an example). I'm not sure what level of persistence this supports though (e.g. does it retain historic job runs?).
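If we go that route, pointing flask-apscheduler's job store at postgres is roughly this (connection string is a placeholder); as far as I can tell the job store holds scheduled/pending jobs rather than a history of completed runs:
```python
# sketch: persist apscheduler's scheduled jobs in postgres via SQLAlchemy.
# the connection string is a placeholder for our actual harvest DB.
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

class Config:
    SCHEDULER_JOBSTORES = {
        "default": SQLAlchemyJobStore(url="postgresql://user:pass@localhost/harvestdb")
    }
    SCHEDULER_API_ENABLED = True

# then: app.config.from_object(Config()) before scheduler.init_app(app)
```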
Draft PR. So far I haven't been able to get the scheduler to start on container start in a non-hacky way; at this point, I've been able to get it to work as a healthcheck.
we can park this work for the time being since we're not moving forward with apscheduler.