DAG on_failure_callback uses wrong context
Apache Airflow version
2.4.0
What happened
When a task fails in a DAG, the on_failure_callback registered when creating the DAG is triggered with the context of an arbitrary task instance rather than a failed one.
What you think should happen instead
The expectation is that the context of one of the task instances that actually caused the DAG failure is passed, rather than that of an arbitrary task instance.
How to reproduce
Run the DAG below.
import datetime

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator


def all_bad():
    raise Exception("I have failed")


def all_good():
    print("ALL GOOD")


def failure_callback_dag(context):
    print("Inside failure_callback_dag")
    print(context["task_instance"])
    print(context["task"])


with DAG(
    dag_id="test_dag",
    schedule_interval=None,
    start_date=datetime.datetime(2021, 1, 1),
    catchup=False,
    on_failure_callback=failure_callback_dag,
) as dag:
    start = EmptyOperator(
        task_id="start"
    )
    fail = PythonOperator(
        provide_context=True,
        task_id="fail",
        python_callable=all_bad,
    )
    passs = PythonOperator(
        provide_context=True,
        task_id="pass",
        python_callable=all_good,
    )

    start >> [passs, fail]
From the DAG processor logs, the context that is passed belongs to the task instance that succeeded:
[2022-09-28T18:28:14.465+0000] {logging_mixin.py:117} INFO - [2022-09-28T18:28:14.463+0000] {dag.py:1292} INFO - Executing dag callback function: <function failure_callback_dag at 0x7fd17ca18560>
[2022-09-28T18:28:14.943+0000] {logging_mixin.py:117} INFO - Inside failure_callback_dag
[2022-09-28T18:28:14.944+0000] {logging_mixin.py:117} INFO - <TaskInstance: test_dag.pass manual__2022-09-28T18:27:59.612118+00:00 [success]>
[2022-09-28T18:28:14.944+0000] {logging_mixin.py:117} INFO - <Task(PythonOperator): pass>
Operating System
Debian GNU/Linux 10 (buster)
Versions of Apache Airflow Providers
Default providers that are present in the official airflow docker image.
Deployment
Docker-Compose
Deployment details
No response
Anything else
Not sure if this is expected behaviour; in case it is, it needs to be documented in Callbacks.
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template!
Hey @vgupta3! Have you tried applying your on_failure_callback to default_args instead? Setting on_failure_callback at the DAG level applies to the DagRun failing, not tasks specifically.
on_failure_callback (DagStateChangeCallback | None) – A function to be called when a DagRun of this dag fails. A context dictionary is passed as a single parameter to this function.
If you want to gain the context from the failed task, you can set on_failure_callback at the task level or, since I presume you'd like the same callback to apply to all tasks, within default_args.
on_failure_callback (TaskStateChangeCallback | None) – a function to be called when a task instance of this task fails. a context dictionary is passed as a single parameter to this function. Context contains references to related objects to the task instance and is documented under the macros section of the API.
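For illustration, a minimal sketch of that suggestion (the DAG name, callback name, and task are just examples for this sketch, not from the issue):

import datetime

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator


def task_failure_callback(context):
    # With a task-level callback, the context belongs to the task instance that actually failed.
    print(context["task_instance"])


def all_bad():
    raise Exception("I have failed")


with DAG(
    dag_id="test_dag_task_level_callback",  # illustrative name
    schedule_interval=None,
    start_date=datetime.datetime(2021, 1, 1),
    catchup=False,
    # default_args are passed to every operator in the DAG, so each failing
    # task fires the callback with its own context.
    default_args={"on_failure_callback": task_failure_callback},
) as dag:
    fail = PythonOperator(task_id="fail", python_callable=all_bad)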
And, of course, if the documentation isn't clear enough you are absolutely welcomed and encouraged to open a PR to clarify concepts!
Yeah, looking at the Callbacks documentation, I definitely agree it's misleading and could be improved. Perhaps we should rename the DAG-level argument to on_dag_failure_callback instead?
Actually both options are mentioned: https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/callbacks.html
Callbacks are set at both the DAG and task level in the code snippets, but the copy in the doc only mentions tasks. Even the callback callable of on_failure_callback=task_failure_alert set at the DAG level is named in a misleading way. The callback will be triggered when the DagRun fails, which is similar to a task failing, but the context provided to the callable wouldn't necessarily be related to the task that failed. IIRC the context passed to the DAG-level callback is either the first task's context or just whatever is returned from the metadatabase first. IMO the doc can use a glow-up to help with these distinctions.
Perhaps we should rename the DAG-level argument to on_dag_failure_callback instead?
+1
This has been coming up more frequently for users. I can take on related improvements here.
Hello everyone. Could you let me know if there are any updates on this issue?
I had the same issue with the following DAG.
I'm creating an on_failure_callback to send events to PagerDuty. In one of my tests, I noticed that the context contains information from the first task.
Here is the callback function
@viniciusdsmello Where are you setting your on_failure_callback: on the DAG object, in default_args, or on tasks themselves?
Hi All!
Hi @josh-fell,
If the same callback is applied to all tasks, I think it would be triggered by each task failure instead of being executed only once on the DAG failure. Please share your thoughts.
It would be nice to be able to get the earliest failed task instance from the context or even all the task instances that failed.
Please advise.
This issue has been automatically marked as stale because it has been open for 30 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.
Hi @cklimkowski,
One possible workaround that I have employed in some of my DAGs to find "the earliest failed task instance from the context or even all the task instances that failed" is the following pattern when designing callback functions:
def failure_callback(context):
    dag_run = context.get("dag_run")
    failed_instances = dag_run.get_task_instances(state="failed")
    failed_instance_ids = [ti.task_id for ti in failed_instances]
    ...  # add further logic, e.g. take the first element for the first failed task
This should always get at least the first failed task instance. However, if a failed task triggers a DAG failure while other tasks are still running, and those tasks end up failing too, it won't pick them up because their state will still be running when the callback fires. A potential workaround (that I've not tested), if you need all the failed tasks, is to implement a retry with a backoff on your callback that waits until no tasks have the state running. This could lead to complications with undead tasks though, so be sure to set some limit on the number of retries if you do this.
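For what it's worth, a rough, untested sketch of that idea (the function name and polling limits are just for illustration, and it is a simple bounded poll rather than a true retry with backoff):

import time


def failure_callback_wait_for_settle(context, max_checks=10, wait_seconds=30):
    dag_run = context.get("dag_run")
    # Poll until no task instances are still running (up to a limit),
    # so tasks that fail after the callback fires are also captured.
    for _ in range(max_checks):
        if not dag_run.get_task_instances(state="running"):
            break
        time.sleep(wait_seconds)
    failed_instances = dag_run.get_task_instances(state="failed")
    ...  # e.g. alert on [ti.task_id for ti in failed_instances]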
@viniciusdsmello I have encountered the same problem as you. I have the following DAG which sends a Slack notification. In my case, task_1 throws an error, but the context given to the callback function is for task_2 (which didn't fail).
I'm using GCP Composer (composer-2.1.11-airflow-2.4.3) and defining the on_failure_callback at the DAG level.
My callback function:
def slack_alert(context):
    slack_msg = f"""
        :x: Task Failed.
        *Task*: {context.get('task_instance').task_id}
        *Dag*: {context.get('task_instance').dag_id}
        *Execution Time*: {context.get('execution_date')}
        <{context.get('task_instance').log_url}|*Logs*>
    """
    slk = SlackHelper()
    slk.send_message_to_channel(
        channel="#random_channel",
        text=slack_msg,
    )
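If the callback has to stay at the DAG level, one option along the lines of the workaround above is to look up the failed task instances from the dag_run instead of trusting context['task_instance']. A sketch, untested on Composer, with illustrative names:

def slack_alert_dag_level(context):
    dag_run = context.get("dag_run")
    # Pull the task instances that actually failed rather than relying on
    # whichever task_instance happened to be attached to the context.
    failed = dag_run.get_task_instances(state="failed")
    ti = failed[0] if failed else context.get("task_instance")
    slack_msg = f"""
        :x: Task Failed.
        *Task*: {ti.task_id}
        *Dag*: {ti.dag_id}
        *Execution Time*: {context.get('execution_date')}
        <{ti.log_url}|*Logs*>
    """
    ...  # send slack_msg the same way as above, e.g. via SlackHelper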
Chiming in: what would also be helpful (IMO) in the current web docs is which context variables are available when the callback is called for the various situations, e.g.
- DAG level, failure
- DAG level, success
- Task level, failure
- Task level, success
- DAG level, SLA miss
I can dig through the code to find this eventually, but I think it would be helpful to include it explicitly on the callback page, or at least a link to where it can be found.
If this makes sense, I could try and take a stab at a doc PR.
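Until the docs spell this out, one quick way to see exactly which keys a given callback receives at runtime is to dump them from a throwaway callback (the name here is just for illustration):

def debug_callback(context):
    # Log every key Airflow put into the context for this particular callback.
    print(sorted(context.keys()))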
@josh-fell - This doesn't appear to be happening. I have on_failure_callback set in default_args, but it gets fired as soon as the task fails, while the DAG is still in the running state. I have also encountered instances where it fired twice for a failed DAG: once for the DAG (where the task name was wrong) and once for the task that actually failed. So I am assuming there is some issue with the way callbacks are handled.
@josh-fell on_failure_callback appears to pass incorrect context with postgres as the backend. It works as intended when using sqlite, i.e. with the callback defined at the DAG level, it returns the context of the failed task_instance.
Tested with Airflow v2.6.3.
Dag_id = { context.get('task_instance').dag_id }
Task = { context.get('task_instance').task_id }
Execution_date = { context.get('ds') }
Log = { context.get('task_instance').log_url }