aircan
[epic] v0.2 Error and Logging
Introducing a status API that reports on run status, errors, and logs.
Job Stories
When I run a DAG in AirCan I want to
- know its status (e.g. running, success, failed)
- (?) other info (e.g. how long it's been running)
- detailed errors on failure e.g. if it failed ...
- return results (or pointer to results) on success
so that I can report on this to users and empower them to resolve errors
- (?) get realtime output (cf gitlab runner)
- get notified rather than poll for updates (push notifications rather than pull)
Acceptance
- [x] An API exists like /api/3/action/aircan_submit?dag_id=... that runs a DAG and returns the run ID
- [x] An API exists like /api/3/action/aircan_status?run_id=... which reports on the status of a run, e.g. PENDING | RUNNING | PAUSED | FAILED | SUCCESS, and provides error information
- [ ] When a DAG fails, error information is available, including access to the full logs (either via the previous API or a new one)
- [ ] Logging - Logs are enabled on Composer and can be consumed via API. Note: There is no standard format for logging yet
- [ ] Failed end-to-end run test: a CKAN instance with ckanext-aircan-connector; upload a CSV file and have a DAG on GCP triggered; the CKAN instance must know something went wrong.
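As a client-side sketch of the two APIs above (the helper names and the polling-stop rule are our assumptions; only the endpoint path and the states come from the checklist):

```python
# States a run can end in, per the acceptance criteria above.
TERMINAL_STATES = {"FAILED", "SUCCESS"}


def status_url(ckan_base, run_id):
    """Build the aircan_status URL for a given run (helper name is ours)."""
    return f"{ckan_base}/api/3/action/aircan_status?run_id={run_id}"


def is_terminal(state):
    """True once a run can no longer change state, so a client can stop polling."""
    return state.upper() in TERMINAL_STATES
```

A client would call aircan_submit, keep the returned run ID, then hit `status_url(...)` until `is_terminal(...)` is true.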
FUTURE
- [ ] Callbacks from AirCan to CKAN so that rather than polling we have live status (this would be part of having "Run/Job" objects in CKAN, which is a future item)
Tasks
- [ ]
Analysis
Client flow
Think of a user using a CKAN instance. A run of a DAG is triggered by the CKAN instance.
The user knows the name of the DAG they'll trigger (at the moment specified in an env var; this may change later).
They'd then access the following endpoints to get the status of the DAG run:
GET http://ckan:500/api/3/action/dag_run/<dag_id> # returns all recent runs of that DAG
GET http://ckan:500/api/3/action/dag_run/<dag_id>/<run_id>
They'd see a page with the execution dates for the dag_id
Response from Airflow:
[
{
"dag_id": "ckan_api_load_gcp",
"dag_run_url": "/admin/airflow/graph?dag_id=ckan_api_load_gcp&execution_date=2020-07-09+13%3A21%3A56%2B00%3A00",
"execution_date": "2020-07-09T13:21:56+00:00",
"id": 64,
"run_id": "manual__2020-07-09T13:21:56+00:00",
"start_date": "2020-07-09T13:21:56.963772+00:00",
"state": "success"
},
{
"dag_id": "ckan_api_load_gcp",
"dag_run_url": "/admin/airflow/graph?dag_id=ckan_api_load_gcp&execution_date=2020-07-16+13%3A07%3A02%2B00%3A00",
"execution_date": "2020-07-16T13:07:02+00:00",
"id": 65,
"run_id": "manual__2020-07-16T13:07:02+00:00",
"start_date": "2020-07-16T13:07:02.100794+00:00",
"state": "failed"
}
]
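To make this concrete, a minimal helper for filtering the response above for failed runs so CKAN can surface them to the user (the function name is ours; the field names match the sample payload):

```python
def failed_runs(dag_runs):
    """Return (run_id, execution_date) pairs for runs in state 'failed'."""
    return [
        (run["run_id"], run["execution_date"])
        for run in dag_runs
        if run["state"] == "failed"
    ]
```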
The flow we'd need on CKAN: you hit
POST api/3/aircan_submit?dag_id=XXX&...
The response to this request returns the run_id.
What do you do with this run ID? [For now we can assume the client keeps that run ID and it's up to them. Longer term we will have "Run/Job" objects in CKAN.] We'd need to persist it in a DB, otherwise it'll be lost.
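A minimal sketch of persisting run IDs on the CKAN side, assuming a SQLite table (the table and helper names are hypothetical; a real implementation would live in the ckanext-aircan-connector model layer):

```python
import sqlite3


def save_run(conn, dag_id, run_id):
    """Persist a run ID so it isn't lost after aircan_submit returns."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS aircan_runs (dag_id TEXT, run_id TEXT)")
    conn.execute(
        "INSERT INTO aircan_runs (dag_id, run_id) VALUES (?, ?)",
        (dag_id, run_id))
    conn.commit()


def runs_for(conn, dag_id):
    """Look up all stored run IDs for a DAG."""
    cur = conn.execute(
        "SELECT run_id FROM aircan_runs WHERE dag_id = ?", (dag_id,))
    return [row[0] for row in cur]
```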
Our customized response, including access to GCP logs.
Response:
[
    {
        "airflow_response": {
            "dag_id": "ckan_api_load_gcp",
            "dag_run_url": "/admin/airflow/graph?dag_id=ckan_api_load_gcp&execution_date=2020-07-09+13%3A21%3A56%2B00%3A00",
            "execution_date": "2020-07-09T13:21:56+00:00",
            "id": 64,
            "run_id": "manual__2020-07-09T13:21:56+00:00",
            "start_date": "2020-07-09T13:21:56.963772+00:00",
            "state": "success"
        },
        "gcp_logs": {
            logs for that particular run_id
        }
    },
    {
        "airflow_response": {
            "dag_id": "ckan_api_load_gcp",
            "dag_run_url": "/admin/airflow/graph?dag_id=ckan_api_load_gcp&execution_date=2020-07-16+13%3A07%3A02%2B00%3A00",
            "execution_date": "2020-07-16T13:07:02+00:00",
            "id": 65,
            "run_id": "manual__2020-07-16T13:07:02+00:00",
            "start_date": "2020-07-16T13:07:02.100794+00:00",
            "state": "failed"
        },
        "gcp_logs": {
            logs for that particular run_id
        }
    }
]
They'd get the result of the Airflow API for DAG status (https://airflow.apache.org/docs/stable/rest-api-ref.html), ideally combined with GCP logs.
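One way that combination could look, as a sketch (the function name and the logs lookup are assumptions; the output shape follows the customized response above):

```python
def combine(run, logs_by_run_id):
    """Attach GCP logs to an Airflow run record, keyed by run_id."""
    return {
        "airflow_response": run,
        # Fall back to an empty dict when no logs were found for this run.
        "gcp_logs": logs_by_run_id.get(run["run_id"], {}),
    }
```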
FAQs
- How do I get logs?
- How do I get logs for a particular DAG run? It's not clear; via the official API it's not possible. Note: http://localhost:8080/admin/airflow/log?task_id=create_datastore_via_api&dag_id=ckan_api_load_gcp&execution_date=2020-07-16T15%3A32%3A52%2B00%3A00&format=json displays the logs for a specific task on a specific execution date; we can extract them from this HTML page.
- How do I get info for a specific DAG run if multiple are running at once?
- See the info we can get from the web interface.
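Since per-run logs are only exposed as an HTML page, one possible approach is to pull the text back out of that page. A rough sketch using only the standard library; that the log text sits inside `<pre>` blocks is an assumption about the page layout:

```python
from html.parser import HTMLParser


class LogExtractor(HTMLParser):
    """Collect the text content of <pre> blocks in an Airflow log page."""

    def __init__(self):
        super().__init__()
        self._in_pre = False
        self.chunks = []

    def handle_starttag(self, tag, attrs):
        if tag == "pre":
            self._in_pre = True

    def handle_endtag(self, tag):
        if tag == "pre":
            self._in_pre = False

    def handle_data(self, data):
        if self._in_pre:
            self.chunks.append(data)


def extract_logs(html):
    """Return the concatenated log text from an Airflow log HTML page."""
    parser = LogExtractor()
    parser.feed(html)
    return "".join(parser.chunks)
```

This is fragile by nature (it depends on the webserver's markup), which is another argument for the callback approach below.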
Callbacks [Rufus: this should be later]
Another path to consider (or support both): having an endpoint set up on CKAN ready to receive a POST from AirCan.
E.g. a task fails while running in a DAG; AirCan sends a notification by hitting an endpoint on CKAN.
kwargs = {
    "resource": ...
}
dag_id = dag_run(**kwargs)
running = True
while running:
    status = dag_status(dag_id)
    # status looks like {"running": ...}
    running = status["running"]
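The callback alternative would replace this polling loop with a handler on the CKAN side. A minimal sketch (the payload shape and handler name are assumptions, not an existing API):

```python
def handle_aircan_callback(payload, notify):
    """Handle a POST from AirCan; notify on failure, return the run state."""
    state = payload.get("state")
    if state == "failed":
        # Surface the failure to the user instead of waiting for a poll.
        notify(f"DAG {payload['dag_id']} run {payload['run_id']} failed")
    return state
```

Here `notify` is any callable (email, UI flash message, webhook); AirCan would POST the payload when a task fails.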
Questions to discuss
Questions Errors Handling
- Specify on the DAG where it fails. Returning "success: False" works for the logs, but we need to trigger the fail action on the task (not being done right now)
- Treat all corner cases of failing tasks
- Shall we implement retries?
- Create a default error set that will be used both in the connector and on Aircan DAGs
Logs
- Planning to create the job_status page. Correct? What should we see on this page besides the task_id info + logs info?
- Obtain combined info from the Airflow status API + GCloud logs when displaying task status. Sounds good?
Other questions
- What are the endpoints (on CKAN) that will trigger the DAG? Right now we have datastore_create and aircan_submit. Are there any other triggering endpoints?
- What is the best way to organize the docs? I find the README on AirCan extensive and potentially full of non-useful information; e.g., are people going to use AirCan standalone?