airflow
airflow copied to clipboard
[AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )
This PR tries to land https://github.com/apache/airflow/pull/3584. Most changes are from @Eronarn, I rebased on master and added a few tests.
As per @Eronarn :
JIRA
AIRFLOW-249 Refactor the SLA mechanism
This is a fairly large patch that should and/or may also address:
AIRFLOW-1360 SLA miss goes to all emails in DAG AIRFLOW-2236 Airflow SLA is triggered for all backfilled tasks AIRFLOW-557 SLA notification does not work for manually triggered DAGs AIRFLOW-133 SLAs don't seem to work with schedule_interval=None AIRFLOW-1013 airflow/jobs.py:manage_slas() exception for @once dag
Description
At Quantopian we use Airflow to produce artifacts based on the previous day's stock market data. These artifacts are required for us to trade on today's stock market. Therefore, I've been investing time in improving Airflow notifications (such as writing PagerDuty and Slack integrations). My attention has turned to Airflow's SLA system, which has some drawbacks for our use case:
Defining SLAs can be awkward because they are relative to the execution date instead of the task start time. There's no way to alert if a task runs for "more than an hour", for any non-trivial DAG. Instead you can only express "more than an hour from execution date". The financial data we use varies in when it arrives, and how long it takes to process (data volume changes frequently). We also run DAGs with mixed UTC and Eastern events, making Airflow SLA definitions depend on time of year.
Execution timeouts can capture "more than an hour" but do not serve the same purpose as SLAs. We have tight timelines on long-duration tasks that make retries difficult, so we want to alert an operator while leaving the original task running, rather than failing and then alerting.
The way that SLA miss callbacks are defined is not intuitive, as in contrast to other callbacks, they are defined on the DAG rather than on the task. This has lead to a lot of confusion in JIRA/the mailing list; many people think that SLAs are for DAG completion, when in reality it's a DAG-level attribute that handles batched task-level completions. Also, the call signature is poorly defined: for instance, two of the arguments are just strings produced from the other two arguments.
SLA miss emails don't include any links back to the Airflow instance (important for us because we run the same DAGs in both staging/production) or the execution date they apply to. When opened, they be hard to read for even a moderately sized DAG because they include a flat list of task instances that are unsorted (neither alpha nor topo).
SLA miss emails are sent to every email address associated with the DAG. This can lead to inadvertent paging of users associated with unrelated "forks" in the DAG from where the SLA miss failed.
SLA emails are not callbacks, and can't be turned off (other than either removing the SLA or removing the email attribute on the task instance).
This patch attempts to address the above issues by making some of the following changes:
The sla= parameter is split into: expected_start: Timedelta after execution date, representing when this task must have started by. expected_finish: Timedelta after execution date, representing when this task must have finished by. expected_duration: Timedelta after task start, representing how long this task is expected to run, including all retries. These parameters are set on a task (or DAG-level default args), and a task can have any combination of them, though there is some basic validation logic to warn you if you try to set an illogical combination. The SlaMiss object stores the type of SLA miss as a new database field, which is a component of the primary key.
There is logic to convert the existing sla= parameter to expected_finish (as well as a migration), since that's the closest parallel, so it should be relatively backwards compatible.
SLA misses are no longer grouped for checks or callbacks. While there have always been independent per-task SlaMiss objects, there was a lot of logic to poll for all SlaMisses that occurred at the same time, and to batch them into a single email.
As a consequence of 2), The sla_miss_callback is no longer set on the DAG level, which has been confusing. It now has a context-based signature to be consistent with other task callbacks. This change is not backwards compatible for anyone using custom SLA miss callbacks, but should be a fairly straightforward conversion.
The SLA miss email is now the default SLA miss callback on tasks. Previously it was an additional non-overrideable feature.
The SLA miss email has some improvements:
- Only one SLA miss per email
- SLA-miss-specific title
- Includes a link to the task instance
- Only includes potentially-blocked downstreams
- Sends to a list of "interested subscribers", which is defined as all email addresses on tasks downstream of the task that missed its SLA.
- Additional ASCII art to help distinguish emails.
- Move the SLA miss code largely out of the Scheduler code: some into models (DAGs manage their own SlaMiss objects), and some into SLA helper functions.
Overall, attempt to reduce the complexity and lack of documentation of the SLA miss logic, given the constraint that the new implementation is a larger feature and more lines of code. The previous implementation was stuffed into one overloaded function that is responsible for checking for SLA misses, creating database objects for them, filtering tasks, selecting emails, rendering, and sending. These are now broken into multiple functions, which attempt to be more single-purpose.
Make sure to mark the boxes below before creating PR: [x]
- [x] Description above provides context of the change
- [x] Unit tests coverage for changes (not needed for documentation changes)
- [x] Target Github ISSUE in description if exists
- [x] Commits follow "How to write a good git commit message"
- [x] Relevant documentation is updated including usage instructions.
- [x] I will engage committers as explained in Contribution Workflow Example.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed. In case of a new dependency, check compliance with the ASF 3rd Party License Policy. In case of backwards incompatible changes please leave a note in UPDATING.md. Read the Pull Request Guidelines for more information.
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst) Here are some useful points:
- Pay attention to the quality of your code (flake8, pylint and type annotations). Our pre-commits will help you with that.
- In case of a new feature add useful documentation (in docstrings or in
docs/directory). Adding a new operator? Check this short guide Consider adding an example DAG that shows how users should use it. - Consider using Breeze environment for testing locally, it’s a heavy docker but it ships with a working Airflow and a lot of integrations.
- Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
- Be sure to read the Airflow Coding style. Apache Airflow is a community-driven project and together we are making it better 🚀. In case of doubts contact the developers at: Mailing List: [email protected] Slack: https://apache-airflow-slack.herokuapp.com/
Thank you for taking this on. I couldn't justify any more time on it but I think it's still very relevant to the project.
Thank you for taking this on. I couldn't justify any more time on it but I think it's still very relevant to the project.
Happy to contribute :)
Addressed most, if not all comments in the previous round of review. Played a bit with two cases regarding how we fetch DagRuns for SLA consideration:
- Use a fixed number (e.g., 100) for fetching DRs
- Add an sla_checked column to DR and use it to filter out DRs that have already been checked.
My conclusion is that option 1 is a better trade-off, because one has to go through all TIs in a DagRun to determine if a DR can be free from further checking (e.g., if a DR has 10 TIs, then each TI has to checked for all possible SLA violations before the DR is sla_checked). This is not a cheap operation since a single TI could have 3 SLAs, hence the additional computation and IO could easily outweigh the benefit of filtering out sla_checked DRs.
My conclusion is that option 1 is a better trade-off, because one has to go through all TIs in a DagRun to determine if a DR can be free from further checking (e.g., if a DR has 10 TIs, then each TI has to checked for all possible SLA violations before the DR is sla_checked). This is not a cheap operation since a single TI could have 3 SLAs, hence the additional computation and IO could easily outweigh the benefit of filtering out sla_checked DRs.
Option 1 doesn't guarantee correctness right? i.e. if there are more dagruns that need to be checked than the preset limit, some of them will be ignored?
With regards to performance comparison between option 1 and option 2, aren't we already checking all the TIs for the 100 fetched dag runs in option 1?
Option 1 doesn't guarantee correctness right? i.e. if there are more dagruns that need to be checked than the preset limit, some of them will be ignored?
True. I guess the way to do it (if no addition column is added) would be to remove the the fixed count, and simply do
scheduled_dagruns = (
session.query(DR)
.filter(DR.dag_id == self.dag_id)
.filter(DR.run_id.notlike(f"{DagRunType.BACKFILL_JOB.value}__%"))
.filter(DR.external_trigger == False)
.filter(DR.state != State.SUCCESS)
.order_by(desc(DR.execution_date))
.all()
)
This way we only get DR that have yet to succeed (since we made an assumption that successful DRs are free from SLA check).
With regards to performance comparison between option 1 and option 2, aren't we already checking all the TIs for the 100 fetched dag runs in option 1?
We are checking whether these TIs are violating SLAs, not whether these TIs are free from SLAs, those are different checks (e.g., to check if a TI violates expected_duration, we compare the current duration with the SLA; to check if a TI is free from SLA violations, we assert on that the TI has finished within the expected_duration). To do so would require us adding another column to TI as well.
I'm slightly inclined towards option 1 (probably need to remove the 100 fixed limit), but definitely open to other opinions. :)
If we can add .filter(DR.state != State.SUCCESS) to the query filter list, then I am also in favor of option 1 (without fixed 100 limit of course). It's simpler than 2 and shouldn't run into performance issue for majority of the use-cases :)
Made requested changes, please take a look again :) @houqp @BasPH thanks
CI seems to be having trouble
unable to recognize "/opt/airflow/scripts/ci/in_container/kubernetes/app/postgres.yaml": Get https://airflow-python-3.6-v1.15.3-control-plane:6443/api?timeout=32s: dial tcp: lookup airflow-python-3.6-v1.15.3-control-plane on 127.0.0.11:53: no such host
I'll re-push but flag it here in case it's an indication that something is off.
Hi @seanxwzhang we've fixed the k8s tests can you rebase?
@BasPH does this look good to you?
Is development on task level sla miss callback active? I may be able to help
hi @seanxwzhang any updates on this patch?
hi @seanxwzhang any updates on this patch?
Unfortunately, I won't be able to continue working on this patch, happy to hand it over to others.
to take it over where should some one focus? fixing conflicts first?
to take it over where should some one focus? fixing conflicts first
I guess with how far behind this change is and how many conflicts it has, starting from the scratch following the ideas here is far better idea.
@auvipy are you currently working on this PR? If not, I'm happy to take the ideas and open up a new one that's works out of the DagFileProcessor/DagFileProcessingManager
@auvipy are you currently working on this PR? If not, I'm happy to take the ideas and open up a new one that's works out of the DagFileProcessor/DagFileProcessingManager
please go ahead. feel free to ping me for review
Hi everyone, I've spent a lot of time collecting all reported concerns that the community has had regarding SLAs to date. After much deliberation, I've reached the conclusion that we might be better off defining the Airflow-native SLA feature only at the DAG level, where it can be supported to users' expectations, and leave the task-level SLA definition to the users. There are three main reasons to why I think task-level SLAs should be implemented by the users instead of by Airflow.
- Today, users have the ability to monitor Task-level SLAs through the use of Deferrable Operators and Asynchronous DateTimeTriggers (and Task groups to organize these tasks on the UI).
- Reliably tracking task-level SLAs when the task actually misses the SLA (instead of only after the task succeeds) is only possible at the expense of overloading the work of the scheduler - which is not ideal because task-level SLA detection is not the primary function of a scheduler, and it wouldn't be beneficial for Airflow users to compromise the scheduler in any way.
- Some users want to customize the way they monitor the Task-level SLAs. Some want to use different definitions of the timedelta (timedelta from dagrun start versus from task start), some want to detect task SLA misses multiple times (different levels of warning for delays), and some users want to detect the SLA miss only if the target task is in a certain state (unfinished state - RUNNING, finished state- SUCCESS/SKIPPED)
In contrast, I believe DAG-level SLA will strictly be a positive feature. It will increase the general reliability of Airflow DAGs and even be able to alert us on job delays when undefined behaviors happen, all without negatively impacting the performance of the scheduler.
If you have been interested in the SLA mechanism, or have been actively using the current version of the SLA mechanism, I would love to get your feedback on this proposal. I would love to work with you to try to come up with an SLA solution that meets user expectations!