airflow
Fix DAG test command stuck in infinite loop
closes: https://github.com/apache/airflow/issues/28865
The DAG test command gets stuck in an infinite loop when the DAG is paused, in the following cases:
- A task resumes from the deferred state: the TI changes from deferred to scheduled.
- A task has retries, fails, and goes up for retry: the TI changes from running to up_for_retry.
This is because the current implementation calls _run_task and assumes the task will either fail or succeed. For a deferrable operator, however, when the TI resumes (after the trigger fires), we need to run the callback that was passed when initializing the custom trigger and deferring.
In this PR, I store the try number for each TI to handle retries, and I run the trigger callback in a separate loop to handle deferrable operators.
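To make the two failure modes concrete, here is a toy model in plain Python (not Airflow code) of a dag.test()-style loop. ToyTask, fire_triggers, naive_test, fixed_test and the rule that every trigger fires immediately are all hypothetical simplifications; only the state names mirror Airflow's TI states. The naive loop re-executes a resumed deferrable task from the start, so it defers again, and it never picks up an up_for_retry task. The fixed loop resumes via the stored callback and re-runs retries while tracking the try number.

```python
from dataclasses import dataclass
from typing import Callable, Optional

# Toy model only: state names mirror Airflow TI states, nothing here is Airflow's API.
TERMINAL = {"success", "failed"}


@dataclass
class ToyTask:
    task_id: str
    retries: int = 0
    failing_tries: int = 0     # how many initial tries raise an error
    deferrable: bool = False   # whether execute() defers to a trigger
    state: str = "scheduled"
    try_number: int = 0
    next_method: Optional[Callable[[], str]] = None

    def execute(self) -> None:
        """One try from the start, like calling the operator's execute()."""
        self.try_number += 1
        if self.try_number <= self.failing_tries:
            self.state = "up_for_retry" if self.try_number <= self.retries else "failed"
        elif self.deferrable:
            # Defer, storing the method to resume in once the trigger fires.
            self.next_method = lambda: "success"
            self.state = "deferred"
        else:
            self.state = "success"


def fire_triggers(tasks: list) -> None:
    # Toy assumption: every trigger fires at once and the task goes back to scheduled.
    for t in tasks:
        if t.state == "deferred":
            t.state = "scheduled"


def naive_test(tasks: list, max_iterations: int = 50) -> Optional[int]:
    for i in range(max_iterations):
        if all(t.state in TERMINAL for t in tasks):
            return i
        fire_triggers(tasks)
        for t in tasks:
            if t.state == "scheduled":
                t.execute()  # always starts over, so a deferrable task defers again
    return None  # gave up: without the guard this loop would run forever


def fixed_test(tasks: list, max_iterations: int = 50) -> Optional[int]:
    for i in range(max_iterations):
        if all(t.state in TERMINAL for t in tasks):
            return i
        fire_triggers(tasks)
        for t in tasks:
            if t.state == "scheduled" and t.next_method is not None:
                t.state = t.next_method()  # resume via the stored callback
                t.next_method = None
            elif t.state in ("scheduled", "up_for_retry"):
                t.execute()  # try_number grows, so retries run out eventually
    return None
```

For example, naive_test([ToyTask("wait", deferrable=True)]) gives up and returns None, while fixed_test on a fresh ToyTask("wait", deferrable=True) finishes after two rounds.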
Hi @uranusjr / @hussein-awala, any feedback on this would be great. Thank you!
I think this change is testable, could you add a test for this?
After your review, I made some significant changes to my approach (earlier I was raising an error, but now I handle the deferrable case). I was planning to add tests for this case, but I'm not sure a unit test with mocking would cover it. Do we already have tests that run a DAG end to end?
I think we need a test for this
Hi @uranusjr, I can see some tests for the dag test feature at https://github.com/apache/airflow/blob/main/tests/models/test_dag.py#L1891-L1985, but to test this particular fix I need to run a DAG end to end. Could you point me to any tests that run DAGs end to end, so that I can add my DAG there?
I think building a DAG with dag_maker and then calling test would work. The test should be annotated with @pytest.mark.execution_timeout, since we also want to test that the infinite loop does not happen.
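@pytest.mark.execution_timeout is not a built-in pytest marker; it appears to come from Airflow's own test configuration. As a framework-free illustration of the same idea (failing fast instead of hanging on an infinite loop), the sketch below runs a callable in a daemon thread and reports whether it finished within a time budget. The helper names finishes_within, loops_forever and returns_quickly are hypothetical.

```python
import threading
import time
from typing import Callable


def finishes_within(fn: Callable[[], object], timeout_s: float) -> bool:
    """Run fn in a daemon thread and report whether it returned within timeout_s.

    A daemon thread cannot be killed from outside, but it will not keep the
    interpreter alive at exit, so a stuck fn only costs the wait itself.
    """
    worker = threading.Thread(target=fn, daemon=True)
    worker.start()
    worker.join(timeout_s)        # wait at most timeout_s seconds
    return not worker.is_alive()  # still alive means stuck (or just too slow)


def loops_forever() -> None:
    # Stand-in for the bug: a loop whose exit condition never becomes true.
    while True:
        time.sleep(0.01)


def returns_quickly() -> None:
    time.sleep(0.01)
```

Unlike a real execution timeout, this only detects the hang; it does not abort the stuck code, which is why the worker is a daemon thread.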
I tried using dag_maker and then running dag.test() with both the sync sensor (DateTimeSensor) and the async one (DateTimeSensorAsync), and with the dag_maker parameter is_paused_upon_creation set to both False and True, but the test times out in every case. Do you know if I'm missing something here? Here is one sample test:
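import datetime
from datetime import timedelta

# Method of the DAG test class: dag_maker and session are fixtures from Airflow's
# test suite, and DEFAULT_DATE is assumed to be the test module's own constant.
def test_deferrable_operator(self, dag_maker, session):
    from airflow.sensors.date_time import DateTimeSensor

    # TODO: Set True
    with dag_maker(
        dag_id="test_deferrable", is_paused_upon_creation=False, start_date=DEFAULT_DATE, session=session
    ) as dag:
        # Sensor whose target time is 1 s in the future
        DateTimeSensor(task_id="wait", target_time=datetime.datetime.now() + timedelta(seconds=1))
    dag.test()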
I tried to debug this. If you run it locally with the global 60 s timeout disabled, the test actually passes after about 70 seconds. A sensor's default poke interval is 60 seconds, so if you ask it to wait until 1 s after the current time, that is how long it takes to recheck (the extra 10 or so seconds is just overhead). Since DAG.test() is blocking by design, the test hangs for that long.
The solution would be to add, e.g., poke_interval=0.1 to the sensor. (The value is arbitrary, just short enough to make things reasonable.)
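The timing explanation above can be checked with a rough, idealized model: in poke mode the sensor checks at about t = 0, p, 2p, ... where p is poke_interval, so a target 1 s in the future is first seen as reached at the first multiple of p that is at least 1 s. The helper below is hypothetical and ignores scheduling and startup overhead (the extra ~10 s observed above).

```python
import math


def seconds_until_sensor_succeeds(wait_s: float, poke_interval_s: float) -> float:
    """Idealized time until a poke-mode sensor sees its target as reached.

    Assumes pokes at t = 0, p, 2p, ... and zero overhead; the first poke at or
    after wait_s succeeds.
    """
    if poke_interval_s <= 0:
        raise ValueError("poke_interval_s must be positive")
    intervals = math.ceil(wait_s / poke_interval_s)  # sleeps before the successful poke
    return intervals * poke_interval_s


default_wait = seconds_until_sensor_succeeds(1, 60)  # default interval: 60 s
short_wait = seconds_until_sensor_succeeds(1, 0.1)   # poke_interval=0.1: about 1 s
```

With the default interval the model gives 60 s, consistent with the roughly 70 s run once overhead is added; with poke_interval=0.1 it drops to about 1 s, well inside a test timeout.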
@pankajastro, what is the status of this PR? It seems to have gone stale.
It looks like the issue has been fixed by PR https://github.com/apache/airflow/pull/34585