
[SCHEDULER] Add back DAG parsing pre-import optimization in Airflow 3.0 (Addresses #50348)

Open Lzzz666 opened this issue 8 months ago • 15 comments

Related Issue

closes: #50348


This PR reintroduces the pre-import optimization for DAG parsing originally added in Airflow 2.6.0 (#30495) but missing in Airflow 3.0. It reduces DAG parsing time by importing common modules in the parent process before forking.
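
For context, a minimal standalone sketch of the idea (not the actual patch): modules imported in the parent before os.fork() are inherited by the child via copy-on-write, so the child skips the import cost.

```python
import importlib
import os
import time

# Illustrative stdlib stand-ins for "common modules"; the real optimization
# pre-imports the Airflow modules a DAG file actually uses.
PRELOAD = ["json", "decimal", "sqlite3"]

for name in PRELOAD:
    importlib.import_module(name)  # pay the import cost once, in the parent

pid = os.fork()  # POSIX only
if pid == 0:  # child process
    start = time.monotonic()
    import sqlite3  # noqa: F401  already in sys.modules, so effectively free
    print(f"child import took {time.monotonic() - start:.6f}s")
    os._exit(0)
os.waitpid(pid, 0)
```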

I referred to #30495 and made modifications to 'processor.py' based on its implementation. I understand the issue is currently assigned to @kevinhongzl, and I’m happy to collaborate or adjust direction depending on their progress.

Looking forward to feedback!



Lzzz666 avatar May 08 '25 19:05 Lzzz666

> Also we might need to add a test for it

I’ll try adding a unit test for it.

Lzzz666 avatar May 13 '25 06:05 Lzzz666

Hi, I attempted to refactor the logic into a dedicated function (a sketch of its shape follows the list below).

This refactoring provides the following benefits:

  1. Modularity: encapsulates the pre-import logic in a dedicated function
  2. Easier testing: allows the _pre_import_airflow_modules function to be unit-tested in isolation
  3. Improved readability: the function name clearly indicates its purpose
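
For reference, a minimal sketch of what the function might look like, modeled on the 2.6 implementation in #30495 and assuming the iter_airflow_imports helper in airflow.utils.file that yields the module names found in a DAG file:

```python
import importlib
import logging

from airflow.utils.file import iter_airflow_imports

log = logging.getLogger(__name__)

def _pre_import_airflow_modules(file_path: str) -> None:
    """Pre-import the Airflow modules referenced by a DAG file.

    Runs in the parent process so that forked children inherit the
    already-imported modules via copy-on-write.
    """
    for module in iter_airflow_imports(file_path):
        try:
            importlib.import_module(module)
        except Exception as e:
            # Not fatal: the child will surface the real import error
            # when it actually parses the file.
            log.warning("Error pre-importing %r from %s: %s", module, file_path, e)
```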

Lzzz666 avatar May 15 '25 10:05 Lzzz666

@jedcunningham @ephraimbuddy not sure whether this was missed or intentionally removed, but it looks good to me. I'm planning on merging it early tomorrow.

Lee-W avatar May 19 '25 08:05 Lee-W

I just revoked my approval.

Lee-W avatar May 19 '25 14:05 Lee-W

@Lzzz666 would you be able to take a look at the review comments on this PR?

amoghrajesh avatar Jun 04 '25 06:06 amoghrajesh

> @Lzzz666 would you be able to take a look at the review comments on this PR?

@amoghrajesh I might need some help or discussion regarding the location of the pre-import!

Lzzz666 avatar Jun 10 '25 08:06 Lzzz666

> @Lzzz666 would you be able to take a look at the review comments on this PR?
>
> @amoghrajesh I might need some help or discussion regarding the location of the pre-import!

Could you clarify what you mean by the “location of the pre-import”? Implementing the pre-import in Airflow 2 doesn’t automatically require us to carry it over to Airflow 3. Airflow 3 uses a dedicated standalone DAG processor, whereas Airflow 2 still straddled scheduler-based and standalone DAG parsing. Before moving forward, can you point out the specific issue you’re seeing or the error messages?

ephraimbuddy avatar Jun 11 '25 19:06 ephraimbuddy

Hi @ephraimbuddy, I’m not familiar with the scheduler, but based on the DAG File Processing documentation, I think the pre-import function is executed in the DagFileProcessorProcess stage, which runs in the parent process before forking. So I tried adding the pre-import to parse_file(), but I now realize that’s not the right place.

Lzzz666 avatar Jun 13 '25 03:06 Lzzz666

Correct me if I’m wrong, but when I traced the code, it appears that proc: Self = super().start() forks the parent process to create the child process. If the goal of the pre-import is to load modules in the parent before forking, it might make more sense to place it before the proc: Self = super().start() call (see the sketch after the trace below).

```
DagFileProcessorManager.run()
└─ run_parsing_loop()
   └─ while loop:
      └─ start_new_processes()
         └─ create_process(dag_path)
            └─ DagFileProcessorProcess.start()
               └─ proc: Self = super().start(target=target, client=client, **kwargs)  # fork
```
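
In other words, the proposed placement, shown as an illustrative sketch with stand-in classes (not the real Airflow source):

```python
import importlib
import logging

log = logging.getLogger(__name__)

class WatchedSubprocess:
    """Stand-in for the real base class whose start() performs the fork."""

    @classmethod
    def start(cls, **kwargs):
        log.info("fork would happen here")
        return cls()

class DagFileProcessorProcess(WatchedSubprocess):
    @classmethod
    def start(cls, *, preload: list[str], **kwargs):
        # Pre-import in the parent so the forked child inherits the modules...
        for name in preload:
            try:
                importlib.import_module(name)
            except ImportError as exc:
                log.warning("could not pre-import %s: %s", name, exc)
        # ...and only then fork.
        return super().start(**kwargs)
```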

Lzzz666 avatar Jun 13 '25 07:06 Lzzz666

> Correct me if I’m wrong, but when I traced the code, it appears that proc: Self = super().start() forks the parent process to create the child process. If the goal of the pre-import is to load modules in the parent before forking, it might make more sense to place it before the proc: Self = super().start() call.

Your trace is right, but I feel we should have a verifiable reason to implement the pre-import. If you can spend some time and showcase the issue, I believe it'll help all of us find the reason to re-implement this pre-import.

ephraimbuddy avatar Jun 13 '25 09:06 ephraimbuddy

I’ll refer to the experiment in #30495 and provide benchmark results.

Lzzz666 avatar Jun 13 '25 18:06 Lzzz666

I found that with the current pre_import implementation (#30495), the performance gains are negligible in Airflow 3—probably because it only pre-imports the Airflow modules actually used in each DAG. In theory this should still speed things up, but my benchmarks didn’t show any improvement. However, when I modified the pre_import function to preload only the “heavier” third-party libraries (NumPy, pandas, Kubernetes, and Celery), the speed-up became very noticeable in my test DAG.
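
The modified variant looked something like this; a hedged sketch in which only the library list comes from the description above, and the function name is made up:

```python
import importlib
import logging

log = logging.getLogger(__name__)

# Fixed list of heavy third-party libraries, instead of the per-file
# Airflow imports used by the original implementation.
HEAVY_LIBRARIES = ("numpy", "pandas", "kubernetes", "celery")

def _pre_import_heavy_libraries() -> None:
    for name in HEAVY_LIBRARIES:
        try:
            importlib.import_module(name)
        except ImportError as exc:
            # Skipping is safe: a missing library only means the child
            # pays the import cost (or failure) itself, as before.
            log.warning("skipping pre-import of %s: %s", name, exc)
```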

So I'm wondering:

  1. Why doesn’t pre-importing just the current DAG’s Airflow modules reduce its parse time?
  2. If we want to pre-import all of Airflow’s core modules, which ones should we include?

All of my tests involved parsing the same DAG ten times and measuring run_duration (defined as run_duration = time.monotonic() - proc.start_time), excluding the first startup cost.

  1. First test with the original pre-import method [benchmark screenshot]

  2. Second test with the original pre-import method [benchmark screenshot]

  3. Modified pre-import that only includes NumPy, pandas, Kubernetes, and Celery [benchmark screenshot]

Lzzz666 avatar Jun 16 '25 04:06 Lzzz666

I’m not entirely sure, but if we want to pre-load all the core Airflow modules—instead of, as before, pre-loading only the modules required by the current DAG file before forking—should we place the pre-import function before the parse loop? This way, we can avoid redundant imports.

Lzzz666 avatar Jun 16 '25 15:06 Lzzz666

> I’m not entirely sure, but if we want to pre-load all the core Airflow modules—instead of, as before, pre-loading only the modules required by the current DAG file before forking—should we place the pre-import function before the parse loop? This way, we can avoid redundant imports.

We'll probably need to benchmark again to see whether we want it. I feel it probably wouldn't affect much? Not sure.

Lee-W avatar Jun 17 '25 02:06 Lee-W

I ran a benchmark comparing three loading strategies:

  1. Pre-import before parsing loop: call _pre_import_airflow_modules() once before entering the while loop in _run_parsing_loop().
  2. Pre-import before fork: call it right before the fork (the original place).
  3. No pre-import.

Setup

  • 1,000 parse iterations per experiment
  • Loaded almost all core Airflow modules plus numpy, pandas, celery, and k8s (no providers, api, www, cli)
  • Averaged over 1,000 parses (first run excluded)
  • Measured
process_creation_time = time.monotonic() - process_start_time

immediately after self._start_new_processes() (a standalone timing sketch follows below)
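
As an illustration of the measurement, assuming a bare os.fork() stands in for _start_new_processes() and that only the parent-side fork call is timed:

```python
import os
import statistics
import time

def fork_once() -> float:
    """Time just the fork call, mirroring process_creation_time."""
    start = time.monotonic()
    pid = os.fork()
    if pid == 0:
        os._exit(0)  # child exits immediately; we only time the parent's fork
    elapsed = time.monotonic() - start
    os.waitpid(pid, 0)  # reap the child outside the timed region
    return elapsed

# 1,000 iterations, first (warm-up) run excluded, as in the setup above.
samples = [fork_once() for _ in range(1_001)][1:]
print(f"mean process_creation_time: {statistics.mean(samples):.6f}s")
```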

Results

“Before parsing loop” vs “before fork”

  • Moving the pre-import outside the parsing loop reduced process_creation_time by 92.8% compared to doing it before each fork.

Baseline vs “before parsing loop”

  • No pre-import at all was about 29% faster than pre-importing before the loop, but the actual time saved was tiny—probably just normal fork-timing noise.

[benchmark results screenshot]

Conclusion

If my experimental setup is correct, and pre-importing before the parse loop doesn’t introduce any unintended side effects, it might offer an opportunity to improve efficiency.

Lzzz666 avatar Jun 17 '25 08:06 Lzzz666

Looks good! @Lzzz666 could you please take a look at the CI failure? Thanks!

Lee-W avatar Jun 19 '25 03:06 Lee-W

Fixed CI problems!

Lzzz666 avatar Jun 19 '25 07:06 Lzzz666

Hey @Lzzz666, is there anything still unresolved or that needs further discussion? Please help us resolve the comments you think have already been addressed, or reach consensus 🙂 If there's none, I'm planning on merging it later today. @ephraimbuddy please let me know if you want to take another look 🙂

Lee-W avatar Jun 24 '25 07:06 Lee-W

> eladkal on May 22: Correct me if I am wrong, but if an import fails, it means that the dag processor will never try to re-import it till the process is restarted. I think we need to change this. I think this also explains why in some cases (in 2.x), after a dag processor restart, I see many DAGs as broken, always with import errors on my own modules.

About this problem: I'm not sure, but I tried adding an import of a non-existent module (causing a pre_import error), then removed that import. After waiting past the interval and letting the next scan run, the DAG was retried and loaded successfully—no need to restart the dag processor.
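
Concretely, the test DAG could look like this (an illustrative sketch; the file name and dag_id are made up):

```python
# dags/preimport_retry_test.py
# Step 1: with the bogus import in place, the DAG shows up as broken.
# Step 2: delete that import, wait one parsing interval, and the file is
#         re-parsed and the DAG loads -- no dag processor restart needed.
import nonexistent_module  # noqa: F401  intentionally missing

from airflow.sdk import DAG  # Airflow 3 import path

with DAG(dag_id="preimport_retry_test"):
    pass
```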

If this is right, it means that after we fix an import error in a DAG file, it is retried in the next cycle without having to restart the dag processor.

But if this issue still occurs, should we open a new PR to address it?

Lzzz666 avatar Jun 24 '25 07:06 Lzzz666

> I ran a benchmark comparing three loading strategies: [… see the full benchmark comment above …] If my experimental setup is correct, and pre-importing before the parse loop doesn’t introduce any unintended side effects, it might offer an opportunity to improve efficiency.

Assuming my experimental setup is correct, should we consider running the pre-import prior to the parsing loop?

Lzzz666 avatar Jun 24 '25 07:06 Lzzz666

I tried triggering an import error, then repaired the module, and the DAG was retried and loaded successfully.

https://github.com/user-attachments/assets/1e18ff25-79c2-4ed5-a630-8bdd7c2c86c7

Lzzz666 avatar Jun 27 '25 11:06 Lzzz666

I’m going to merge this, let’s see what happens.

uranusjr avatar Jul 02 '25 06:07 uranusjr

Backport failed to create: v3-0-test. View the failure log / Run details.

You can attempt to backport this manually by running:

cherry_picker d3bddfd v3-0-test

This should apply the commit to the v3-0-test branch and leave the commit in a conflicted state, marking the files that need manual conflict resolution.

After you have resolved the conflicts, you can continue the backport process by running:

cherry_picker --continue

github-actions[bot] avatar Jul 02 '25 07:07 github-actions[bot]

@Lzzz666 Can you help with backporting this manually?

uranusjr avatar Jul 02 '25 07:07 uranusjr

Manual backport done in #52698

Lzzz666 avatar Jul 04 '25 04:07 Lzzz666