airflow
airflow copied to clipboard
`TriggerDagRunOperator` not working as expected in defer with `wait_for_completion`
Apache Airflow version
2.8.3
If "Other Airflow 2 version" selected, which one?
No response
What happened?
using the triggerdagrunoperator with wait_for_completion and deferrable show multiple bugs
for long running dags the triggerdagrunoperator sometimes never finish ( do not detect the end success or fail of triggered dag )
also the triggerdagrunoperator with reset_dag_run never finish if you clear it after a first run
What you think should happen instead?
No response
How to reproduce
triggerdagrunoperator with reset_dag_run never finish if you clear it after a first run
->
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from pendulum import today
from airflow import DAG
dag_1 = DAG(
dag_id="dag_1",
schedule_interval=None,
start_date=today("UTC").add(days=-1)
)
with dag_1:
def toto():
import time
time.sleep(20)
raise Exception()
PythonOperator(
task_id="task_2",
python_callable=toto)
dag_2 = DAG(
dag_id="dag_2",
schedule_interval=None,
start_date=today("UTC").add(days=-1)
)
with dag_2:
TriggerDagRunOperator(
task_id="task_1",
trigger_dag_id="dag_1",
reset_dag_run=True,
wait_for_completion=True,
poke_interval=5,
deferrable=True,
trigger_run_id="NAME_C"
)
trigger the dag_1 wait for end ( it will fail , that's normal ) , clear the task task_1 and the task will stay forever in deferred state
Operating System
ubuntu 22.04
Versions of Apache Airflow Providers
No response
Deployment
Docker-Compose
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
bug is still present in 2.9.0
@raphaelauv anything update?
@thanhtrung5763 No one proposed a fix yet