Deferrable ExternalTaskSensor times out even if DAG is running
Apache Airflow version
2.7.1
What happened
If a DAG (dag1) is running and another DAG (dag2) has an ExternalTaskSensor (task-externalsensor) that checks a task on dag1, task-externalsensor will fail unless dag1's task finishes in under 60 seconds.
After checking the code, I believe the while block in the trigger should be split in two, as follows:
```python
# Wait for the dag to start execution
# Maybe the timeout should be configurable?
while True:
    try:
        delta = utcnow() - self.trigger_start_time
        if delta.total_seconds() < self._timeout_sec:
            # mypy confuses typing here
            if await self.count_running_dags() == 0:  # type: ignore[call-arg]
                self.log.info("Waiting for DAG to start execution...")
                await asyncio.sleep(self.poll_interval)
            else:
                break
        else:
            yield TriggerEvent({"status": "timeout"})
            return
    except Exception:
        yield TriggerEvent({"status": "failed"})
        return

# Wait for the external task to finish
# Maybe put a timeout here as well?
while True:
    try:
        if await self.count_tasks() == len(self.execution_dates):  # type: ignore[call-arg]
            yield TriggerEvent({"status": "success"})
            return
        self.log.info("Task is still running, sleeping for %s seconds...", self.poll_interval)
        await asyncio.sleep(self.poll_interval)
    except Exception:
        yield TriggerEvent({"status": "failed"})
        return
```
I tested this code on my local machine and it worked as expected with dag1 running for 120 seconds.
Maybe @bkossakowska could check this out?
What you think should happen instead
No response
How to reproduce
- Create `dag1`, which contains a simple task (sleep for > 60 seconds)
- Create `dag2`, which contains an ExternalTaskSensor checking for `dag1`'s task and running in deferrable mode
- Execute them at the same time. As soon as `dag2`'s trigger has spent 60 seconds, it will fail, even if `dag1`'s task is still running (see the sketch below).
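A minimal sketch of the two DAGs (the `long_task` task ID, the `sleep 120` command, and the schedule are illustrative placeholders, not taken from the original report):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG("dag1", start_date=datetime(2023, 1, 1), schedule=None, catchup=False):
    # Sleeps longer than the trigger's hardcoded 60-second window.
    BashOperator(task_id="long_task", bash_command="sleep 120")

with DAG("dag2", start_date=datetime(2023, 1, 1), schedule=None, catchup=False):
    # Deferrable sensor waiting on dag1's task; it fails after ~60 seconds
    # even though the external task is still running.
    ExternalTaskSensor(
        task_id="task-externalsensor",
        external_dag_id="dag1",
        external_task_id="long_task",
        deferrable=True,
    )
```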
Operating System
Ubuntu 22.04.3 LTS
Versions of Apache Airflow Providers
No response
Deployment
Virtualenv installation
Deployment details
No response
Anything else
I'd love to submit a PR, but have no available time to properly do so or the knowledge to make sure the proposed solution actually works in all cases.
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
> I'd love to submit a PR, but have no available time to properly do so or the knowledge to make sure the proposed solution actually works in all cases.

Then it will be waiting for someone to pick it up. But you might find it still here when you come back... so maybe it will be you eventually.
I contributed to Airflow once last year. However, I now have a job and I'm a PhD student, which is why I commented that I have no time available. I spent as much time as I could debugging the issue to help whoever picks it up!
I really look forward to meeting you next week at the summit to thank you in person for all the help you've provided us over the past months!
Hi, can I work on this issue?
I can reproduce the issue. I think it's because the default timeout value is set to 60 seconds on the trigger. Should the trigger of ExternalTaskSensor wait for the dependent DAGs to complete even if it takes more than the timeout?
I believe so. I don't think it's safe to assume how long a running DAG will take to finish (maybe it's doing I/O operations over an unreliable network). That's why I proposed splitting the block: first, wait for the DAG to start running (where I feel a timeout is appropriate), and then actually wait for the DAG to finish, however long it takes.
Oh, and I appreciate you taking this issue!
Got it; after reading the comment in the code again: the timeout should only happen if the dependent task does not start running within that period. If the task is in the running state, ExternalTaskSensor must wait until the task completes. I will try the proposed solution and test the other cases.
@potiuk should deferrable mode behave the same as non-deferrable mode for ExternalTaskSensor? I found that the deferrable version will not fail if the external_task_id fails. I think it's because there's no check of the task or DAG state in the run method.
This came up while I was trying to implement the proposed solution from @ecodina. But even with the current code it will fail because of the timeout, not because the external task failed (which I think will confuse users).
Proposed code:
```python
while True:
    delta = utcnow() - self.trigger_start_time
    if delta.total_seconds() < self._timeout_sec:
        if await self.count_running_dags() == 0:  # type: ignore[call-arg]
            self.log.info("Waiting for DAG to start execution...")
            await asyncio.sleep(self.poll_interval)
        else:
            break
    else:
        yield TriggerEvent({"status": "timeout"})
        return

# Wait for the task to succeed
while True:
    if await self.count_tasks() == len(self.execution_dates):  # type: ignore[call-arg]
        yield TriggerEvent({"status": "success"})
        return
    self.log.info("Task is still running, sleeping for %s seconds...", self.poll_interval)
    await asyncio.sleep(self.poll_interval)
```
Current code:

```python
while True:
    delta = utcnow() - self.trigger_start_time
    if delta.total_seconds() < self._timeout_sec:
        # mypy confuses typing here
        if await self.count_running_dags() == 0:  # type: ignore[call-arg]
            self.log.info("Waiting for DAG to start execution...")
            await asyncio.sleep(self.poll_interval)
    else:
        yield TriggerEvent({"status": "timeout"})  # <---- this is triggered if the running task suddenly fails
        return
    # mypy confuses typing here
    if await self.count_tasks() == len(self.execution_dates):  # type: ignore[call-arg]
        yield TriggerEvent({"status": "success"})
        return
    self.log.info("Task is still running, sleeping for %s seconds...", self.poll_interval)
    await asyncio.sleep(self.poll_interval)
```
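To illustrate the missing failure handling mentioned above, here is a standalone, hypothetical sketch (not the actual trigger code) of a second-phase wait that also watches for failed external tasks; `count_finished` and `count_failed` are placeholder coroutines standing in for the real database queries:

```python
import asyncio
from typing import Awaitable, Callable


async def wait_for_tasks(
    count_finished: Callable[[], Awaitable[int]],  # placeholder: tasks in the expected success states
    count_failed: Callable[[], Awaitable[int]],    # placeholder: tasks in failed/upstream_failed states
    expected: int,
    poll_interval: float,
) -> str:
    # Phase 2 of the wait: the external DAG run is already known to exist.
    while True:
        if await count_failed() > 0:
            # Report the failure directly instead of letting a timeout mask it.
            return "failed"
        if await count_finished() == expected:
            return "success"
        await asyncio.sleep(poll_interval)
```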
I took a look at the related issue #34205; the root cause is in the logic that differs from the non-deferrable behavior. I was able to fix the exact problem described there, but the behavior still looks weird.
https://github.com/apache/airflow/compare/main...yermalov-here:airflow:fix_ExternalTaskSensor_deferrable
IMHO this trigger is worthy of a refactoring, and the non-deferrable behavior should be the guideline. I've noted the following issues (a standalone sketch of the suggested parametrization follows this list):

- the above-mentioned issue with the hardcoded timeout: I think the sensor's `timeout` value should be passed there (and the `poke_interval` as well)
- if it was the task not completing on time, the log will still say "Dag was not started within 1 minute, assuming fail.", which is confusing if you are waiting for a task or a task group rather than the full DAG; see the `execute_complete` method of the sensor
- on each iteration the trigger checks whether a dag_run exists, which I don't see happening in the sensor's poke
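A standalone, hypothetical sketch (not the actual TaskStateTrigger API) of the first point: the wait loop takes a sensor-supplied timeout and poll interval as parameters instead of a hardcoded 60 seconds, and `dag_run_started`/`tasks_done` are placeholders for the real database checks:

```python
import asyncio
from datetime import datetime, timedelta, timezone
from typing import Awaitable, Callable


async def wait_for_external_task(
    dag_run_started: Callable[[], Awaitable[bool]],  # placeholder for the dag_run existence check
    tasks_done: Callable[[], Awaitable[bool]],       # placeholder for the task-state check
    start_timeout: timedelta,                        # would come from the sensor's `timeout`
    poll_interval: float,                            # would come from the sensor's `poke_interval`
) -> str:
    start = datetime.now(timezone.utc)
    # Phase 1: bounded wait for the external DAG run to start.
    while not await dag_run_started():
        if datetime.now(timezone.utc) - start > start_timeout:
            return "timeout"
        await asyncio.sleep(poll_interval)
    # Phase 2: unbounded wait for the external task(s) to reach the expected state.
    while not await tasks_done():
        await asyncio.sleep(poll_interval)
    return "success"
```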
In general, it looks to me like quite a problem that the logic has to be duplicated for the deferrable and non-deferrable sensor and operator versions. It leads to them having different behavior and different sets of features.
As an Airflow user, I wouldn't expect behavior to differ for the same Operator/Sensor run in two different modes.
@yermalov-here certainly. The logic of the two code paths is different, and in my opinion both should do the same thing. As I just mentioned in #34207, I opened 3 different issues for the deferrable operator, since these were 3 completely different bugs, but I kind of agree that a refactor is needed.
I actually mentioned @bkossakowska since she's the original developer of this feature and maybe has some input on this different logic.
Should the poke logic be shared between the TaskStateTrigger and the ExternalTaskSensor? I'm working on a change that would utilize the poke logic (which seems to be sound) in the TaskStateTrigger. I would also need to address @yermalov-here 's concerns with the execute_complete messaging and the hardcoded timeout.
Would anyone mind taking a look at this ( https://github.com/apache/airflow/compare/main...samc1213:airflow:external-task-deferrable ) general idea? Not ready for a deep review, but I've basically moved the poke logic into the TaskStateTrigger. From very brief testing this appears to work as intended. Wondering what everyone thinks of this.
> Would anyone mind taking a look at this ( https://github.com/apache/airflow/compare/main...samc1213:airflow:external-task-deferrable ) general idea?

It would be nice to create a PR; in that case it would be much easier to review it.

Some additional info which might help with a first contribution:

- [Contributor's Quick Start](https://github.com/apache/airflow/blob/main/CONTRIBUTORS_QUICK_START.rst#contributors-quick-start)
- [Breeze](https://github.com/apache/airflow/blob/main/BREEZE.rst)
- [CONTRIBUTING.rst](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Done, thank you for any feedback. I'm new to Airflow, but I've been following this issue and hope to help out.
I'm using Amazon Managed Workflows for Apache Airflow.
They dictate which versions of Python/Airflow you can use, and the latest available version is 2.7.2, which has this bug.
Historically I have been using the task sensor with mode="reschedule", which (if my understanding is correct) is no longer supported. I can't switch to the non-deferrable version, because it would block a worker that would do nothing, which would be very wasteful and costly.
I'm looking for options to get out of this dead end. I don't think I can monkey-patch the sensor (for example, the hardcoded 1-minute timeout in the trigger would be hard to patch). My thinking is that I should copy the latest version of the sensor/triggers into my DAGs and use my local version, but I'd be curious to hear any other suggestions.
> Historically I have been using the task sensor with mode="reschedule", which (if my understanding is correct) is no longer supported.

It is still supported in Airflow 2 (whether it is in MWAA I have no idea, but they generally run vanilla Airflow, so there is no reason for it not to be supported).

> It is still supported in Airflow 2 (whether it is in MWAA I have no idea, but they generally run vanilla Airflow, so there is no reason for it not to be supported).

@potiuk yes, I think I was wrong (I was mixing up Smart Sensors, which have been removed, with mode="reschedule").
I'll stick to mode="reschedule" for now then.
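For reference, a minimal sketch of that workaround (the DAG layout and `long_task` ID are the illustrative ones from the reproduction example above); in reschedule mode the sensor frees its worker slot between pokes instead of deferring to the buggy trigger:

```python
from datetime import datetime

from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor

with DAG("dag2", start_date=datetime(2023, 1, 1), schedule=None, catchup=False):
    # Reschedule mode: the task is released between pokes, so no worker is blocked.
    ExternalTaskSensor(
        task_id="task-externalsensor",
        external_dag_id="dag1",
        external_task_id="long_task",
        mode="reschedule",
        poke_interval=60,
    )
```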
In https://github.com/apache/airflow/pull/36916, a WorkflowTrigger was added that uses an infinite loop to check for the DAG, instead of the 60-second limit in TaskStateTrigger. I guess this issue can be closed.
cc: @pankajastro