
Deferrable ExternalTaskSensor times out even if DAG is running

Open ecodina opened this issue 2 years ago • 17 comments

Apache Airflow version

2.7.1

What happened

If a DAG (dag1) is running and another DAG (dag2) has an ExternalTaskSensor (task-externalsensor) that checks a task on dag1, task-externalsensor will fail unless dag1's task finishes in under 60 seconds.

After checking the code, I believe the while block in the trigger should be split in two, as follows:

# Wait for the dag to start execution
# Maybe the timeout should be configurable?
while True:
    try:
        delta = utcnow() - self.trigger_start_time
        if delta.total_seconds() < self._timeout_sec:
            # mypy confuses typing here
            if await self.count_running_dags() == 0:  # type: ignore[call-arg]
                self.log.info("Waiting for DAG to start execution...")
                await asyncio.sleep(self.poll_interval)
            else:
                break
        else:
            yield TriggerEvent({"status": "timeout"})
            return
    except Exception:
        yield TriggerEvent({"status": "failed"})
        return

# Wait for the external task to finish
# Maybe put a timeout here?
while True:
    try:
        if await self.count_tasks() == len(self.execution_dates):  # type: ignore[call-arg]
            yield TriggerEvent({"status": "success"})
            return
        self.log.info("Task is still running, sleeping for %s seconds...", self.poll_interval)
        await asyncio.sleep(self.poll_interval)
    except Exception:
        yield TriggerEvent({"status": "failed"})
        return

I tested this code on my local machine and it worked as expected with dag1 running for 120 seconds.

Maybe @bkossakowska could check this out?

What you think should happen instead

No response

How to reproduce

  1. Create dag1 which contains a simple task (sleep for > 60 seconds)
  2. Create dag2 which contains an ExternalTaskSensor checking for dag1's task and runs in deferrable mode
  3. Execute them at the same time. As soon as dag2's trigger has spent 60 seconds, it will fail, even if dag1's task is still running. (A minimal sketch of both DAGs follows.)
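
For reference, a minimal sketch of the two DAGs described above (DAG ids, schedules, and the sleep duration are illustrative; both DAGs share the same schedule so the sensor's logical date lines up):

import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor

# dag1: a single task sleeping longer than the trigger's hardcoded 60-second timeout
with DAG(
    dag_id="dag1",
    start_date=pendulum.datetime(2023, 9, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
):
    BashOperator(task_id="long_task", bash_command="sleep 120")

# dag2: waits for dag1's task in deferrable mode
with DAG(
    dag_id="dag2",
    start_date=pendulum.datetime(2023, 9, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
):
    ExternalTaskSensor(
        task_id="task-externalsensor",
        external_dag_id="dag1",
        external_task_id="long_task",
        deferrable=True,  # hand the waiting over to the triggerer
    )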

Operating System

Ubuntu 22.04.3 LTS

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else

I'd love to submit a PR, but I have neither the time to do so properly nor the knowledge to make sure the proposed solution actually works in all cases.

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

ecodina, Sep 08 '23 10:09

I'd love to submit a PR, but I have neither the time to do so properly nor the knowledge to make sure the proposed solution actually works in all cases.

Then it will be waiting for someone to pick it up. But you might find it still here when you come back ... so maybe it will be you eventually.

potiuk, Sep 11 '23 10:09

I'd love to submit a PR, but I have neither the time to do so properly nor the knowledge to make sure the proposed solution actually works in all cases.

Then it will be waiting for someone to pick it up. But you might find it still here when you come back ... so maybe it will be you eventually.

I contributed to Airflow once last year. However, I now have a job and I'm a PhD student, which is why I commented that I have no time available. I spent as much time as I could debugging the issue to help whoever picks it up!

I really look forward to meeting you next week at the summit to thank you in person for all the help you've provided us over the past months!

ecodina, Sep 11 '23 14:09

Hi, can I work on this issue?

raffifu, Sep 13 '23 11:09

I can reproduce the issue. I think it's because the default timeout value is set to 60 seconds on the trigger. Should the trigger of ExternalTaskSensor wait for the dependent DAGs to complete even if it takes longer than the timeout?

raffifu, Sep 13 '23 15:09

I believe so. I don't think it's safe to assume how long a running DAG will take to finish (maybe it's doing I/O operations over an unreliable network). That's why I proposed splitting the block: first, wait for the DAG to be running (where I feel a timeout is appropriate), and then actually wait for the DAG to finish, however long it takes.

Oh, and I appreciate you taking this issue!

ecodina, Sep 13 '23 17:09

Got it, after reading the comment in the code again. The timeout only applies while the dependent task has not yet started running; if the task is in the running state, ExternalTaskSensor must wait until the task completes. I will try the proposed solution and test the other cases.

raffifu, Sep 14 '23 02:09

@potiuk should deferrable mode behave the same as non-deferrable mode on ExternalTaskSensor? I found that the deferrable version will not fail if the external_task_id task fails. I think it's because there is no check of the task or DAG state in the run method. This error happened when I tried to implement the proposed solution from @ecodina. But even with the current code it will fail because of the timeout, not because the external task failed (which I think will confuse users).

# Wait for the DAG to start execution
while True:
    delta = utcnow() - self.trigger_start_time
    if delta.total_seconds() < self._timeout_sec:
        if await self.count_running_dags() == 0:  # type: ignore[call-arg]
            self.log.info("Waiting for DAG to start execution...")
            await asyncio.sleep(self.poll_interval)
        else:
            break
    else:
        yield TriggerEvent({"status": "timeout"})
        return

# Wait for the task to succeed
while True:
    if await self.count_tasks() == len(self.execution_dates):  # type: ignore[call-arg]
        yield TriggerEvent({"status": "success"})
        return

    self.log.info("Task is still running, sleeping for %s seconds...", self.poll_interval)
    await asyncio.sleep(self.poll_interval)

Current code:

while True:
    delta = utcnow() - self.trigger_start_time
    if delta.total_seconds() < self._timeout_sec:
        # mypy confuses typing here
        if await self.count_running_dags() == 0:  # type: ignore[call-arg]
            self.log.info("Waiting for DAG to start execution...")
            await asyncio.sleep(self.poll_interval)
    else:
        yield TriggerEvent({"status": "timeout"}) # <---- This will triggered if the running task is suddenly failed
        return
    # mypy confuses typing here
    if await self.count_tasks() == len(self.execution_dates):  # type: ignore[call-arg]
        yield TriggerEvent({"status": "success"})
        return
    self.log.info("Task is still running, sleeping for %s seconds...", self.poll_interval)
    await asyncio.sleep(self.poll_interval)
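
For illustration only, here is a sketch of the kind of state check that seems to be missing; count_failed_tasks is a hypothetical helper that the trigger does not currently have, so this only shows where a failure check could live in the proposed second loop:

# Wait for the external task to finish
while True:
    try:
        if await self.count_tasks() == len(self.execution_dates):  # type: ignore[call-arg]
            yield TriggerEvent({"status": "success"})
            return
        # Hypothetical: count external task instances that ended up in a failed state,
        # so the trigger can report "failed" instead of silently waiting forever.
        if await self.count_failed_tasks() > 0:
            yield TriggerEvent({"status": "failed"})
            return
        self.log.info("Task is still running, sleeping for %s seconds...", self.poll_interval)
        await asyncio.sleep(self.poll_interval)
    except Exception:
        yield TriggerEvent({"status": "failed"})
        return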

raffifu, Sep 14 '23 05:09

I took a look at the related issue #34205 and the root cause is the logic that differs from the non-deferrable behavior. I was able to fix the exact problem described there, but the behavior looks weird: https://github.com/apache/airflow/compare/main...yermalov-here:airflow:fix_ExternalTaskSensor_deferrable

IMHO this trigger is worthy of a refactoring, and the non-deferrable behavior should be the guideline. I've noted the following issues:

  1. The above-mentioned issue with the hardcoded timeout. I think the sensor's timeout value should be passed there (and the poke_interval as well); a rough sketch is at the end of this comment.
  2. If it was the task that did not complete on time, the log will still say "Dag was not started within 1 minute, assuming fail.", which is confusing if you are waiting for a task or a task group, not for the full DAG. See the execute_complete method of the sensor.
  3. On each iteration the trigger checks whether a dag_run exists, which I don't see happening in the sensor's poke.

In general it looks to me like quite an issue that the logic needs to be duplicated for the deferrable and non-deferrable sensor and operator versions. It leads to them having different behavior and different sets of features. As an Airflow user I wouldn't expect behavior to differ for the same Operator/Sensor being run in two different modes.
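
Regarding point 1, a rough sketch of how the sensor could forward its own timeout and poke_interval when deferring; the trigger constructor arguments are approximated from the attributes the trigger code above uses, and timeout_sec is an assumed new argument rather than part of the current TaskStateTrigger:

from datetime import timedelta

from airflow.triggers.external_task import TaskStateTrigger
from airflow.utils import timezone

def execute(self, context):
    # Rough sketch of ExternalTaskSensor.execute in deferrable mode: forward the
    # sensor's own timeout and poke_interval instead of relying on hardcoded values.
    if not self.deferrable:
        return super().execute(context)
    self.defer(
        timeout=timedelta(seconds=self.timeout),  # reuse the sensor's timeout for the deferral
        trigger=TaskStateTrigger(
            dag_id=self.external_dag_id,
            task_id=self.external_task_id,
            states=self.allowed_states,
            execution_dates=self._get_dttm_filter(context),  # same date filter the poke uses
            trigger_start_time=timezone.utcnow(),
            poll_interval=self.poke_interval,  # reuse the sensor's poke_interval
            timeout_sec=self.timeout,          # assumed new trigger argument (does not exist today)
        ),
        method_name="execute_complete",
    )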

yermalov-here, Sep 14 '23 10:09

@yermalov-here certainly. The logic of the two implementations is different, and in my opinion both should do the same thing. As I just mentioned in #34207, I opened 3 different issues for the deferrable operator, since these were 3 completely different bugs, but I tend to agree that a refactor is needed.

I actually mentioned @bkossakowska since she's the original developer of this feature and maybe has some input on this different logic.

ecodina, Sep 14 '23 17:09

Should the poke logic be shared between the TaskStateTrigger and the ExternalTaskSensor? I'm working on a change that would utilize the poke logic (which seems to be sound) in the TaskStateTrigger. I would also need to address @yermalov-here's concerns with the execute_complete messaging and the hardcoded timeout.

samc1213, Sep 15 '23 17:09

Would anyone mind taking a look at this ( https://github.com/apache/airflow/compare/main...samc1213:airflow:external-task-deferrable ) general idea? Not ready for a deep review, but I've basically moved the poke logic into the TaskStateTrigger. From very brief testing this appears to work as intended. Wondering what everyone thinks of this.

samc1213, Sep 17 '23 19:09

Would anyone mind taking a look at this ( https://github.com/apache/airflow/compare/main...samc1213:airflow:external-task-deferrable ) general idea?

It would be nice to create a PR; in this case it would be much easier to review it.

Some additional info which might help with a first contribution:

* [Contributor's Quick Start](https://github.com/apache/airflow/blob/main/CONTRIBUTORS_QUICK_START.rst#contributors-quick-start)
* [Breeze](https://github.com/apache/airflow/blob/main/BREEZE.rst)
* [CONTRIBUTING.rst](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)

Taragolis, Sep 18 '23 00:09

Would anyone mind taking a look at this ( main...samc1213:airflow:external-task-deferrable ) general idea?

It would be nice to create a PR; in this case it would be much easier to review it.

Some additional info which might help with a first contribution:

* [Contributor's Quick Start](https://github.com/apache/airflow/blob/main/CONTRIBUTORS_QUICK_START.rst#contributors-quick-start)
* [Breeze](https://github.com/apache/airflow/blob/main/BREEZE.rst)
* [CONTRIBUTING.rst](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)

Done, thank you for any feedback. I'm new to Airflow, but I've been following this issue and hope to help out.

samc1213, Sep 19 '23 03:09

I'm using Amazon Managed Workflows for Apache Airflow.

They dictate which version of Python/Airflow you can use, and the latest available version is 2.7.2, which has this bug.

Historically I have been using the task sensor with mode="reschedule", which (if my understanding is correct) is no longer supported. I can't switch to the non-deferrable version, because it would block a worker doing nothing, which would be very wasteful and costly.

I'm looking for options to get out of this dead end. I don't think I can monkey patch the sensor (for example, the hardcoded 1-minute timeout in the trigger would be hard to patch). My thinking is that I should copy the latest version of the sensor/triggers into my DAGs folder and use my local version, but I'd be curious to hear any other suggestions.
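
A sketch of that vendoring idea, assuming the patched sensor/trigger code is copied next to the DAG files as a module named local_external_task.py (the module name is made up):

import pendulum
from airflow import DAG

# Hypothetical module: a local, patched copy of airflow.sensors.external_task
# placed alongside the DAG files.
from local_external_task import ExternalTaskSensor as PatchedExternalTaskSensor

with DAG(
    dag_id="dag2",
    start_date=pendulum.datetime(2024, 2, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
):
    PatchedExternalTaskSensor(
        task_id="task-externalsensor",
        external_dag_id="dag1",
        external_task_id="long_task",
        deferrable=True,
    )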

0x26res, Feb 08 '24 09:02

Historically I have been using the task sensor with mode="reschedule", which (if my understanding is correct) is no longer supported.

It is still supported in Airflow 2 (whether it is available in MWAA I have no idea, but they generally run vanilla Airflow, so there is no reason for it not to be supported).

potiuk, Feb 08 '24 10:02

It is still supported in Airflow 2 (whether it is available in MWAA I have no idea, but they generally run vanilla Airflow, so there is no reason for it not to be supported).

@potiuk yes, I think I was wrong (I was mixing up Smart Sensors, which have been removed, with mode="reschedule").

I'll stick to mode="reschedule" for now then.
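
For reference, the reschedule-mode workaround uses standard BaseSensorOperator arguments and looks roughly like this (the intervals are illustrative):

import pendulum
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id="dag2",
    start_date=pendulum.datetime(2024, 2, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
):
    ExternalTaskSensor(
        task_id="task-externalsensor",
        external_dag_id="dag1",
        external_task_id="long_task",
        mode="reschedule",    # free the worker slot between pokes
        poke_interval=120,    # seconds between pokes
        timeout=6 * 60 * 60,  # give up after 6 hours
    )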

0x26res, Feb 08 '24 10:02

In https://github.com/apache/airflow/pull/36916 a WorkflowTrigger was added that loops indefinitely while checking for the DAG, instead of the 60-second limit in TaskStateTrigger. I guess this issue can be closed.

cc: @pankajastro

tirkarthi, Feb 26 '24 14:02