`on_failure_callback` defined in `default_args` doesn't respect lists of callbacks

Open ammarchalifah-bolt opened this issue 1 year ago • 2 comments

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.7.3

What happened?

I have a use case where I want to pass multiple callbacks as a list in the DAG's default_args dictionary. However, a list of callbacks does not seem to be supported there:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1702, in _run_finished_callback
    callback(context)
  File "/home/airflow/.local/lib/python3.9/site-packages/datahub_airflow_plugin/datahub_plugin_v22.py", line 238, in custom_on_failure_callback
    on_failure_callback(context)
TypeError: 'list' object is not callable

I thought a list of callbacks was supported. Could we also support this in default_args?
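Note that the failing frame in the traceback above is `custom_on_failure_callback` in `datahub_airflow_plugin/datahub_plugin_v22.py`, which calls `on_failure_callback(context)` directly. The following is a minimal sketch, not the plugin's actual code, of how a wrapper that assumes a single callable would produce this exact error, and how a list-aware wrapper could avoid it. The helper names `naive_wrap` and `list_aware_wrap` are hypothetical and exist only for illustration; only `custom_on_failure_callback` is taken from the traceback.

```python
from typing import Any, Callable, Dict, List, Union

Context = Dict[str, Any]
Callback = Callable[[Context], None]
Callbacks = Union[Callback, List[Callback], None]


def naive_wrap(on_failure_callback: Callbacks) -> Callback:
    """Hypothetical wrapper that assumes a single callable was configured."""

    def custom_on_failure_callback(context: Context) -> None:
        # Raises TypeError if on_failure_callback is a list.
        on_failure_callback(context)

    return custom_on_failure_callback


def list_aware_wrap(on_failure_callback: Callbacks) -> Callback:
    """Hypothetical wrapper that accepts None, one callable, or a list."""
    if on_failure_callback is None:
        callbacks: List[Callback] = []
    elif isinstance(on_failure_callback, list):
        callbacks = list(on_failure_callback)
    else:
        callbacks = [on_failure_callback]

    def custom_on_failure_callback(context: Context) -> None:
        # Call every configured callback in order.
        for callback in callbacks:
            callback(context)

    return custom_on_failure_callback


calls: List[str] = []


def callback_1(context: Context) -> None:
    calls.append("callback_1")


def callback_2(context: Context) -> None:
    calls.append("callback_2")


# The naive wrapper fails in the same way as the traceback.
naive_error = ""
try:
    naive_wrap([callback_1, callback_2])({})
except TypeError as exc:
    naive_error = str(exc)

# The list-aware wrapper runs both callbacks.
list_aware_wrap([callback_1, callback_2])({})
```

If this reading of the traceback is right, the list in default_args may be reaching the plugin's wrapper unchanged, so the error would originate outside Airflow's own callback handling.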

What you think should happen instead?

No response

How to reproduce

Set a DAG with lists of callbacks passed in default_args

Operating System

linux

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

ammarchalifah-bolt, Feb 08 '24 10:02

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

boring-cyborg[bot], Feb 08 '24 10:02

Could you provide a minimal reproducible example? As far as I can see, the following works fine in 2.7.3 as well as on the main branch:

from datetime import datetime, timezone

from airflow import DAG
from airflow.decorators import task
from airflow.utils.task_group import TaskGroup


def callback_1(context):
    print("callback_1 called")

def callback_2(context):
    print("callback_2 called")

def callback_3(context):
    print("callback_3 called")


START_DATE = datetime(2024, 2, 1, tzinfo=timezone.utc)
DEFAULT_ARGS_DAG = {"on_failure_callback": [callback_1, callback_2], "retries": 0}
DEFAULT_ARGS_TG = {"on_failure_callback": [callback_3, callback_2]}


with DAG(
    "pr_37243",
    schedule=None,
    start_date=START_DATE,
    default_args=DEFAULT_ARGS_DAG,
    tags=["37243", "on_failure_callback", "default_args"]
):
    @task(trigger_rule="all_done")
    def fail_task():
        1 / 0

    with TaskGroup("group", default_args=DEFAULT_ARGS_TG) as tg:
        fail_task()

    fail_task() >> tg >> fail_task.override(on_failure_callback=[callback_3, callback_1])()

[2024-02-08, 11:22:37 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: pr_37243.fail_task manual__2024-02-08T11:22:35.523637+00:00 [queued]>
[2024-02-08, 11:22:37 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: pr_37243.fail_task manual__2024-02-08T11:22:35.523637+00:00 [queued]>
[2024-02-08, 11:22:37 UTC] {taskinstance.py:1361} INFO - Starting attempt 1 of 1
[2024-02-08, 11:22:37 UTC] {taskinstance.py:1382} INFO - Executing <Task(_PythonDecoratedOperator): fail_task> on 2024-02-08 11:22:35.523637+00:00
[2024-02-08, 11:22:37 UTC] {standard_task_runner.py:57} INFO - Started process 59 to run task
[2024-02-08, 11:22:37 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'pr_37243', 'fail_task', 'manual__2024-02-08T11:22:35.523637+00:00', '--job-id', '5', '--raw', '--subdir', 'DAGS_FOLDER/issue_37243.py', '--cfg-path', '/tmp/tmpk65t7ms3']
[2024-02-08, 11:22:37 UTC] {standard_task_runner.py:85} INFO - Job 5: Subtask fail_task
[2024-02-08, 11:22:38 UTC] {task_command.py:416} INFO - Running <TaskInstance: pr_37243.fail_task manual__2024-02-08T11:22:35.523637+00:00 [running]> on host 0f46ca170204
[2024-02-08, 11:22:38 UTC] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='pr_37243' AIRFLOW_CTX_TASK_ID='fail_task' AIRFLOW_CTX_EXECUTION_DATE='2024-02-08T11:22:35.523637+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-02-08T11:22:35.523637+00:00'
[2024-02-08, 11:22:38 UTC] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/decorators/base.py", line 221, in execute
    return_value = super().execute(context)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 192, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 209, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/issue_37243.py", line 31, in fail_task
    1 / 0
ZeroDivisionError: division by zero
[2024-02-08, 11:22:38 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=pr_37243, task_id=fail_task, execution_date=20240208T112235, start_date=20240208T112237, end_date=20240208T112238
[2024-02-08, 11:22:38 UTC] {logging_mixin.py:154} INFO - callback_1 called
[2024-02-08, 11:22:38 UTC] {logging_mixin.py:154} INFO - callback_2 called
[2024-02-08, 11:22:38 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 5 for task fail_task (division by zero; 59)
[2024-02-08, 11:22:38 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2024-02-08, 11:22:38 UTC] {taskinstance.py:2778} INFO - 1 downstream tasks scheduled from follow-on schedule check
[2024-02-08, 11:22:39 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: pr_37243.group.fail_task manual__2024-02-08T11:22:35.523637+00:00 [queued]>
[2024-02-08, 11:22:39 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: pr_37243.group.fail_task manual__2024-02-08T11:22:35.523637+00:00 [queued]>
[2024-02-08, 11:22:39 UTC] {taskinstance.py:1361} INFO - Starting attempt 1 of 1
[2024-02-08, 11:22:39 UTC] {taskinstance.py:1382} INFO - Executing <Task(_PythonDecoratedOperator): group.fail_task> on 2024-02-08 11:22:35.523637+00:00
[2024-02-08, 11:22:39 UTC] {standard_task_runner.py:57} INFO - Started process 62 to run task
[2024-02-08, 11:22:39 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'pr_37243', 'group.fail_task', 'manual__2024-02-08T11:22:35.523637+00:00', '--job-id', '6', '--raw', '--subdir', 'DAGS_FOLDER/issue_37243.py', '--cfg-path', '/tmp/tmpakwhoxcs']
[2024-02-08, 11:22:39 UTC] {standard_task_runner.py:85} INFO - Job 6: Subtask group.fail_task
[2024-02-08, 11:22:39 UTC] {task_command.py:416} INFO - Running <TaskInstance: pr_37243.group.fail_task manual__2024-02-08T11:22:35.523637+00:00 [running]> on host 0f46ca170204
[2024-02-08, 11:22:40 UTC] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='pr_37243' AIRFLOW_CTX_TASK_ID='group.fail_task' AIRFLOW_CTX_EXECUTION_DATE='2024-02-08T11:22:35.523637+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-02-08T11:22:35.523637+00:00'
[2024-02-08, 11:22:40 UTC] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/decorators/base.py", line 221, in execute
    return_value = super().execute(context)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 192, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 209, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/issue_37243.py", line 31, in fail_task
    1 / 0
ZeroDivisionError: division by zero
[2024-02-08, 11:22:40 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=pr_37243, task_id=group.fail_task, execution_date=20240208T112235, start_date=20240208T112239, end_date=20240208T112240
[2024-02-08, 11:22:40 UTC] {logging_mixin.py:154} INFO - callback_3 called
[2024-02-08, 11:22:40 UTC] {logging_mixin.py:154} INFO - callback_2 called
[2024-02-08, 11:22:40 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 6 for task group.fail_task (division by zero; 62)
[2024-02-08, 11:22:40 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2024-02-08, 11:22:40 UTC] {taskinstance.py:2778} INFO - 1 downstream tasks scheduled from follow-on schedule check

Taragolis, Feb 08 '24 11:02

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

github-actions[bot], Mar 05 '24 00:03

This issue has been closed because it has not received response from the issue author.

github-actions[bot], Mar 12 '24 00:03