
DAG on_failure_callback uses wrong context

Open vgupta3 opened this issue 2 years ago • 15 comments

Apache Airflow version

2.4.0

What happened

When a task fails in a DAG, the on_failure_callback registered when creating the DAG is triggered with the context of a random task instance.

What you think should happen instead

The expectation is that the context of one of the task instances that caused the DAG failure would be used, rather than that of a random task instance.

How to reproduce

Run the DAG below.

import datetime
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator

def all_bad():
    raise Exception("I have failed")

def all_good():
    print("ALL GOOD")

def failure_callback_dag(context):
    print("Inside failure_callback_dag")
    print(context["task_instance"])
    print(context["task"])

with DAG(
    dag_id="test_dag",
    schedule_interval=None,
    start_date=datetime.datetime(2021, 1, 1),
    catchup=False,
    on_failure_callback=failure_callback_dag,
) as dag:

    start = EmptyOperator(
        task_id="start"
    )

    # provide_context was removed in Airflow 2.0; the context is always
    # passed to the callable, so it is dropped here.
    fail = PythonOperator(
        task_id="fail",
        python_callable=all_bad,
    )

    passs = PythonOperator(
        task_id="pass",
        python_callable=all_good,
    )

    start >> [passs, fail]

From the dag processor logs:

The context that is passed is from the task instance that has succeeded.

[2022-09-28T18:28:14.465+0000] {logging_mixin.py:117} INFO - [2022-09-28T18:28:14.463+0000] {dag.py:1292} INFO - Executing dag callback function: <function failure_callback_dag at 0x7fd17ca18560>
[2022-09-28T18:28:14.943+0000] {logging_mixin.py:117} INFO - Inside failure_callback_dag
[2022-09-28T18:28:14.944+0000] {logging_mixin.py:117} INFO - <TaskInstance: test_dag.pass manual__2022-09-28T18:27:59.612118+00:00 [success]>
[2022-09-28T18:28:14.944+0000] {logging_mixin.py:117} INFO - <Task(PythonOperator): pass>

Operating System

Debian GNU/Linux 10 (buster)

Versions of Apache Airflow Providers

Default providers that are present in the official airflow docker image.

Deployment

Docker-Compose

Deployment details

No response

Anything else

Not sure if this is expected behaviour; in case it is, it needs to be documented in Callbacks.

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

vgupta3 avatar Sep 28 '22 19:09 vgupta3

Thanks for opening your first issue here! Be sure to follow the issue template!

boring-cyborg[bot] avatar Sep 28 '22 19:09 boring-cyborg[bot]

Hey @vgupta3! Have you tried applying your on_failure_callback to default_args instead? Setting on_failure_callback at the DAG level applies to the DagRun failing, not to tasks specifically.

on_failure_callback (DagStateChangeCallback | None) – A function to be called when a DagRun of this dag fails. A context dictionary is passed as a single parameter to this function.

If you want to gain the context from the failed task, you can set on_failure_callback at the task level or, since I presume you'd like the same callback to apply to all tasks, within default_args.

on_failure_callback (TaskStateChangeCallback | None) – a function to be called when a task instance of this task fails. a context dictionary is passed as a single parameter to this function. Context contains references to related objects to the task instance and is documented under the macros section of the API.
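
For illustration, a minimal sketch of the default_args route (the dag_id and callback here are made up):

import datetime
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator

def task_failure_alert(context):
    # At the task level, "task_instance" is the instance that actually failed.
    print(f"Task failed: {context['task_instance'].task_id}")

with DAG(
    dag_id="example_task_level_callback",
    schedule_interval=None,
    start_date=datetime.datetime(2021, 1, 1),
    catchup=False,
    default_args={"on_failure_callback": task_failure_alert},
) as dag:
    EmptyOperator(task_id="start")

With this setup the callback fires once per failed task instance, each time with that task's own context.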

And, of course, if the documentation isn't clear enough, you are absolutely welcome and encouraged to open a PR to clarify concepts!

josh-fell avatar Sep 28 '22 19:09 josh-fell

Yeah, looking at the Callbacks documentation, I definitely agree it's misleading and could be improved.

josh-fell avatar Sep 28 '22 19:09 josh-fell

Perhaps we should rename the DAG-level argument to on_dag_failure_callback instead?

uranusjr avatar Sep 29 '22 02:09 uranusjr

Yeah, looking at the Callbacks documentation, I definitely agree it's misleading and could be improved.

Actually both options are mentioned: https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/callbacks.html

eladkal avatar Sep 30 '22 21:09 eladkal

Actually both options are mentioned: https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/callbacks.html

Callbacks are set at both the DAG and task level in the code snippets, but the copy in the doc only mentions tasks. Even the callback callable of on_failure_callback=task_failure_alert set at the DAG level is named in a misleading way. The callback will be triggered when the DagRun fails, which is similar to a task failing, but the context provided to the callable wouldn't necessarily be related to the task that failed. IIRC the context passed to the DAG-level callback is either the first task's context or just whatever is returned from the metadatabase first. IMO the doc could use a glow-up to help with these distinctions.
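
To make the distinction concrete, a DAG-level callback written against the keys that are reliable there might look like this (a sketch; the "reason" key reflects my reading of DAG.handle_callback and is worth verifying):

def dag_failure_callback(context):
    # Don't assume "task_instance" points at the failed task here;
    # "dag_run" describes the run that actually failed.
    dag_run = context["dag_run"]
    print(f"DagRun {dag_run.run_id} finished as {dag_run.state}, "
          f"reason: {context.get('reason')}")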

Perhaps we should rename the DAG-level argument to on_dag_failure_callback instead?

+1

josh-fell avatar Oct 01 '22 00:10 josh-fell

This has been coming up more frequently for users. I can take on related improvements here.

josh-fell avatar Feb 02 '23 14:02 josh-fell

Hello everyone. Could you let me know if there are any updates on this issue?

I had the same issue with the following DAG: [image]

I'm creating an on_failure_callback to send events to Pagerduty. In one of my tests, I noticed that the context contains information from the first task.

Here is the callback function: [image]

viniciusdsmello avatar Mar 12 '23 19:03 viniciusdsmello

@viniciusdsmello Where are you setting your on_failure_callback: on the DAG object, default_args, or on tasks themselves?

josh-fell avatar Mar 14 '23 13:03 josh-fell

Hi All!
Hi @josh-fell, if the same callback is applied to all tasks, I think it would be triggered by each task failure instead of being executed only once when the DAG fails. Please share your thoughts.

If you want to gain the context from the failed task, you can set on_failure_callback at the task level or, since I presume you'd like the same callback to apply to all tasks, within default_args.

It would be nice to be able to get the earliest failed task instance from the context, or even all the task instances that failed.

Please advise.

cklimkowski avatar Apr 03 '23 19:04 cklimkowski

This issue has been automatically marked as stale because it has been open for 30 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.

github-actions[bot] avatar May 03 '23 19:05 github-actions[bot]

Hi All! Hi @josh-fell, if the same callback is applied to all tasks, I think it would be triggered by each task failure instead of being executed only once when the DAG fails. Please share your thoughts.

If you want to gain the context from the failed task, you can set on_failure_callback at the task level or, since I presume you'd like the same callback to apply to all tasks, within default_args.

It would be nice to be able to get the earliest failed task instance from the context, or even all the task instances that failed.

Please advise.

Hi @cklimkowski

One possible workaround you could use that I have employed in some of my dags to find "the earliest failed task instance from the context or even all the task instances that failed" is to use the following pattern when designing callback functions:

def failure_callback(context):
    dag_run = context.get("dag_run")
    failed_instances = dag_run.get_task_instances(state="failed")
    failed_instance_ids = [ti.task_id for ti in failed_instances]
    ...  # add further logic, e.g. take the first element for the first failed task

This should always get at least the first failed task instance. However, if a failed task triggers a DAG failure while other tasks are still running, and those tasks end up failing too, it won't pick them up, as their state will be "running" when the callback fires. A potential workaround (that I've not tested), if you need all failures, is to implement a retry with backoff in your callback that waits until no tasks have the state "running". This could lead to complications with undead tasks though, so be sure to set a limit on the number of retries if you do this.
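
A rough sketch of that retry idea (untested, as noted; max_checks and backoff_seconds are invented knobs, not Airflow arguments):

import time

def failure_callback_wait_for_all(context, max_checks=10, backoff_seconds=30):
    # Poll until no task instance is still running, then collect every
    # failure; the bound keeps a stuck or undead task from blocking the
    # callback forever.
    dag_run = context.get("dag_run")
    for _ in range(max_checks):
        if not dag_run.get_task_instances(state="running"):
            break
        time.sleep(backoff_seconds)
    print([ti.task_id for ti in dag_run.get_task_instances(state="failed")])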

will-byrne-cardano avatar Aug 10 '23 15:08 will-byrne-cardano

@viniciusdsmello I have encountered the same problem as you. I have the following dag which sends a Slack notification. In my case, task_1 throws an error, but the given context to the callback function is for task_2 (which didn't fail).

I'm using GCP Composer (composer-2.1.11-airflow-2.4.3) and defining the on_failure_callback at the DAG level.

My callback function:

def slack_alert(context):
    slack_msg = \
        f"""
            :x: Task Failed.
            *Task*: {context.get('task_instance').task_id}
            *Dag*: {context.get('task_instance').dag_id}
            *Execution Time*: {context.get('execution_date')}
            <{context.get('task_instance').log_url}|*Logs*>
        """
    slk = SlackHelper()
    slk.send_message_to_channel(
        channel="#random_channel",
        text=slack_msg
    )

gabrielboehme avatar Aug 11 '23 16:08 gabrielboehme

Chiming in: what would also be helpful (IMO) in the current web docs is what context variables are available when the callback is called in the various situations, e.g.:

  • DAG level, failure
  • DAG level, success
  • Task level, failure
  • Task level, success
  • DAG level, SLA miss

I can dig through the code to find this eventually, but I think it would be helpful to include explicitly on the callback page, or at least a link to where it can be found.

If this makes sense, I could try and take a stab at a doc PR.
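
In the meantime, one empirical way to see what's available (a throwaway sketch you'd register as the callback in each of the situations above):

def debug_callback(context):
    # Log every key the scheduler passes for this callback type; attaching
    # this at the DAG level vs. the task level shows how the contexts differ.
    print(sorted(context.keys()))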

ttzhou avatar Mar 31 '24 15:03 ttzhou

Hey @vgupta3! Have you tried applying your on_failure_callback to default_args instead? Setting on_failure_callback at the DAG level applies to the DagRun failing, not to tasks specifically.

on_failure_callback (DagStateChangeCallback | None) – A function to be called when a DagRun of this dag fails. A context dictionary is passed as a single parameter to this function.

@josh-fell - This doesn't appear to be happening. I have on_failure_callback set in default_args, but it gets fired as soon as the task fails, while the DAG is still in the running state. I have also encountered instances where it fired twice for a failed DAG: once for the DAG (where the task name was wrong) and once for the task that actually failed. So I am assuming there is some issue with the way callbacks are handled.

ManiBharataraju avatar May 08 '24 10:05 ManiBharataraju

@josh-fell on_failure_callback appears to pass incorrect context with Postgres as the backend. It works as intended when using SQLite, i.e. with the callback defined at the DAG level, it returns the context of the failed task_instance.

Tested with Airflow v2.6.3.

def failure_alert(context):  # wrapper name is illustrative
    msg = f"""
        Dag_id = {context.get('task_instance').dag_id}
        Task = {context.get('task_instance').task_id}
        Execution_date = {context.get('ds')}
        Log = {context.get('task_instance').log_url}
    """
    print(msg)

kyoshi-warrior avatar Jun 21 '24 04:06 kyoshi-warrior