airflow
Add watchdog for immediately processing changes in the DAGs folder
This PR implements a filesystem listener to immediately process changes in the DAGs folder.
There are still some TODOs, but I'm creating the PR for testing/feedback.
closes: https://github.com/apache/airflow/discussions/27208
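The PR uses the watchdog library, which receives change notifications from the operating system (for example inotify on Linux). To show which changes such a listener reacts to without pulling in that dependency, here is a minimal stdlib sketch that detects created, deleted and modified DAG files by comparing two snapshots of modification times. It is a polling stand-in, not the PR's implementation, and `snapshot` and `diff` are hypothetical helpers that are not part of Airflow.

```python
from pathlib import Path


def snapshot(dag_folder: str) -> dict[str, float]:
    """Map every .py file under dag_folder (recursively) to its modification time."""
    return {str(path): path.stat().st_mtime for path in Path(dag_folder).rglob("*.py")}


def diff(old: dict[str, float], new: dict[str, float]) -> dict[str, set[str]]:
    """Classify files as created, deleted or modified between two snapshots."""
    return {
        "created": set(new) - set(old),
        "deleted": set(old) - set(new),
        # Present in both snapshots, but the modification time moved.
        "modified": {p for p in new.keys() & old.keys() if new[p] != old[p]},
    }
```

A polling loop would keep the latest snapshot and queue the `created` and `modified` paths for parsing. Watchdog replaces the polling with OS events, which is what lets changes be picked up immediately rather than on the next directory scan.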
IRL !
Nice
If one uses git-sync to sync their DAGs, will this trigger for all of them every time git-sync runs, regardless of whether the contents changed?
Edit: adding that git-sync works by writing the repository to a new folder and rewriting a symbolic link to point to that folder.
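Because git-sync writes a fresh checkout and swaps the symlink, raw filesystem events would report every file as new even when nothing changed. One possible mitigation (an assumption for illustration, not something the PR is stated to do) is to key files by their path relative to the sync root and compare content hashes, so only files that are new or whose bytes actually differ get reparsed. `content_hashes` and `files_to_reparse` are hypothetical helpers.

```python
import hashlib
from pathlib import Path


def content_hashes(root: str) -> dict[str, str]:
    """Map each .py file, keyed by its path relative to root, to the SHA-256 of its bytes."""
    root_path = Path(root)
    return {
        str(p.relative_to(root_path)): hashlib.sha256(p.read_bytes()).hexdigest()
        for p in root_path.rglob("*.py")
    }


def files_to_reparse(before: dict[str, str], after: dict[str, str]) -> set[str]:
    """Relative paths that are new or whose contents changed between two checkouts."""
    return {path for path, digest in after.items() if before.get(path) != digest}
```

Relative keys are what make the comparison survive the move to a new checkout directory. Deleted files would be `before.keys() - after.keys()`.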
So what's the "solution" for dynamic? Parse those on schedule?
Great idea! It would be very helpful.
> So what's the "solution" for dynamic? Parse those on schedule?
If I understood correctly, this is not meant to replace regular reparsing, but to make it more responsive in some cases and to greatly improve the discovery of new DAGs.
Dynamic DAGs are not the only issue; DAGs spread over several files are too. If you have a `utils.py` that several DAGs depend on and you modify that file, reparsing is necessary to propagate the changes (because the DAG files themselves appear unchanged).
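A watcher that only looks at which files changed on disk cannot see that a DAG depends on a modified helper module. As a hedged sketch (the PR does not claim to do this), a changed module could be mapped to the DAG files that statically import it using Python's `ast` module. `imports_module` and `dependents` are hypothetical helpers; the approach misses dynamic imports (`importlib.import_module`, `__import__`) and relative imports, which is one reason periodic reparsing stays useful.

```python
import ast


def imports_module(source: str, module: str) -> bool:
    """True if `source` statically imports `module` or one of its submodules."""
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            # `from pkg import utils` may import the submodule pkg.utils
            names = [node.module] + [f"{node.module}.{alias.name}" for alias in node.names]
        else:
            continue  # not an import, or a relative import (skipped in this sketch)
        if any(name == module or name.startswith(module + ".") for name in names):
            return True
    return False


def dependents(dag_sources: dict[str, str], changed_module: str) -> set[str]:
    """DAG file paths whose source imports changed_module."""
    return {path for path, src in dag_sources.items() if imports_module(src, changed_module)}
```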
This is great, @BasPH. In the initial release, can we hide this behind a feature flag? That would let you make breaking changes in later releases if something turns out to be broken, and it would also prevent surprises for users.
> So what's the "solution" for dynamic? Parse those on schedule?
This PR currently adds watchdog to live alongside the current DAG parser implementation (i.e. periodic reparsing). Introducing watchdog will improve the life of the 99% of Airflow users who create static DAGs. The 1% of Airflow users who create dynamic DAGs, which require periodic reparsing, still have the current DAG parser.
We could advise users who don't use any dynamic DAGs to raise `min_file_process_interval` to a very high number, so that reparsing only happens occasionally. And I think in the more distant future we could introduce a "reparse" toggle of some sort on `DAG` to indicate, per DAG, whether or not it should be reparsed periodically.
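In Airflow 2.x, `min_file_process_interval` lives in the `[scheduler]` section and can also be set through the `AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL` environment variable. The per-DAG "reparse" toggle is only a proposal, so the following is a minimal sketch of how the decision could combine it with watcher events. `DagFileState`, its fields and `should_reparse` are hypothetical and do not exist in Airflow.

```python
from dataclasses import dataclass


@dataclass
class DagFileState:
    """Hypothetical bookkeeping for one DAG file."""

    last_parsed: float       # epoch seconds of the last parse
    changed_on_disk: bool    # set by the filesystem watcher
    periodic_reparse: bool   # the proposed per-DAG "reparse" toggle


def should_reparse(state: DagFileState, now: float, min_file_process_interval: float) -> bool:
    """Always reparse on a watcher event; otherwise only DAGs that opted in, once the interval elapsed."""
    if state.changed_on_disk:
        return True
    if not state.periodic_reparse:
        return False  # static DAG: wait for the next filesystem event
    return now - state.last_parsed >= min_file_process_interval
```

Static DAGs then cost nothing between edits, while dynamic DAGs keep the interval-based behaviour they rely on today.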
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
Bump
I guess resolving conflicts and rebasing would be good
Yes, need to find time to sit down and finish this
@BasPH Just curious where we are - do you think you'll have time to get this in for 2.9?
+1, waiting for this nice feature 🙏
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
@BasPH You presented this at the last summit and I was eagerly waiting on it. Were you pulled onto other priorities, was there a lack of support for getting this into main, or were there technical limitations? It would be great to have this!
Hi @jscheffl, unfortunately life got in the way of investing time in OSS work over the last year.
The essentials are there, but some time ago I was working on integrating this code nicely with Airflow. My idea was to make the DAG processor a configurable class, so that the user could choose between the "current" DAG processor and this "new" DAG processor that uses watchdog to handle DAG code changes. This turned out to be troublesome because the DAG processor doesn't just process DAGs; it also has other responsibilities, such as handling task and SLA callbacks. The `DagFileProcessorManager` is actually a `DagFileProcessorManagerAndTaskAndSlaCallbackHandler`... I assume this was the result of "hacking" the DAG processor to perform recurring system operations because it's a recurring process, but architecturally I don't think the DAG processor should handle callbacks.
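If callback handling lived in its own component, the configurable-class idea becomes a small interface that only decides which DAG files to parse. The following is a minimal sketch under that assumption: every name in it (`DagFileSource`, `PeriodicSource`, `WatcherSource`, `make_source`, `CallbackHandler`) is hypothetical and does not correspond to an Airflow class, and `PeriodicSource` is a simplified stand-in for interval-based reparsing.

```python
from collections import deque
from typing import Iterable, Protocol


class DagFileSource(Protocol):
    """Hypothetical interface: decides which DAG files to parse next, nothing else."""

    def files_to_parse(self) -> Iterable[str]: ...


class PeriodicSource:
    """Simplified stand-in for periodic reparsing: offers every known file each pass."""

    def __init__(self, files: list[str]) -> None:
        self.files = files

    def files_to_parse(self) -> Iterable[str]:
        return list(self.files)


class WatcherSource:
    """Offers only the files a filesystem watcher reported since the last pass."""

    def __init__(self) -> None:
        self.pending: set[str] = set()

    def notify(self, path: str) -> None:
        self.pending.add(path)  # duplicate events collapse into one parse

    def files_to_parse(self) -> Iterable[str]:
        batch, self.pending = sorted(self.pending), set()
        return batch


def make_source(kind: str, files: list[str]) -> DagFileSource:
    """Pick an implementation from a (hypothetical) configuration value."""
    if kind == "periodic":
        return PeriodicSource(files)
    if kind == "watchdog":
        return WatcherSource()
    raise ValueError(f"unknown DAG processor kind: {kind!r}")


class CallbackHandler:
    """Hypothetical separate component for task/SLA callbacks, independent of parsing."""

    def __init__(self) -> None:
        self.queue: deque[str] = deque()

    def drain(self) -> list[str]:
        handled = list(self.queue)
        self.queue.clear()
        return handled
```

With this split, swapping the file-selection strategy no longer requires reimplementing callback handling in each processor variant.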
I see two options at the moment:
- Extract all the non-DAG-processing logic (handling callbacks, SLAs, etc.) into a new dedicated component so that the DAG processor only processes DAG files. This would allow us to deprecate/remove current settings related to the DAG processor, such as `dag_dir_list_interval` and `min_file_process_interval`. I think this is architecturally better but requires more work.
- Keep the current DAG processor and add watchdog alongside the current implementation. The result is that the current settings such as `dag_dir_list_interval` and `min_file_process_interval` are kept, but this requires less work.
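The second option amounts to a hybrid loop: files reported by the watcher are parsed first, and interval-based reparsing keeps running for everything else, which also covers dynamic DAGs and shared helper modules. A minimal sketch under that assumption; `next_batch` is hypothetical and does not reflect how `DagFileProcessorManager` actually orders its queue.

```python
import queue


def next_batch(
    events: "queue.Queue[str]",
    last_parsed: dict[str, float],
    now: float,
    min_file_process_interval: float,
) -> list[str]:
    """Watcher-reported files first, then files due for periodic reparsing (oldest first)."""
    urgent: list[str] = []
    while True:
        try:
            path = events.get_nowait()  # drain everything the watcher queued
        except queue.Empty:
            break
        if path not in urgent:
            urgent.append(path)
    due = sorted(
        (
            path
            for path, ts in last_parsed.items()
            if path not in urgent and now - ts >= min_file_process_interval
        ),
        key=lambda p: last_parsed[p],
    )
    return urgent + due
```

Keeping the periodic pass means `dag_dir_list_interval` and `min_file_process_interval` retain their current meaning; the watcher only moves changed files to the front.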
Curious about your thoughts.
My 2 cents:
For AF 3: Would move the callbacks and stuff to the workers or separate components
For Option (2): How much less work is it? :)
> For AF 3: Would move the callbacks and stuff to the workers or separate components
Agree.
> For Option (2): How much less work is it? :)
That will be a lot less work since we don't have to invent a way to handle callbacks etc. separately.