Add ability to disable a running DAG only after it's in a finished state
Description
I'd like the ability to "drain" a DAG from the UI. This would mean:
- If a DAG is in a running state - disable the DAG after the run reaches a finished state (success/failed)
- If a DAG is in an idle state - disable immediately (same as what disable does right now)
Use case/motivation
Currently, if you disable a DAG, it stops the DAG midway - the running tasks are completed, and queued tasks remain on the queue. I would like a way to disable the DAG, but if there is an active Dag Run it should continue running until it reaches a finished state.
For us, this would be especially helpful when upgrading our Airflow images/environments. To my understanding, Airflow doesn't have a failure-free zero-downtime upgrade (please point me to documentation if it's possible). When upgrading, we don't want to fail/kill running DAGs, so we let them reach a finished state. Once none of the DAGs are in a running state, we perform the upgrades. Precisely timing the DAG disable operation so that a subsequent run doesn't start is not a practical/viable solution. For past upgrades, we have changed all the DAG schedules to None, which allows a running DAG to complete but doesn't start a subsequent run. Once the upgrades are done, we set the DAG schedules back to their original values. We would like to move away from this workaround and have some built-in functionality to deal with such a case.
Related issues
No response
Are you willing to submit a PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Good idea. Would you like to work on it, maybe? This is the best way to make sure it happens :)
I wonder if this should be added as a configuration for pausing/unpausing DAGs? Having a separate button for this could be confusing.
The use case as described is "Don't schedule new runs but let the currently running DAGs finish" - This sounds more like a scheduler feature than a UI one.
Very much so @eladkal
As a user, I'd like to have functionality in the UI that allows me to either "disable and stop the DAG immediately" or "disable DAG once finished". How to implement such a feature in the UI so that it's not confusing is of course up for discussion - button, switch, etc. The underlying functionality to perform the "drain" will likely need to be developed in the scheduler. So I think this feature falls under both the scheduler and UI development areas.
Also, @potiuk - I can try to pick this task up in a couple of weeks. Let's leave it unassigned for now, and I can let you know once I am ready to pick it up.
Sure!
I think this involves some changes to the DB schema, as it probably means that is_paused is no longer just a bool of True / False but actually has several states.
This is why I am partial to it being a config for the scheduler because then we can't represent is_paused as an on/off switch.
Maybe a chance to solve the "Activated" "Paused" conundrum: #14459
I am happy to check this out @eladkal @potiuk ,
OPTION (1): Change is_paused from boolean to string (paused/unpaused/drain). It could also be a good idea to add an additional column, is_paused_state, in DAG, which would allow users to keep using is_paused, although the two can't be used together. (Something along these lines.)
OPTION (2): Change is_paused from boolean to string (paused/unpaused/drain) in DagModel only, without changing it for the user in the DAG() object.
Happy to hear out other suggestions.
Exploring Option (2): "paused" covers the states PAUSED and DRAIN (i.e. anything that is not UNPAUSED), while "unpaused" covers the single state UNPAUSED.
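To make the mapping concrete, here is a minimal sketch of how a three-valued state could still answer the boolean question. The `PauseState` name and its helpers are assumptions made up for illustration; nothing like this exists in Airflow today.

```python
from enum import Enum


class PauseState(str, Enum):
    """Hypothetical three-valued replacement for the boolean is_paused."""

    UNPAUSED = "unpaused"
    DRAIN = "drain"
    PAUSED = "paused"

    @property
    def is_paused(self) -> bool:
        # PAUSED and DRAIN both read as "paused" to callers that only
        # understand the existing boolean.
        return self is not PauseState.UNPAUSED

    @classmethod
    def from_bool(cls, is_paused: bool) -> "PauseState":
        # A legacy boolean can only express the two original states.
        return cls.PAUSED if is_paused else cls.UNPAUSED
```

With something like this, the user-facing DAG() object could keep exposing a boolean while the stored value carries the extra DRAIN state.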
Can we change the schema of DAG()? is_paused should have the same datatype in both DagModel and DAG, and changing it from boolean to string will also change the OpenAPI spec and so on.
cc. @potiuk @eladkal @bbovenzi
Instead of messing with the state, I wonder if we could have an advanced version of the DAG pause action that pauses a DAG after the end of the run.
We already have a toggle in the manual DAG trigger to unpause as part of triggering a run. And we already have a pause after so many failures option: https://github.com/apache/airflow/pull/36935
thanks @bbovenzi, I have found a way to still complete the DAGRun, even if the dag is paused.
Do we want it to be the default behaviour, or should we parameterize it with an argument in the DAG like 'drain_on_pause=True/False'? cc. @eladkal @potiuk @SangwanP
We need to be backward compatible, but this functionality feels less about the DAG code. I don't think it should be set at the code level... Pause/un-pause are actions you take on the DAG regardless of its code.
Yes @eladkal, the only difference will be this: previously, if we paused a DAG and there were some queued/scheduled tasks, they would remain in that state forever; after this change, all of the tasks of the DAG will be completed (of course, no further DAG runs will be scheduled).
But I think it's better if we only do this when users want it (when setting drain_on_pause=True), since there might be some users who are relying on the current behaviour.
Technical Details
- There is a query in scheduler_job_runner.py where we only execute tasks for the DAGs that are unpaused, and I am proposing to remove that condition
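A toy illustration of what removing that condition would change. This is not the real scheduler query: `TaskRow` and both functions are simplified stand-ins invented for the example, and plain Python filtering replaces the SQL.

```python
from dataclasses import dataclass


@dataclass
class TaskRow:
    # Simplified stand-in for a task instance joined with its DAG row.
    task_id: str
    state: str
    dag_is_paused: bool


def schedulable_today(rows):
    # Behaviour as described in this thread: tasks of paused DAGs are skipped.
    return [r.task_id for r in rows if r.state == "scheduled" and not r.dag_is_paused]


def schedulable_with_drain(rows):
    # Proposed behaviour: the paused check is dropped, so tasks that already
    # exist still run. Blocking new DAG runs is assumed to happen elsewhere.
    return [r.task_id for r in rows if r.state == "scheduled"]


rows = [
    TaskRow("extract", "scheduled", dag_is_paused=True),
    TaskRow("load", "scheduled", dag_is_paused=False),
    TaskRow("report", "success", dag_is_paused=True),
]
print(schedulable_today(rows))
print(schedulable_with_drain(rows))
```

The first call leaves `extract` stuck because its DAG is paused, while the second picks it up so the in-flight run can finish.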
I am OK with having some code settings that can help limit the pausing options on a specific DAG (should the DAG author want it), but as a general concept I don't think it should be governed by code (DAG author code).
Today you can pause/unpause at will. We should have the same thing with the new option. Effectively this means that the UI toggle should be changed from two modes (paused, unpaused) to three modes (paused, unpaused, drain tasks -> paused), where drain is eventually switched to paused when the drain is completed.
WDYT?
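As a rough sketch of the three-mode idea, assuming the state is stored as a plain string and the scheduler knows how many runs of the DAG are still active (both function names are hypothetical):

```python
def next_pause_state(current: str, active_runs: int) -> str:
    """Return the state the toggle should show after a scheduler pass."""
    # "drain" is temporary: once nothing is left running it becomes "paused".
    if current == "drain" and active_runs == 0:
        return "paused"
    # "paused" and "unpaused" only change through an explicit user action.
    return current


def may_create_new_run(current: str) -> bool:
    # Neither "paused" nor "drain" allows new DAG runs to be created.
    return current == "unpaused"
```

A draining DAG would keep its state while `active_runs` is above zero and flip to paused on the first pass where it reaches zero.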
OPTION 1-
We can change the type of paused from Boolean to String, but this will also change the behaviour of is_paused_upon_creation, although we might add is_paused_with_drain_upon_creation as well.
OPTION 2 -
Add another check or UI element and bind it to drain_on_pause, so users can handle it from the UI as well, while keeping paused as a boolean with an on/off switch.
Any thoughts?
I prefer option 1.
We need to remember that pausing DAG can also be done from API/CLI and not just the UI.
We can simply deprecate is_paused_upon_creation and replace it with a new parameter to represent all possible options, similar to what we did with the K8s delete pod option https://github.com/apache/airflow/pull/30718 where we deprecated the boolean option is_delete_operator_pod in favor of on_finish_action (Enum).
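A minimal sketch of that deprecation pattern, under stated assumptions: `pause_state_upon_creation`, `PauseUponCreation` and `resolve_pause_option` are hypothetical names, and the handling of conflicting arguments is my own choice rather than what the linked PR does.

```python
import warnings
from enum import Enum
from typing import Optional


class PauseUponCreation(str, Enum):
    # Hypothetical enum covering every option discussed in this thread.
    UNPAUSED = "unpaused"
    PAUSED = "paused"
    DRAIN = "drain"


def resolve_pause_option(
    pause_state_upon_creation: Optional[PauseUponCreation] = None,
    is_paused_upon_creation: Optional[bool] = None,
) -> Optional[PauseUponCreation]:
    """Map the old boolean onto the new enum, warning when the old one is used."""
    if is_paused_upon_creation is None:
        # None means "not set here", so the caller falls back to its default.
        return pause_state_upon_creation
    if pause_state_upon_creation is not None:
        raise ValueError("Pass only one of the old and the new parameter.")
    warnings.warn(
        "is_paused_upon_creation is deprecated; use pause_state_upon_creation.",
        DeprecationWarning,
        stacklevel=2,
    )
    if is_paused_upon_creation:
        return PauseUponCreation.PAUSED
    return PauseUponCreation.UNPAUSED
```

Existing DAG files that pass the boolean would keep working and only see a warning, while new code could pick any of the enum values.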
I feel like once you go into the details of the database, DAG model, REST API, CLI and UI experience, this is a very complex thing. I agree that this option is useful. On the other hand, scheduling logic today is already quite complex, with a lot of queries. We should not risk draining performance with too many options. So weighing this "valid use case" against the complexity added (along with the risk of a regression in scheduler performance), I'd rather opt for a global option. If the on/off must be evaluated separately per DAG, some queries in the scheduler will get really complex.
I'd propose to have a global config which changes the behavior. Either the on/off is just draining DAGs (meaning: do not start new DAG runs, but still schedule all tasks irrespective of DAG state), or it stops scheduling both tasks and DAGs.
I don't see a strong use case that in a single setup both use cases have a hard requirement to be supported in parallel.
@jscheffl The way I imagine it is that the drain status is temporary (for the duration of the drain). Once the drain is finished, it should be switched to paused. So at any given time the number of tasks in this unique state should be relatively small.
I have an alternative solution; someone with a deeper knowledge of the code base can evaluate it:
If one is able to 'disable' the schedule on a DAG, the 'drain' functionality can be achieved without having to make changes to the is_paused field.
There's already a configuration to globally disable schedules (AIRFLOW__SCHEDULER__USE_JOB_SCHEDULE), it'd be nice if this can be extended to be configurable on a per DAG basis. This field can then be set from the UI (how to do that can be discussed).
I agree with @jscheffl here that this use case doesn't warrant adding a new state with paused/unpaused/drain for DAGs as it will complicate and slow down scheduling (and rescheduling) logic. We could do this as UI-only feature, but I am personally not convinced that we need to handle this edge case. If we really really need to, I think it is better to provide single and consistent behaviour for pausing DAGs, which considers drainage, unlike today.
If one is able to 'disable' the schedule on a DAG, the 'drain' functionality can be achieved without having to make changes to the is_paused field.
@SangwanP - the use case that you started with was primarily focused on version upgrades and in that scenario, you only need this at the environment level, not at a DAG level. Moreover, in my experience, if you're doing an in-place version upgrade, any tasks that are scheduled, but not yet finished will be run again once version is upgraded. I don't think it is required for you to set all schedules to none and re-set them.
@eladkal what do we think?
- I think, as suggested by @shubham22, we can make this consistent (allowing drainage by default)
- Or add an argument to DAG() to set this at the DAG level, which will make sure it doesn't affect existing users
- Additionally, add an ENV variable to enable this by default.
We could do this as UI-only feature
I'll refer to my earlier comment
The use case as described is "Don't schedule new runs but let the currently running DAGs finish" - This sounds more like a scheduler feature than a UI one.
https://github.com/apache/airflow/issues/22006#issuecomment-1061147653
I am very happy to proceed with what was suggested. We can always improve further according to users feedback
I am looking into this flow, I have the backend changes figured out, but I am not able to create 2 popups. I might need help from a UI wizard. cc. @eladkal @bbovenzi
Also @eladkal please assign this to me.