refactor _add_files_to_queue
Doing an "in" membership check on a list is not optimal when there is a large number of files, since every check scans the whole queue. So we build a set from self._file_queue once and use it for the membership checks.
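A minimal sketch of the intended change, assuming _add_files_to_queue receives a list of DagFileInfo objects and appends the ones not already queued (the class and parameter names below are placeholders for illustration, not the exact Airflow signature):

from collections import deque

class FileQueueHolder:  # placeholder for the manager class that owns self._file_queue
    def __init__(self):
        self._file_queue: deque = deque()

    def _add_files_to_queue(self, file_infos: list) -> None:
        # Before: "f not in self._file_queue" rescans the deque for every file.
        # After: build a set once so each membership check is O(1) on average.
        queued = set(self._file_queue)
        self._file_queue.extend(f for f in file_infos if f not in queued)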
I ran a quick benchmark (just to get a sense of the gain):
import timeit
from collections import deque
from pathlib import Path

from airflow.dag_processing.manager import DagFileInfo

# Simulate ~400 files already in the queue and 2000 candidate files across bundles.
queue = deque(DagFileInfo(Path(f"dag_{i}.py"), f"bundle_{i%10}", f"{i%5}.0") for i in range(400))
files = [DagFileInfo(Path(f"dag_{i}.py"), f"bundle_{i%10}", f"{i%5}.0") for i in range(2000)]

def test_current():
    # Membership check against the deque scans it for every file.
    return [f for f in files if f not in queue]

def test_optimized():
    # Build a set once; each membership check is then O(1) on average.
    queue_set = set(queue)
    return [f for f in files if f not in queue_set]

iterations = 1000
current = timeit.timeit(test_current, number=iterations)
optimized = timeit.timeit(test_optimized, number=iterations)
print(f"Current: {current:.3f}s ({current/iterations*1000:.2f}ms/call)")
print(f"Optimized: {optimized:.3f}s ({optimized/iterations*1000:.2f}ms/call)")
print(f"Speedup: {current/optimized:.1f}x faster")
Current: 97.487s (97.49ms/call)
Optimized: 0.281s (0.28ms/call)
Speedup: 346.6x faster
I assumed there can be about 400 files in the queue while parsing about 2k files in total across dag bundles, and I intentionally chose a high number of dags to make the gain visible: with the list, each of the ~2000 files is compared against up to ~400 queued entries, while the set version does a single hash lookup per file.
I'm keeping this in draft while I check how large the queue can actually get when parsing 2k files.
EDIT:
I ran an actual test to see how large the queue can get. It can grow up to the number of dags, and if parsing each dag takes time, the queue size will stay close to the number of dags.
Converting this back to a draft because the method seems to be called only during startup; reviewing it again.