airflow
airflow copied to clipboard
`on_failure_callback` defined in `default_args` don't respect lists of callbacks
Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.7.3
What happened?
I have a use case where I want to pass multiple callbacks as a list to DAG's default_args dictionary. However, this list of callbacks is not supported.
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1702, in _run_finished_callback
callback(context)
File "/home/airflow/.local/lib/python3.9/site-packages/datahub_airflow_plugin/datahub_plugin_v22.py", line 238, in custom_on_failure_callback
on_failure_callback(context)
TypeError: 'list' object is not callable
I thought list of callbacks is supported, could we also support this in default_args?
What you think should happen instead?
No response
How to reproduce
Set a DAG with lists of callbacks passed in default_args
Operating System
linux
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
Could you provide minimum reproducible example, as I could see this one works fine in 2.7.3 as well as into the main branch
from datetime import datetime, timezone
from airflow import DAG
from airflow.decorators import task
from airflow.utils.task_group import TaskGroup
def callback_1(context):
print("callback_1 called")
def callback_2(context):
print("callback_2 called")
def callback_3(context):
print("callback_3 called")
START_DATE = datetime(2024, 2, 1, tzinfo=timezone.utc)
DEFAULT_ARGS_DAG = {"on_failure_callback": [callback_1, callback_2], "retires": 0}
DEFAULT_ARGS_TG = {"on_failure_callback": [callback_3, callback_2]}
with DAG(
"pr_37243",
schedule=None,
start_date=START_DATE,
default_args=DEFAULT_ARGS_DAG,
tags=["37243", "on_failure_callback", "default_args"]
):
@task(trigger_rule="all_done")
def fail_task():
1 / 0
with TaskGroup("group", default_args=DEFAULT_ARGS_TG) as tg:
fail_task()
fail_task() >> tg >> fail_task.override(on_failure_callback=[callback_3, callback_1])()
[2024-02-08, 11:22:37 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: pr_37243.fail_task manual__2024-02-08T11:22:35.523637+00:00 [queued]>
[2024-02-08, 11:22:37 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: pr_37243.fail_task manual__2024-02-08T11:22:35.523637+00:00 [queued]>
[2024-02-08, 11:22:37 UTC] {taskinstance.py:1361} INFO - Starting attempt 1 of 1
[2024-02-08, 11:22:37 UTC] {taskinstance.py:1382} INFO - Executing <Task(_PythonDecoratedOperator): fail_task> on 2024-02-08 11:22:35.523637+00:00
[2024-02-08, 11:22:37 UTC] {standard_task_runner.py:57} INFO - Started process 59 to run task
[2024-02-08, 11:22:37 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'pr_37243', 'fail_task', 'manual__2024-02-08T11:22:35.523637+00:00', '--job-id', '5', '--raw', '--subdir', 'DAGS_FOLDER/issue_37243.py', '--cfg-path', '/tmp/tmpk65t7ms3']
[2024-02-08, 11:22:37 UTC] {standard_task_runner.py:85} INFO - Job 5: Subtask fail_task
[2024-02-08, 11:22:38 UTC] {task_command.py:416} INFO - Running <TaskInstance: pr_37243.fail_task manual__2024-02-08T11:22:35.523637+00:00 [running]> on host 0f46ca170204
[2024-02-08, 11:22:38 UTC] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='pr_37243' AIRFLOW_CTX_TASK_ID='fail_task' AIRFLOW_CTX_EXECUTION_DATE='2024-02-08T11:22:35.523637+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-02-08T11:22:35.523637+00:00'
[2024-02-08, 11:22:38 UTC] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/decorators/base.py", line 221, in execute
return_value = super().execute(context)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 192, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 209, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/airflow/dags/issue_37243.py", line 31, in fail_task
1 / 0
ZeroDivisionError: division by zero
[2024-02-08, 11:22:38 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=pr_37243, task_id=fail_task, execution_date=20240208T112235, start_date=20240208T112237, end_date=20240208T112238
[2024-02-08, 11:22:38 UTC] {logging_mixin.py:154} INFO - callback_1 called
[2024-02-08, 11:22:38 UTC] {logging_mixin.py:154} INFO - callback_2 called
[2024-02-08, 11:22:38 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 5 for task fail_task (division by zero; 59)
[2024-02-08, 11:22:38 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2024-02-08, 11:22:38 UTC] {taskinstance.py:2778} INFO - 1 downstream tasks scheduled from follow-on schedule check
[2024-02-08, 11:22:39 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: pr_37243.group.fail_task manual__2024-02-08T11:22:35.523637+00:00 [queued]>
[2024-02-08, 11:22:39 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: pr_37243.group.fail_task manual__2024-02-08T11:22:35.523637+00:00 [queued]>
[2024-02-08, 11:22:39 UTC] {taskinstance.py:1361} INFO - Starting attempt 1 of 1
[2024-02-08, 11:22:39 UTC] {taskinstance.py:1382} INFO - Executing <Task(_PythonDecoratedOperator): group.fail_task> on 2024-02-08 11:22:35.523637+00:00
[2024-02-08, 11:22:39 UTC] {standard_task_runner.py:57} INFO - Started process 62 to run task
[2024-02-08, 11:22:39 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'pr_37243', 'group.fail_task', 'manual__2024-02-08T11:22:35.523637+00:00', '--job-id', '6', '--raw', '--subdir', 'DAGS_FOLDER/issue_37243.py', '--cfg-path', '/tmp/tmpakwhoxcs']
[2024-02-08, 11:22:39 UTC] {standard_task_runner.py:85} INFO - Job 6: Subtask group.fail_task
[2024-02-08, 11:22:39 UTC] {task_command.py:416} INFO - Running <TaskInstance: pr_37243.group.fail_task manual__2024-02-08T11:22:35.523637+00:00 [running]> on host 0f46ca170204
[2024-02-08, 11:22:40 UTC] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='pr_37243' AIRFLOW_CTX_TASK_ID='group.fail_task' AIRFLOW_CTX_EXECUTION_DATE='2024-02-08T11:22:35.523637+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-02-08T11:22:35.523637+00:00'
[2024-02-08, 11:22:40 UTC] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/decorators/base.py", line 221, in execute
return_value = super().execute(context)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 192, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 209, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/airflow/dags/issue_37243.py", line 31, in fail_task
1 / 0
ZeroDivisionError: division by zero
[2024-02-08, 11:22:40 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=pr_37243, task_id=group.fail_task, execution_date=20240208T112235, start_date=20240208T112239, end_date=20240208T112240
[2024-02-08, 11:22:40 UTC] {logging_mixin.py:154} INFO - callback_3 called
[2024-02-08, 11:22:40 UTC] {logging_mixin.py:154} INFO - callback_2 called
[2024-02-08, 11:22:40 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 6 for task group.fail_task (division by zero; 62)
[2024-02-08, 11:22:40 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2024-02-08, 11:22:40 UTC] {taskinstance.py:2778} INFO - 1 downstream tasks scheduled from follow-on schedule check
This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.
This issue has been closed because it has not received response from the issue author.