Move dag-level callbacks to worker
We will move callbacks to workers. TBD on “how”.
Options:
1. Move callbacks to workers as a separate activity.
2. Run callbacks as a teardown task.
3. Deprecate (but not remove) the concept of callbacks and push users towards teardown. If we go this route, the deprecated-but-not-removed callbacks would still run via one of the two options above.
4. Leave them in the dag-processor.
I am leaning towards (2), with (3) as my next preference. IMO callbacks and teardowns are somewhat redundant concepts, and if we keep them as-is we will have an explosion of callbacks; we already have on_{success,failure,skipped,retry,execute}_callback.
(2) sounds like a really good idea (though not for all callbacks). It would be great to see callbacks as implicit "tasks" in the Airflow UI. Not all callbacks can be handled this way, I'm afraid: "on_retry" and "on_execute" in particular don't fit the "teardown" concept. "on_execute" might work as an implicit "setup" task, but "on_retry" is a bit problematic.
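To make the redundancy concrete, here is a minimal sketch (not from the discussion above; assumes Airflow 2.7+ setup/teardown support, and notify() is a placeholder) of the same end-of-dag notification written once as a dag-level on_success_callback and once as a teardown task:

from __future__ import annotations

import pendulum
from airflow.decorators import dag, task


def notify(context=None):
    # stand-in for whatever the callback / teardown should actually do
    print("dag finished")


@dag(
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    schedule=None,
    on_success_callback=notify,  # callback style: runs outside the task graph
)
def callback_vs_teardown():
    @task
    def work():
        print("doing work")

    @task
    def cleanup():
        notify()

    # teardown style: the same logic as a real task, visible and retryable in the UI
    work() >> cleanup().as_teardown()


callback_vs_teardown()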
We also need to consider what happens when the task succeeded but its callback didn't run. We need retries for callbacks, but if, for example, the worker is terminated between the task succeeding and the callback executing, how would it retry?
Also, on_sla_callback() and on_execute_callback() can't be part of teardown. We need to check with AIP-86 what the plan is for the SLA callback as part of the deadline interface, and whether it conflicts with the approach we choose here. cc @ferruzzi @romsharon98
The plan for Deadlines is to add a simple check in the scheduler loop, essentially (obviously pseudocode):

if (select min(deadline) from DeadlinesTable) < utcnow(): handle_deadline_misses()

My intention was for the handler to spin up a new process to run the callbacks so it isn't holding anything else up, but let me know if that plan needs to change with this initiative.
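Not the actual AIP-86 implementation, just a minimal Python sketch of the loop check described above; get_min_deadline and handle_deadline_misses are placeholders for the real query and handler:

from __future__ import annotations

import multiprocessing
from datetime import datetime, timezone
from typing import Callable


def check_deadlines(
    get_min_deadline: Callable[[], datetime | None],  # stands in for: select min(deadline) from DeadlinesTable
    handle_deadline_misses: Callable[[], None],       # placeholder handler
) -> None:
    """One iteration of the scheduler-loop check sketched above."""
    next_deadline = get_min_deadline()
    if next_deadline is not None and next_deadline < datetime.now(timezone.utc):
        # Run the callbacks in a separate process so they don't hold up
        # the rest of the scheduler loop.
        multiprocessing.Process(target=handle_deadline_misses).start()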
Should there be a timeout for task callbacks (like on_success_callback) in case they hang (e.g., due to API delays or infinite loops), so they don't block task completion? Something similar to execution_timeout, but specifically for callbacks. This could help avoid issues where long or broken callbacks prevent task success from being registered. WDYT?
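As a user-level illustration of the idea (not a proposed Airflow API): a wrapper that stops waiting on a callback after a deadline; notify_slack in the usage note is a hypothetical callback.

from __future__ import annotations

import functools
import logging
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeout

log = logging.getLogger(__name__)


def with_callback_timeout(callback, seconds: float):
    """Wrap an on_*_callback so task completion is not blocked past `seconds`."""

    @functools.wraps(callback)
    def wrapper(context):
        pool = ThreadPoolExecutor(max_workers=1)
        future = pool.submit(callback, context)
        try:
            future.result(timeout=seconds)
        except FuturesTimeout:
            log.warning(
                "callback %s still running after %ss; not waiting any longer",
                getattr(callback, "__name__", callback),
                seconds,
            )
        finally:
            # wait=False: a hung callback thread is abandoned, not killed.
            pool.shutdown(wait=False)

    return wrapper


# usage (notify_slack is a hypothetical callback):
# PythonOperator(..., on_success_callback=with_callback_timeout(notify_slack, 30))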
I managed to hit this for reasons I'm unclear about under 3.0.2.
It was being raised by the DAG Processor after parsing a dag file that generates 27 DAGs from a config file (time python on the file reports sub-second runtime). I could see an error pointing me here from airflow-core/src/airflow/dag_processing/processor.py
Unfortunately this caused all the DAGs generated from that file to be dropped, which was, regrettably, all of them.
After turning on debugging and much poking about I found this in the DAG processing log for the file:
{
"timestamp": "2025-07-01T00:00:10.108010",
"level": "debug",
"event": "Processing Callback Request",
"request": "{\"filepath\":\"extract.py\",\"bundle_name\":\"dags-folder\",\"bundle_version\":null,\"msg\":\"{'DAG Id': '#####', 'Task Id': '#####', 'Run Id': 'backfill__2024-12-12T00:00:00+00:00', 'Map Index': 2, 'Hostname': 'f3d38ea38a99', 'External Executor Id': '15bbb640-d659-44be-a26b-7bc2030b4b4e'}\",\"ti\":{\"id\":\"0197aefa-80b8-73cd-a588-17b61f373a76\",\"task_id\":\"'#####',\",\"dag_id\":\"'#####',\",\"run_id\":\"backfill__2024-12-12T00:00:00+00:00\",\"try_number\":1,\"map_index\":2,\"hostname\":\"f3d38ea38a99\",\"context_carrier\":{}},\"task_callback_type\":null,\"type\":\"TaskCallbackRequest\"}",
"logger": "task"
}
So the DAG processor was creating a TaskCallbackRequest for a running backfill task? After shutting off all the backfills and killing the task, the processor worked happily again, but I'm concerned I can't use backfill without triggering this, as it's not clear why it happened.
I also confirm that some DAGs randomly disappear, and dag-processor errors lead to this issue. It is the main reason why we cannot move the production environment to v3. I assume that many other users also face this issue. Feel free to ping me if you need any information from the user side.
I have the same problem, but in the production environment after the update.
@opeida Could you expand please? Airflow 3.0.4 retains the same behavior as Airflow 2 where the DAG Processor runs the callbacks. (Fix in https://github.com/apache/airflow/pull/53058)
@kaxil sure! Just an important notice here: the mentioned problem was observed on 3.0.3. I will update my environment to 3.0.4 and come back with the results.
Problem description
The problem was that, after functioning correctly for a few runs, some of the DAGs randomly disappeared from the UI and weren't scheduled to run. The dag-processor failed to import them and logged the following error in their log files:
{"timestamp":"2025-07-02T00:00:43.214131Z","level":"warning","event":"Process exited abnormally","exit_code":126,"logger":"processor"}
At the same time, kubectl logs <dag-processor-pod> contained the following error:
--- Supervised process Last chance exception handler ---
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py", line 377, in _fork_main
target()
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py", line 123, in _parse_file_entrypoint
result = _parse_file(msg, log)
^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py", line 139, in _parse_file
_execute_callbacks(bag, msg.callback_requests, log)
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py", line 179, in _execute_callbacks
raise NotImplementedError(
NotImplementedError: Haven't coded Task callback yet - https://github.com/apache/airflow/issues/44354!
Little research
After some googling, I assumed exit code 126 might be a permission issue, but found nothing conclusive. The files were synced with git-sync sidecars without a persistence directory. They had the following permissions/ownership in the dag-processor container:
airflow@airflow-dag-processor-6b9ddd54db-cpxf6:/opt/airflow/dags/repo/dags/clicks_daily_report$ ls -la
total 24
drwxr-sr-x 2 65533 root 4096 Jul 23 15:56 .
drwxr-sr-x 5 65533 root 4096 Jul 23 15:56 ..
-rw-r--r-- 1 65533 root 0 Jul 23 15:56 __init__.py
-rw-r--r-- 1 65533 root 2212 Jul 23 15:56 clicks_daily_report.json
-rw-r--r-- 1 65533 root 7116 Jul 23 15:56 clicks_daily_report.py
-rw-r--r-- 1 65533 root 3717 Jul 23 15:56 helper.py
The executable bits were missing from the permissions, but @potiuk said he doesn't believe the DAG file processor uses the executable bits of DAG files for anything.
Environment configuration
The deployed Airflow instance was based on the official resources only: GKE cluster deployed with Helm chart v1.18.0 and an extended image based on apache/airflow:slim-3.0.3-python3.12.
Cool, so that NotImplementedError should go away with 3.0.4
Hi @kaxil ,
We faced this issue in production, thanks for the fix.
I have a clarifying question that might help others understand the bug's nature. We observed that the task failure callbacks were executed successfully for several initial runs (on v3.0.3). If _execute_callbacks was not implemented, how did the initial DAG runs execute the callbacks correctly, and then all of a sudden one day the callbacks failed, leading to DAG parsing failures and DAGs disappearing from the UI?
Hi @kaxil , Can you please clarify this? Need to understand it. Thanks!
I believe option 1 should be relatively straightforward to implement once support for ExecuteCallback is added to workers, which is in progress:
https://github.com/apache/airflow/blob/25202a8a703e149ffaec9790112a5b11f605c732/airflow-core/src/airflow/executors/workloads.py#L146
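For context, a purely illustrative sketch of what such a workload payload could look like, loosely mirroring the shape of the workload models in that file; this is not the actual class behind the link, and the field names here are assumptions.

from __future__ import annotations

from pydantic import BaseModel


class ExecuteCallbackSketch(BaseModel):
    """Illustrative payload an executor could hand to a worker to run a dag-level callback."""

    kind: str = "ExecuteCallback"
    dag_id: str
    run_id: str
    callback_type: str                 # e.g. "on_success" / "on_failure"
    bundle_name: str | None = None
    bundle_version: str | None = None
    token: str                         # auth token the worker would use against the API server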
--- Supervised process Last chance exception handler ---
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py", line 376, in _fork_main
target()
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py", line 126, in _parse_file_entrypoint
result = _parse_file(msg, log)
^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py", line 142, in _parse_file
_execute_callbacks(bag, msg.callback_requests, log)
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py", line 182, in _execute_callbacks
raise NotImplementedError(
NotImplementedError: Haven't coded Task callback yet - https://github.com/apache/airflow/issues/44354!
I have the same problem. Will updating fix it? I'm currently using helm chart 1.18.0 with airflow 3.0.2. Will chart 1.19.0-dev with appVersion: 3.1.3 fix the problem?