[SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348)
Related Issue
closes: #50348
This PR reintroduces the pre-import optimization for DAG parsing originally added in Airflow 2.6.0 (#30495) but missing in Airflow 3.0. It reduces DAG parsing time by importing common modules in the parent process before forking.
I referred to #30495 and based my modifications to `processor.py` on its implementation. I understand the issue is currently assigned to @kevinhongzl, and I'm happy to collaborate or adjust direction depending on their progress.
Looking forward to feedback!
Also we might need to add a test for it
I’ll try adding a unit test for it.
Hi, I attempted to refactor the logic into a function.
This refactoring provides the following benefits:
- Modularity: encapsulates the pre-import logic in a dedicated function
- Easier testing: allows the _pre_import_airflow_modules function to be unit-tested in isolation
- Improved readability: the function name clearly indicates its purpose
@jedcunningham @ephraimbuddy I'm not sure whether this was missed or intentionally removed, but it looks good to me. I'm planning on merging it early tomorrow.
I just revoked my approval.
@Lzzz666 would you be able to take a look at the review comments on this PR?
@amoghrajesh I might need some help or discussion regarding the location of the pre-import!
Could you clarify what you mean by the “location of the pre-import”? Implementing pre-import in Airflow 2 doesn’t automatically require us to carry it over to Airflow 3. Airflow 3 uses a dedicated standalone DAG processor, whereas Airflow 2 still straddled scheduler-based and standalone dag parsing. Before moving forward, can you point out the specific issue you’re seeing or error messages?
Hi @ephraimbuddy, I'm not familiar with the scheduler, but based on the DAG File Processing documentation, I think the pre-import function is executed in the DagFileProcessorProcess stage, which runs in the parent process before forking. I first tried adding the pre-import to `parse_file()`, but I now realize that's not the right place.
Correct me if I'm wrong, but when I traced the code, it appears that `proc: Self = super().start()` forks the parent process to create the child process. If the goal of the pre-import is to load modules in the parent before forking, it might make more sense to place it before the `proc: Self = super().start()` call.
```
DagFileProcessorManager.run()
└─ run_parsing_loop()
   └─ while loop:
      └─ start_new_processes()
         └─ create_process(dag_path)
            └─ DagFileProcessorProcess.start()
               └─ proc: Self = super().start(target=target, client=client, **kwargs)  # fork
```
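To illustrate why the placement matters, here is a minimal, self-contained sketch (not Airflow code, and assuming a Unix-like system where `os.fork()` is available) showing that a module imported in the parent before the fork is already present in the child's `sys.modules`:

```python
import os
import sys

def child_inherits(module_name: str) -> bool:
    """Fork and report whether the child sees module_name already
    imported, i.e. inherited from the parent's memory image."""
    __import__(module_name)  # import in the parent BEFORE forking
    r, w = os.pipe()
    pid = os.fork()
    if pid == 0:  # child process
        os.close(r)
        os.write(w, b"1" if module_name in sys.modules else b"0")
        os._exit(0)
    os.close(w)  # parent process
    answer = os.read(r, 1)
    os.close(r)
    os.waitpid(pid, 0)
    return answer == b"1"

print(child_inherits("decimal"))  # the child never has to re-import decimal
```

This is the entire mechanism the optimization relies on: anything imported before `super().start()` forks is free for every child.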
Your trace is right, but I feel we should have a verifiable reason to implement the pre-import. If you can spend some time and showcase the issue, I believe it'll help all of us find the reason to re-implement this pre-import.
I’ll refer to the experiment in #30495 and provide benchmark results.
I found that with the current pre_import implementation (#30495), the performance gains are negligible in Airflow 3, probably because it only pre-imports the Airflow modules actually used in each DAG. In theory this should still speed things up, but my benchmarks didn't show any improvement. However, when I modified the pre_import function to preload only the heavier third-party libraries (NumPy, pandas, Kubernetes, and Celery), the speed-up became very noticeable in my test DAG.
So, I'm thinking:
- Why doesn't pre-importing just the current DAG's Airflow modules reduce its parse time?
- If we want to pre-import all of Airflow's core modules, which ones should we include?
All of my tests involved parsing the same DAG ten times and measuring run_duration (defined as `run_duration = time.monotonic() - proc.start_time`), excluding the first run's startup cost.
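For reference, the averaging described above (dropping the first run to exclude one-time startup cost) amounts to something like:

```python
def average_excluding_first(durations: list[float]) -> float:
    """Average run_duration over all runs except the first, which
    carries one-time startup cost (interpreter and import warm-up)."""
    rest = durations[1:]
    return sum(rest) / len(rest)

# e.g. four parses of the same DAG: the slow first run is dropped
print(average_excluding_first([10.0, 1.0, 3.0, 2.0]))  # 2.0
```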
- First test with the original pre-import method
- Second test with the original pre-import method
- Modified pre-import including only NumPy, pandas, Kubernetes, and Celery
I'm not entirely sure, but if we want to pre-load all the core Airflow modules (instead of, as before, pre-loading only the modules required by the current DAG file before forking), should we place the pre-import function before the parse loop? This way, we can avoid redundant imports.
Probably will need to benchmark again to see whether we want it. I feel it probably wouldn't affect much? Not sure
I ran a benchmark comparing three loading strategies:
- Pre-import before parsing loop: call `_pre_import_airflow_modules()` once before entering the while loop in `_run_parsing_loop()`.
- Pre-import before fork: call it right before the fork (the original place).
- No pre-import

Setup
- 1,000 parse iterations per experiment
- Loaded almost all core Airflow modules plus numpy, pandas, celery, and k8s (no providers, api, www, cli)
- Averaged over 1,000 parses (first run excluded)
- Measured `process_creation_time = time.monotonic() - process_start_time` immediately after `self._start_new_processes()`
Results

“Before parsing loop” vs “before fork”
- Moving the pre-import outside the parsing loop reduced `process_creation_time` by 92.8% compared to doing it before each fork.

Baseline vs “before parsing loop”
- No pre-import at all was about 29% faster than pre-importing before the loop, but the actual time saved was tiny (probably just normal fork timing noise).
Conclusion
If my experimental setup is correct, and pre-importing before the parse loop doesn’t introduce any unintended side effects, it might offer an opportunity to improve efficiency.
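The "before parsing loop" variant boils down to hoisting the call out of the loop: since Python caches imports in `sys.modules`, the cost is paid once and every subsequent fork inherits the cache. A hypothetical sketch (function names are placeholders, not the actual Airflow API):

```python
import importlib

def pre_import(modules: list[str]) -> None:
    # Paid once; results are cached in sys.modules for every later fork.
    for name in modules:
        importlib.import_module(name)

def run_parsing_loop(files: list[str], modules: list[str], parse) -> None:
    pre_import(modules)  # hoisted out of the loop ("before parsing loop")
    for f in files:
        parse(f)  # each fork would inherit the already-loaded modules

calls = []
run_parsing_loop(["a.py", "b.py"], ["json", "decimal"], calls.append)
print(calls)  # ['a.py', 'b.py']
```

The per-fork variant would instead call `pre_import` inside the loop body, re-paying a (small, because cached) lookup cost on every iteration.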
Looks good! @Lzzz666 could you please take a look at the CI failure? Thanks!
Fixed CI problems!
Hey @Lzzz666, is there anything still unresolved or that needs further discussion? Please help us resolve comments you think have already been addressed or reach consensus 🙂 If there's none, I'm planning on merging it later today. @ephraimbuddy please let me know if you want to take another look 🙂
eladkal on May 22: Correct me if I am wrong, but if an import fails, it means the DAG processor will never try to re-import it until the process is restarted. I think we need to change this. I think this also explains why in some cases (in 2.x), after a DAG processor restart, I see many DAGs as broken, always with import errors on my own modules.
About this problem: I'm not sure, but I tried adding an import of a non-existent module (causing a pre_import error), then removed that non-existent module. After waiting past the interval and letting the next scan run, the DAG was retried and loaded successfully, with no need to restart the DAG processor.
If this is right, it would mean that after we fix an import error in a DAG file, it should be retried in the next cycle without having to restart the DAG processor.
But if this issue still occurs, should we open a new PR to address it?
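One way to address eladkal's concern is to keep the pre-import strictly best-effort: a failed import is logged and skipped, and because the child process still imports the DAG's modules itself on every parse cycle, a fixed module is retried on the next scan without restarting the processor. A hedged sketch of that policy (not the actual implementation):

```python
import importlib
import logging

log = logging.getLogger(__name__)

def best_effort_pre_import(module_names: list[str]) -> list[str]:
    """Try to pre-import each module; return the names that failed.

    A failure here is only a lost optimization: the child process
    re-imports DAG modules on every parse cycle, so a fixed import
    error is retried on the next scan automatically.
    """
    failed = []
    for name in module_names:
        try:
            importlib.import_module(name)
        except ImportError:
            log.warning("Could not pre-import %s; skipping", name)
            failed.append(name)
    return failed

print(best_effort_pre_import(["json", "definitely_not_a_real_module_xyz"]))
```

Under this policy a broken pre-import never poisons the processor; it only means that particular module is not shared across forks until it is fixed.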
Assuming my experimental setup is correct, should we consider whether to run pre-import prior to the parsing loop?
I tried to make an import error, then repaired the module, and the dag was retried and loaded successfully.
https://github.com/user-attachments/assets/1e18ff25-79c2-4ed5-a630-8bdd7c2c86c7
I’m going to merge this, let’s see what happens.
Backport failed to create: v3-0-test.
| Status | Branch | Result |
|---|---|---|
| ❌ | v3-0-test | |
You can attempt to backport this manually by running:
`cherry_picker d3bddfd v3-0-test`
This should apply the commit to the v3-0-test branch and leave the commit in conflict state marking the files that need manual conflict resolution.
After you have resolved the conflicts, you can continue the backport process by running:
`cherry_picker --continue`
@Lzzz666 Can you help with backporting this manually?
Manual backport done in #52698