
Move dag-level callbacks to worker

Open kaxil opened this issue 1 year ago • 14 comments

We will move callbacks to workers. TBD on “how”.

Options:

  1. Move callbacks to workers as a separate activity.
  2. Run callbacks as a teardown task.
  3. Deprecate (but not remove) the concept of callbacks and push users toward teardown tasks. If we take this route, the deprecation would be combined with one of the two options above.
  4. Leave them in the dag-processor.

I am leaning towards (2), with (3) as my next preference. In my opinion, callbacks and teardowns are somewhat redundant concepts, and if we keep them as-is we will end up with an explosion of callbacks; we already have on_{success,failure,skipped,retry,execute}_callback.
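For illustration only, a minimal sketch of what option (2) could look like from a DAG author's point of view, using the existing setup/teardown API (Airflow 2.7+). The DAG and task names are made up, and this is not necessarily how the migration would be implemented:

import pendulum
from airflow.decorators import dag, task


@dag(schedule=None, start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def callback_as_teardown():
    @task
    def extract():
        return "data"

    @task
    def notify():
        # Stand-in for what an on_success/on_failure callback does today.
        print("run finished, sending notification")

    # as_teardown() makes notify run once the upstream work is done,
    # even if that work failed, which is roughly the callback semantics.
    extract() >> notify().as_teardown()


callback_as_teardown()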

kaxil avatar Nov 25 '24 15:11 kaxil

Option (2) sounds like a really good idea (though not for all callbacks). It would be great to see callbacks as implicit "tasks" in the Airflow UI. Not all callbacks can be handled this way, I'm afraid: "on_retry" and "on_execute" in particular don't fit the "teardown" concept. "on_execute" might map to an implicit "setup" task, but "on_retry" is a bit problematic.

potiuk avatar Nov 26 '24 13:11 potiuk

We need to consider what happens when the task succeeded but the callback didn't. We need retries for callbacks, but if, for example, the worker is terminated between task success and callback execution, how would it retry? Also, on_sla_callback() and on_execute_callback() can't be part of a teardown. We need to verify against AIP-86 what the plan is for the SLA callback as part of the deadline interface, and whether it conflicts with whichever plan we choose here. cc @ferruzzi @romsharon98

eladkal avatar Dec 01 '24 08:12 eladkal

The plan for Deadlines is to add a simple check in the scheduler loop, essentially (obviously pseudocode):

if (select min(deadline) from DeadlinesTable) < utcnow(): handle_deadline_misses()

My intention was for the handler to spin up a new process to run the callbacks so it isn't holding anything else up, but let me know if that plan needs to change with this initiative.
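A rough, self-contained sketch of that check, purely for illustration; get_min_deadline() and handle_deadline_misses() are placeholders standing in for the real query against the planned deadlines table and its handler, not existing Airflow code:

from datetime import datetime, timedelta, timezone
from multiprocessing import Process


def get_min_deadline() -> datetime | None:
    # Placeholder for: SELECT MIN(deadline) FROM DeadlinesTable
    return datetime.now(timezone.utc) - timedelta(minutes=5)


def handle_deadline_misses() -> None:
    # Placeholder for running the deadline-miss callbacks.
    print("running deadline-miss callbacks")


def check_deadlines() -> None:
    earliest = get_min_deadline()
    if earliest is not None and earliest < datetime.now(timezone.utc):
        # Run the handler in a separate process so the scheduler loop
        # is not blocked while the callbacks execute.
        Process(target=handle_deadline_misses).start()


if __name__ == "__main__":
    check_deadlines()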

ferruzzi avatar Dec 03 '24 17:12 ferruzzi

Should there be a timeout for task callbacks (like on_success_callback) in case they hang (e.g., due to API delays or infinite loops), so they don't block task completion? Something similar to execution_timeout, but specifically for callbacks. This could help avoid issues where long or broken callbacks prevent task success from being registered. WDYT?
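Not an existing Airflow option, just a sketch of the idea: bounding how long a callback may run, in the spirit of execution_timeout. The helper name and the 30-second default are made up for illustration:

from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout


def run_callback_with_timeout(callback, context, timeout_seconds=30):
    # Run the callback in a worker thread and stop waiting after the timeout,
    # so a hanging callback can't block the task's final state from being recorded.
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(callback, context)
    try:
        return future.result(timeout=timeout_seconds)
    except FuturesTimeout:
        print(f"callback timed out after {timeout_seconds}s")
        return None
    finally:
        # Don't wait for a hung callback thread while cleaning up the pool.
        pool.shutdown(wait=False, cancel_futures=True)


if __name__ == "__main__":
    import time

    def slow_callback(context):
        time.sleep(5)

    run_callback_with_timeout(slow_callback, {}, timeout_seconds=1)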

VladaZakharova avatar May 06 '25 12:05 VladaZakharova

I managed to hit this under 3.0.2, for reasons I'm unclear about.

It was being raised by the DAG Processor after parsing a DAG file that generates 27 DAGs from a config file (time python on the file reports a sub-second runtime). I could see an error pointing me here from airflow-core/src/airflow/dag_processing/processor.py.

Unfortunately this was causing all the DAGs that file generated to be dropped, which was, regrettably, all of them.

After turning on debugging and much poking about I found this in the DAG processing log for the file:

{
  "timestamp": "2025-07-01T00:00:10.108010",
  "level": "debug",
  "event": "Processing Callback Request",
  "request": "{\"filepath\":\"extract.py\",\"bundle_name\":\"dags-folder\",\"bundle_version\":null,\"msg\":\"{'DAG Id': '#####', 'Task Id': '#####', 'Run Id': 'backfill__2024-12-12T00:00:00+00:00', 'Map Index': 2, 'Hostname': 'f3d38ea38a99', 'External Executor Id': '15bbb640-d659-44be-a26b-7bc2030b4b4e'}\",\"ti\":{\"id\":\"0197aefa-80b8-73cd-a588-17b61f373a76\",\"task_id\":\"'#####',\",\"dag_id\":\"'#####',\",\"run_id\":\"backfill__2024-12-12T00:00:00+00:00\",\"try_number\":1,\"map_index\":2,\"hostname\":\"f3d38ea38a99\",\"context_carrier\":{}},\"task_callback_type\":null,\"type\":\"TaskCallbackRequest\"}",
  "logger": "task"
}

So the DAG processor was creating a TaskCallbackRequest for a running backfill task? After shutting off all the backfills and killing the task, the processor worked happily again, but I'm concerned that I can't use backfill without triggering this, as it's not clear why it happened.

baylisscg avatar Jul 02 '25 05:07 baylisscg

I also confirm that some DAGs randomly disappear, and dag-processor errors lead to this issue. It is the main reason why we cannot move the production environment to v3. I assume that many other users also face this issue. Feel free to ping me if you need any information from the user side.

opeida avatar Aug 04 '25 20:08 opeida

I have the same problem, but in the production environment after the update.

NeOncsgo1 avatar Aug 08 '25 23:08 NeOncsgo1

@opeida Could you expand please? Airflow 3.0.4 retains the same behavior as Airflow 2 where the DAG Processor runs the callbacks. (Fix in https://github.com/apache/airflow/pull/53058)

kaxil avatar Aug 09 '25 00:08 kaxil

@kaxil sure! Just an important note here: the mentioned problem was observed on 3.0.3. I will update my environment to 3.0.4 and come back with the results.

Problem description

The problem was that, after functioning correctly for a few runs, some of the DAGs randomly disappeared from the UI and weren't scheduled to run. The dag-processor failed to import them and logged the following error in their log files:

{"timestamp":"2025-07-02T00:00:43.214131Z","level":"warning","event":"Process exited abnormally","exit_code":126,"logger":"processor"}

At the same time, kubectl logs <dag-processor-pod> contained the following error:

--- Supervised process Last chance exception handler ---
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py", line 377, in _fork_main
    target()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py", line 123, in _parse_file_entrypoint
    result = _parse_file(msg, log)
             ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py", line 139, in _parse_file
    _execute_callbacks(bag, msg.callback_requests, log)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py", line 179, in _execute_callbacks
    raise NotImplementedError(
NotImplementedError: Haven't coded Task callback yet - https://github.com/apache/airflow/issues/44354!

Little research

After some googling, I assumed the exit code 126 might indicate a permission issue, but found nothing conclusive in that direction. The files were synced with git-sync sidecars without a persistence directory. They had the following permissions/ownership in the dag-processor container:

airflow@airflow-dag-processor-6b9ddd54db-cpxf6:/opt/airflow/dags/repo/dags/clicks_daily_report$ ls -la
total 24
drwxr-sr-x 2 65533 root 4096 Jul 23 15:56 .
drwxr-sr-x 5 65533 root 4096 Jul 23 15:56 ..
-rw-r--r-- 1 65533 root    0 Jul 23 15:56 __init__.py
-rw-r--r-- 1 65533 root 2212 Jul 23 15:56 clicks_daily_report.json
-rw-r--r-- 1 65533 root 7116 Jul 23 15:56 clicks_daily_report.py
-rw-r--r-- 1 65533 root 3717 Jul 23 15:56 helper.py

The executable bits were missing from the permissions, but @potiuk said he doesn't believe the DAG file processor uses the executable bits of DAG files for anything.

Environment configuration

The deployed Airflow instance was based on the official resources only: GKE cluster deployed with Helm chart v1.18.0 and an extended image based on apache/airflow:slim-3.0.3-python3.12.

opeida avatar Aug 11 '25 10:08 opeida

Cool, so that NotImplementedError should go away with 3.0.4

kaxil avatar Aug 11 '25 11:08 kaxil

Hi @kaxil, we faced this issue in production; thanks for the fix. I have a clarifying question that might help others understand the bug's nature. We observed that the task failure callbacks were executed successfully for several initial runs (on v3.0.3). If _execute_callbacks was not implemented, how did the initial DAG runs execute the callbacks correctly, and why did the callbacks then suddenly start failing one day, leading to DAG parsing failures and DAGs disappearing from the UI?

de-fpl-user avatar Aug 13 '25 10:08 de-fpl-user

Hi @kaxil, can you please clarify this? I need to understand it. Thanks!

de-fpl-user avatar Aug 14 '25 08:08 de-fpl-user

I believe option 1 should be relatively straightforward to implement once support for ExecuteCallback is added to workers, which is in progress: https://github.com/apache/airflow/blob/25202a8a703e149ffaec9790112a5b11f605c732/airflow-core/src/airflow/executors/workloads.py#L146

ramitkataria avatar Oct 21 '25 18:10 ramitkataria

--- Supervised process Last chance exception handler ---
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py", line 376, in _fork_main
    target()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py", line 126, in _parse_file_entrypoint
    result = _parse_file(msg, log)
             ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py", line 142, in _parse_file
    _execute_callbacks(bag, msg.callback_requests, log)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py", line 182, in _execute_callbacks
    raise NotImplementedError(
NotImplementedError: Haven't coded Task callback yet - https://github.com/apache/airflow/issues/44354!

I have the same problem. Will updating fix it? I'm currently using helm chart 1.18.0 with airflow 3.0.2. Will chart 1.19.0-dev with appVersion: 3.1.3 fix the problem?

xkarkosx avatar Dec 10 '25 21:12 xkarkosx