When trying to mark downstream tasks as failed in the UI, tasks are instead cleared and rerun
Apache Airflow version
main (development)
What happened
When I try to mark downstream tasks as failed in the UI, the tasks are instead cleared of any status and Airflow reruns them.

What you think should happen instead
When marking tasks as either failed or success, Airflow should set the tasks to those states. It shouldn't rerun downstream tasks if I select the downstream option when marking the task as failed.
How to reproduce
Mark a task as failed with the downstream option set
Operating System
Docker (debian:buster)
Versions of Apache Airflow Providers
No response
Deployment
Other
Deployment details
Happens in Airflow Breeze and the Astro CLI.
Anything else
Occurs every time.
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
I think the issue is that, after the state is set, the subdags are cleared with only_failed always set to True. So if the state being set is failed, the code goes on to clear that state. We need to make sure only_failed is False when the state being set is failed. In addition, exclude_task_ids only includes the main task_id; when downstream is True, the other altered task ids (including the downstream task ids) should also be added to this list so that their state is not cleared.
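To make the reasoning above concrete, here is a minimal, self-contained sketch. It is a toy model in plain Python, not Airflow's actual code: `DOWNSTREAM`, `all_downstream`, `clear`, `mark_state`, and the `exclude_altered` flag are all hypothetical names made up for illustration. It only models the exclude_task_ids part of the proposal (only_failed stays True throughout), so it should be read as an illustration of the idea rather than as the fix itself.

```python
from typing import Dict, List, Optional, Set

# Toy linear DAG mirroring task_1 >> task_2 >> task_3 (hypothetical model).
DOWNSTREAM: Dict[str, List[str]] = {
    "task_1": ["task_2"],
    "task_2": ["task_3"],
    "task_3": [],
}

States = Dict[str, Optional[str]]


def all_downstream(task_id: str) -> Set[str]:
    """Return every task reachable downstream of task_id."""
    seen: Set[str] = set()
    stack = list(DOWNSTREAM[task_id])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(DOWNSTREAM[current])
    return seen


def clear(states: States, candidates: Set[str], only_failed: bool,
          exclude_task_ids: Set[str]) -> None:
    """Reset candidate tasks to None (a cleared task gets rerun)."""
    for task_id in candidates:
        if task_id in exclude_task_ids:
            continue
        if only_failed and states[task_id] != "failed":
            continue
        states[task_id] = None


def mark_state(states: States, task_id: str, new_state: str,
               downstream: bool, exclude_altered: bool) -> States:
    """Set the state, then clear failed downstream tasks.

    exclude_altered=False models the reported behaviour: only the main
    task is excluded from clearing. exclude_altered=True models the
    proposed change: every task whose state was just altered is excluded.
    """
    altered = {task_id}
    if downstream:
        altered |= all_downstream(task_id)
    for altered_id in altered:
        states[altered_id] = new_state

    exclude = altered if exclude_altered else {task_id}
    # only_failed is kept True here, as in the reported behaviour.
    clear(states, all_downstream(task_id), only_failed=True,
          exclude_task_ids=exclude)
    return states


start: States = {"task_1": "success", "task_2": "success", "task_3": "success"}
reported = mark_state(dict(start), "task_1", "failed",
                      downstream=True, exclude_altered=False)
proposed = mark_state(dict(start), "task_1", "failed",
                      downstream=True, exclude_altered=True)
print(reported)
print(proposed)
```

In this toy model, the reported behaviour leaves task_2 and task_3 cleared (None) right after they were marked failed, while excluding all altered task ids keeps them in the failed state.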
I can raise a PR for this.
https://github.com/apache/airflow/blob/a801ea3927b8bf3ca154fea3774ebf2d90e74e50/airflow/models/dag.py#L1690-L1699
Related PR for more discussion: https://github.com/apache/airflow/pull/13037
cc @yuqian90 (PR author)
Another scenario: task_1, the task being marked as failed, is already in the failed state, and its downstream task task_2 is also already marked as failed. In that case, marking the state clears the state even with downstream set to False. I have added test cases for the possible permutations. I have a local branch, and below are the behaviors under my proposed change.
with DAG("test_mark_task_instance_state_failed_downstream_clear", start_date=start_date) as dag:
    task_1 = EmptyOperator(task_id="task_1")
    task_2 = EmptyOperator(task_id="task_2")
    task_3 = EmptyOperator(task_id="task_3")
    task_1 >> task_2 >> task_3
Mark task_1 as Failed with Downstream True

All tasks start as Success:

| Task | Start state | End state |
|---|---|---|
| task_1 | Success | Failed |
| task_2 | Success | Failed |
| task_3 | Success | Failed |

task_3 starts as Failed:

| Task | Start state | End state |
|---|---|---|
| task_1 | Success | Failed |
| task_2 | Success | Failed |
| task_3 | Failed | Failed |

Mark task_1 as Failed with Downstream False

task_3 starts as Failed:

| Task | Start state | End state |
|---|---|---|
| task_1 | Success | Failed |
| task_2 | Success | Success |
| task_3 | Failed | None |

task_1 and task_3 start as Failed:

| Task | Start state | End state |
|---|---|---|
| task_1 | Failed | Failed |
| task_2 | Success | Success |
| task_3 | Failed | Failed |

All tasks start as Success:

| Task | Start state | End state |
|---|---|---|
| task_1 | Success | Failed |
| task_2 | Success | Success |
| task_3 | Success | Success |
This issue has been automatically marked as stale because it has been open for 365 days without any activity. There have been several Airflow releases since the last activity on this issue. Kindly asking to recheck the report against the latest Airflow version and let us know if the issue is reproducible. The issue will be closed in the next 30 days if no further activity occurs from the issue author.
This issue has been closed because it has not received a response from the issue author.