airflow icon indicating copy to clipboard operation
airflow copied to clipboard

Fix DAG test command stuck in infinite loop

Open pankajastro opened this issue 1 year ago • 8 comments

closes: https://github.com/apache/airflow/issues/28865 The DAG test command stuck in infinite loop when DAG is paused

  1. task resume from deferable state: TI change from deferrable to scheduled
  2. task has retry and task fail and up for retry: TI change from running to up_for_retry

Because the current implementation call _run_task and assume the task will either fail or succeed but in case of deferrable when TI resumes again (from the trigger) and we need to run check callback that has been passed in custom trigger while initializing and deferring.

In this PR I'm storing try number for TI's to handle the retries and when handling running trigger callback in a separate loop to handle the deferrable operator.


^ Add meaningful description above

Read the Pull Request Guidelines for more information. In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed. In case of a new dependency, check compliance with the ASF 3rd Party License Policy. In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

pankajastro avatar Jun 19 '23 10:06 pankajastro

Hi @uranusjr / @hussein-awala any feedback on this would be great. Thank you!

pankajastro avatar Jun 27 '23 20:06 pankajastro

I think this change is testable, could you add a test for this?

After your review, I made some significant changes in my approach (earlier I was erroring out but now handling the deferable case). I was thinking to add tests for this case, but not if sure a unit test with mocking would test this case. I was wondering if we already have some tests where we run some dag end-to-end.

pankajastro avatar Jul 04 '23 20:07 pankajastro

I think we need a test for this

uranusjr avatar Jul 24 '23 03:07 uranusjr

I think we need a test for this

Hi @uranusjr I can see some tests for dag test feature https://github.com/apache/airflow/blob/main/tests/models/test_dag.py#L1891-L1985 but to test this particular fix I need to run a dag end to end. can you please point me if we run some dags end to end in our tests so that I can place my dag there?

pankajastro avatar Jul 24 '23 10:07 pankajastro

I think building a DAG with dag_maker and then call test would work. The test should be annotated with @pytest.mark.execution_timeout since we also want to test for the infinite loop not happening.

uranusjr avatar Jul 24 '23 11:07 uranusjr

I think building a DAG with dag_maker and then call test would work. The test should be annotated with @pytest.mark.execution_timeout since we also want to test for the infinite loop not happening.

I tried to use dag_maker and then run dag.test() but in both sync (DateTimeSensor) and async (DateTimeSensorAsync) and with dag_maker param is_paused_upon_creation={False|True} but in all these cases it timeout. not sure, if do you know if I'm missing something here? one sample test

def test_deferrable_operator(self, dag_maker, session):
        from airflow.sensors.date_time import DateTimeSensor

        # TODO: Set True
        with dag_maker(
            dag_id="test_deferrable", is_paused_upon_creation=False, start_date=DEFAULT_DATE, session=session
        ) as dag:
            DateTimeSensor(task_id="wait", target_time=datetime.datetime.now() + timedelta(seconds=1))
        dag.test()

pankajastro avatar Jul 26 '23 11:07 pankajastro

Tried to debug this. If you run this locally with the global 60s timeout disabled, the test actually passes after about 70 seconds. This is because a sensor’s default poke interval is 60 seconds, and if you ask it to wait for 1s after the current time, that’s actually how long it’d take to recheck (the extra 10-ish seconds is just overhead), and since DAG.test() is blocking by design, the test would hang for that long.

The solution would be to add e.g. poke_interval=0.1 to the sensor. (The value is arbitrary, just short enough to make things reasonable.)

uranusjr avatar Jul 28 '23 08:07 uranusjr

@pankajastro what is the status of this PR? Seems like it got stale.

eladkal avatar Feb 09 '24 10:02 eladkal

look like issue has been fixed PR https://github.com/apache/airflow/pull/34585

pankajastro avatar May 25 '24 08:05 pankajastro