Deferrable ExternalTaskSensor doesn't consider "deferred" as a running state
Apache Airflow version
2.7.1
What happened
When the ExternalTaskSensor in deferrable mode waits for a DAG that is currently in a deferred state, the sensor doesn't consider the DAG to be running and fails after 60 seconds.
After looking at the code, I believe it is a matter of adding the deferred state to the state filter in the method count_running_dags:
dags = (
    session.query(func.count("*"))
    .filter(
        TaskInstance.dag_id == self.dag_id,
        TaskInstance.execution_date.in_(self.execution_dates),
        TaskInstance.state.in_(
            [TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS, TaskInstanceState.DEFERRED]
        ),
    )
    .scalar()
)
I tested this code on my local machine and it worked as expected when waiting for a deferred DAG.
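To illustrate why the extra state matters, here is a minimal, Airflow-free sketch of the same three filters applied to in-memory rows. The names State, FakeTaskInstance and count_matching are hypothetical stand-ins, not Airflow APIs, and the "before" state list (running and success only) is an assumption based on the proposed change above.

```python
from dataclasses import dataclass
from enum import Enum


class State(str, Enum):
    """Hypothetical stand-in for Airflow's TaskInstanceState."""

    RUNNING = "running"
    SUCCESS = "success"
    DEFERRED = "deferred"
    FAILED = "failed"


@dataclass(frozen=True)
class FakeTaskInstance:
    """Hypothetical stand-in for a TaskInstance row."""

    dag_id: str
    execution_date: str
    state: State


def count_matching(task_instances, dag_id, execution_dates, states):
    """Count rows that pass the dag_id, execution_date and state filters."""
    return sum(
        1
        for ti in task_instances
        if ti.dag_id == dag_id
        and ti.execution_date in execution_dates
        and ti.state in states
    )


# dag1 has a single task instance that is currently deferred.
rows = [FakeTaskInstance("dag1", "2023-09-20T00:00:00", State.DEFERRED)]
dates = ["2023-09-20T00:00:00"]

# Assumed state list before the change: the deferred task is not counted.
without_deferred = count_matching(rows, "dag1", dates, [State.RUNNING, State.SUCCESS])

# State list with DEFERRED added: the deferred task is counted.
with_deferred = count_matching(
    rows, "dag1", dates, [State.RUNNING, State.SUCCESS, State.DEFERRED]
)

print(without_deferred)  # 0
print(with_deferred)  # 1
```

With a count of 0 the sensor treats the external DAG as not started, which matches the timeout described above.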
Maybe @bkossakowska could check this out?
What you think should happen instead
No response
How to reproduce
- Create dag1 that contains a simple task that defers itself (e.g. TimeSensorAsync)
- Create dag2 which contains an ExternalTaskSensor running in deferrable mode
- Run both at the same time and see how dag2's task fails with an airflow.exceptions.AirflowException: Dag was not started within 1 minute, assuming fail.
Operating System
Ubuntu 22.04.3 LTS
Versions of Apache Airflow Providers
No response
Deployment
Virtualenv installation
Deployment details
No response
Anything else
Similarly to #34204 I'd love to submit a PR, but have no available time to properly do so or the knowledge to make sure the proposed solution actually works in all cases.
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
As mentioned in #34204, I think I have a fix for this particular issue: https://github.com/apache/airflow/compare/main...yermalov-here:airflow:fix_ExternalTaskSensor_deferrable
However, I don't feel confident about creating a PR with it, as the fix relates to logic in the deferrable implementation that is not part of the non-deferrable implementation. The deferrable implementation in the trigger checks whether the DAG run exists before checking the states of the task instances. This check does not take place in the poke method of the sensor (the non-deferrable implementation).
In https://github.com/apache/airflow/pull/36916, WorkflowTrigger was added. It takes allowed_states from the ExternalTaskSensor constructor, unlike TaskStateTrigger, which has "success" and "failed" hardcoded in count_running_dags. I guess this issue can be closed.
cc: @pankajastro