
Deferrable ExternalTaskSensor doesn't consider "deferred" as a running state

Open ecodina opened this issue 2 years ago • 2 comments

Apache Airflow version

2.7.1

What happened

When the ExternalTaskSensor runs in deferrable mode and waits for a DAG whose task is currently deferred, the sensor doesn't consider that DAG to be running and fails after 60 seconds.

After looking at the code, I believe the fix is to add DEFERRED to the states checked in the count_running_dags method:

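```python
from sqlalchemy import func

from airflow.models import TaskInstance
from airflow.utils.state import TaskInstanceState

# Body of TaskStateTrigger.count_running_dags; `self` and `session` come from the method.
dags = (
    session.query(func.count("*"))
    .filter(
        TaskInstance.dag_id == self.dag_id,
        TaskInstance.execution_date.in_(self.execution_dates),
        # DEFERRED is the proposed addition, so a deferred task counts as the DAG having started.
        TaskInstance.state.in_(
            [TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS, TaskInstanceState.DEFERRED]
        ),
    )
    .scalar()
)
```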

I tested this change on my local machine, and it worked as expected when waiting for a DAG with a deferred task.
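To make the effect of the state list concrete, here is a minimal, self-contained Python sketch that does not touch Airflow or the database. It assumes, based on the query above, that the trigger treats the external DAG as started once at least one matching task instance is in one of the listed states, and gives up if that never happens within the 60-second window. The names count_started and wait_for_start, the 5-second poll interval, and the plain-string states are hypothetical stand-ins for count_running_dags, the trigger's polling loop, and TaskInstanceState.

```python
from typing import FrozenSet, Iterable, Sequence

# Hypothetical stand-ins for TaskInstanceState values (plain strings here).
STATES_BEFORE_FIX = frozenset({"running", "success"})
STATES_WITH_FIX = STATES_BEFORE_FIX | {"deferred"}


def count_started(ti_states: Iterable[str], started_states: FrozenSet[str]) -> int:
    """Count task instances whose state is in `started_states` (mirrors the COUNT(*) query)."""
    return sum(1 for state in ti_states if state in started_states)


def wait_for_start(
    snapshots: Sequence[Sequence[str]],
    started_states: FrozenSet[str],
    poll_interval: int = 5,
    timeout: int = 60,
) -> str:
    """Walk through task-instance state snapshots taken every `poll_interval` seconds.

    Returns "started" as soon as a snapshot contains a matching task instance,
    or "timeout" if none does within `timeout` seconds.
    """
    for i, states in enumerate(snapshots):
        if i * poll_interval >= timeout:
            break
        if count_started(states, started_states) > 0:
            return "started"
    return "timeout"


# dag1's only task (e.g. a TimeSensorAsync) stays deferred the whole time.
deferred_dag = [["deferred"]] * 20

print(wait_for_start(deferred_dag, STATES_BEFORE_FIX))  # timeout
print(wait_for_start(deferred_dag, STATES_WITH_FIX))    # started
```

With the original state list, a DAG whose only task is deferred never counts as started, which matches the "Dag was not started within 1 minute" failure; adding "deferred" lets the first poll succeed.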

Maybe @bkossakowska could check this out?

What you think should happen instead

No response

How to reproduce

  1. Create dag1 that contains a simple task that defers itself (e.g. TimeSensorAsync)
  2. Create dag2 which contains an ExternalTaskSensor running in deferrable mode
  3. Run both at the same time: dag2's task fails with airflow.exceptions.AirflowException: Dag was not started within 1 minute, assuming fail.

Operating System

Ubuntu 22.04.3 LTS

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else

As with #34204, I'd love to submit a PR, but I don't have the time to do it properly or the knowledge to make sure the proposed solution works in all cases.

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

ecodina, Sep 08 '23 10:09

As mentioned in #34204, I think I have a fix for this particular issue: https://github.com/apache/airflow/compare/main...yermalov-here:airflow:fix_ExternalTaskSensor_deferrable

Even so, I don't feel confident about opening a PR with it, because the fix concerns logic that exists only in the deferrable implementation. In deferrable mode, the trigger checks whether the DAG run exists before checking the states of the task instances; the poke method of the sensor (the non-deferrable implementation) does no such check.
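The difference described above can be modelled with two hypothetical check functions run over the same state snapshots. This is a simplified sketch, not Airflow's code: trigger_style first requires the external DAG to look started (modelled as any task instance being in a "started" state) within a fixed number of polls and only then waits for the target states, while poke_style only compares against the target states on every poke. The snapshot format, state names, poll budget, and both function names are assumptions for illustration.

```python
from typing import FrozenSet, Sequence

TARGET_STATES = frozenset({"success"})


def poke_style(snapshots: Sequence[Sequence[str]], target_states: FrozenSet[str] = TARGET_STATES) -> str:
    """Non-deferrable flow (simplified): every poke only compares against the target states."""
    for states in snapshots:
        if states and all(s in target_states for s in states):
            return "success"
    return "still waiting"


def trigger_style(
    snapshots: Sequence[Sequence[str]],
    started_states: FrozenSet[str],
    target_states: FrozenSet[str] = TARGET_STATES,
    max_polls_before_start: int = 12,
) -> str:
    """Deferrable flow (simplified): a 'has the external DAG started?' gate runs first."""
    started = False
    for poll, states in enumerate(snapshots):
        if not started:
            if any(s in started_states for s in states):
                started = True
            elif poll + 1 >= max_polls_before_start:
                # 12 polls, assuming a 5 s poll interval, is the 60-second window.
                return "timeout"
            else:
                continue
        if states and all(s in target_states for s in states):
            return "success"
    return "still waiting"


# dag1's single task stays deferred for 15 polls, then succeeds.
snapshots = [["deferred"]] * 15 + [["success"]]

print(poke_style(snapshots))                                                    # success
print(trigger_style(snapshots, frozenset({"running", "success"})))              # timeout
print(trigger_style(snapshots, frozenset({"running", "success", "deferred"})))  # success
```

Only the gated flow is sensitive to which states count as "started", which is why a fix to that state list touches the deferrable path alone.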

yermalov-here, Sep 14 '23 14:09

https://github.com/apache/airflow/pull/36916 added WorkflowTrigger, which takes allowed_states from the ExternalTaskSensor constructor, replacing TaskStateTrigger, which has "success" and "failed" hardcoded in count_running_dags. I guess this issue can be closed.

cc: @pankajastro

tirkarthi, Feb 26 '24 14:02