
Add watchdog for immediately processing changes in the DAGs folder

Open: BasPH opened this pull request 1 year ago • 14 comments

This PR implements a filesystem listener to immediately process changes in the DAGs folder.

There are still some TODOs, but I'm opening the PR now for testing and feedback.

closes: https://github.com/apache/airflow/discussions/27208

BasPH, Sep 19 '23 18:09
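The PR description above boils down to reacting to filesystem events instead of relisting the DAGs folder on a timer. The following minimal sketch shows how a batch of such events could be collapsed into "files to reparse" and "files to forget". It assumes events arrive as simple `(event_type, src_path, dest_path)` tuples, loosely modeled on what the watchdog library passes to a `FileSystemEventHandler`. `FsEvent`, `dag_files_to_reparse`, and the `.py`-only filter are hypothetical illustrations, not the PR's actual code.

```python
from pathlib import PurePosixPath
from typing import Iterable, NamedTuple, Optional


class FsEvent(NamedTuple):
    """Simplified stand-in for a filesystem event (hypothetical shape)."""
    event_type: str                  # "created", "modified", "deleted" or "moved"
    src_path: str
    dest_path: Optional[str] = None  # only set for "moved" events


def is_py(path: Optional[str]) -> bool:
    """Only Python files are treated as candidate DAG files in this sketch."""
    return path is not None and PurePosixPath(path).suffix == ".py"


def dag_files_to_reparse(events: Iterable[FsEvent]) -> tuple[set[str], set[str]]:
    """Collapse a batch of events into (paths to reparse, paths to forget).

    Events are processed in order and a later event for the same path wins,
    so a create-then-delete burst ends up only in the "forget" set.
    """
    reparse: set[str] = set()
    forget: set[str] = set()
    for ev in events:
        if ev.event_type == "moved":
            # A rename removes the old path and introduces the new one.
            if is_py(ev.src_path):
                reparse.discard(ev.src_path)
                forget.add(ev.src_path)
            if is_py(ev.dest_path):
                forget.discard(ev.dest_path)
                reparse.add(ev.dest_path)
        elif not is_py(ev.src_path):
            continue  # ignore non-Python files such as README or .txt
        elif ev.event_type == "deleted":
            reparse.discard(ev.src_path)
            forget.add(ev.src_path)
        else:  # "created" or "modified"
            forget.discard(ev.src_path)
            reparse.add(ev.src_path)
    return reparse, forget
```

Batching matters because editors often write a file several times on save. In a real watchdog setup, a handler's `on_any_event` callback could append events to a queue that is drained every second or so and fed to a function like this one.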

[Photo attachment: 20230919_145315.jpg]

IRL!

cjcjameson, Sep 19 '23 18:09

Nice

dstandish, Sep 19 '23 18:09

If one uses git-sync to sync their DAGs, will this trigger a reparse of all DAG files every time git-sync runs, regardless of whether the contents changed?

Edit: to add, git-sync works by writing the repository to a new folder and then rewriting a symbolic link to point to that folder.

ssantichaivekin-plaid, Sep 19 '23 18:09
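One possible mitigation for the git-sync case raised above is to compare file contents by hash after each symlink swap, so that only files whose content actually changed are reparsed. This is an assumption for illustration; nothing in the thread states that the PR does this. The sketch simulates the git-sync layout with two revision folders and a symlink. `snapshot`, `changed_files`, `swap_link`, and `demo` are hypothetical helpers, and the code needs a platform where creating symlinks is permitted (for example Linux or macOS).

```python
import hashlib
import os
import tempfile
from pathlib import Path


def snapshot(dags_link: Path) -> dict[str, str]:
    """Map each .py file (path relative to the link target) to its SHA-256."""
    root = dags_link.resolve()  # follow the symlink to the current revision
    return {
        str(p.relative_to(root)): hashlib.sha256(p.read_bytes()).hexdigest()
        for p in sorted(root.rglob("*.py"))
    }


def changed_files(before: dict[str, str], after: dict[str, str]) -> set[str]:
    """Files that were added, removed, or whose contents differ."""
    return {name for name in before.keys() | after.keys()
            if before.get(name) != after.get(name)}


def swap_link(link: Path, target: Path) -> None:
    """Repoint `link` at `target` by renaming a fresh symlink over it."""
    tmp = link.with_name(link.name + ".tmp")
    if tmp.is_symlink():
        tmp.unlink()
    tmp.symlink_to(target, target_is_directory=True)
    os.replace(tmp, link)  # rename() replaces the old link itself, not its target


def demo() -> set[str]:
    """Simulate two git-sync revisions where only one new file appears."""
    with tempfile.TemporaryDirectory() as d:
        base = Path(d)
        rev1, rev2 = base / "rev1", base / "rev2"
        for rev in (rev1, rev2):
            rev.mkdir()
            (rev / "static_dag.py").write_text("# unchanged\n")
        (rev2 / "new_dag.py").write_text("# added in rev2\n")
        link = base / "dags"
        link.symlink_to(rev1, target_is_directory=True)
        before = snapshot(link)
        swap_link(link, rev2)
        return changed_files(before, snapshot(link))
```

Even though every path moves to a new folder on each sync, hashing relative paths means `static_dag.py` is not reported, and `demo()` yields only `new_dag.py`.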

So what's the "solution" for dynamic? Parse those on schedule?

dstandish, Sep 19 '23 18:09

Great idea! It would be very helpful.

hussein-awala, Sep 19 '23 19:09

> So what's the "solution" for dynamic? Parse those on schedule?

If I understood correctly, this is not meant to replace regular reparsing, only to make it more responsive in some cases and to greatly improve the discovery of new DAGs.

Dynamic DAGs are not the only issue; DAGs spread over several files are too. If you have a utils.py that several DAGs depend on and you modify that file, reparsing is necessary to propagate the change, because the DAG files themselves appear unchanged.

vandonr-amz, Sep 22 '23 22:09
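The utils.py case described above is a dependency problem: a change to a shared module should also trigger reparsing of every DAG file that imports it. A minimal sketch, assuming DAG files import helpers by top-level module name (`import utils` or `from utils import helper`) and ignoring relative and transitive imports, could build a reverse import map with the standard `ast` module. `imported_modules` and `dependents_of` are hypothetical helper names.

```python
import ast
from collections import defaultdict


def imported_modules(source: str) -> set[str]:
    """Top-level module names imported (absolutely) by a Python source string."""
    names: set[str] = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names.update(alias.name.split(".")[0] for alias in node.names)
        elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            names.add(node.module.split(".")[0])
    return names


def dependents_of(changed_module: str, dag_sources: dict[str, str]) -> set[str]:
    """DAG files (keys of dag_sources) that directly import changed_module."""
    reverse: dict[str, set[str]] = defaultdict(set)
    for filename, source in dag_sources.items():
        for module in imported_modules(source):
            reverse[module].add(filename)
    return set(reverse.get(changed_module, set()))
```

When a watcher reports a change to `utils.py`, the files returned here could be queued for reparsing alongside it. Dynamic DAGs that read external state (databases, APIs) would still not be covered, which is why periodic reparsing remains relevant.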

This is great @BasPH. In the initial release, can we hide this behind a feature flag? That would let you make breaking changes in later releases if something turns out to be broken, and it would prevent surprises for users.

shubham22, Sep 23 '23 00:09
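A feature flag like the one suggested could be read from Airflow's INI-style configuration, with the usual `AIRFLOW__{SECTION}__{KEY}` environment-variable override. The sketch below uses the standard `configparser` module. The option `[scheduler] dag_file_watcher_enabled` is hypothetical and not an existing Airflow setting. It defaults to off so that existing deployments keep today's periodic behavior unless they opt in.

```python
import configparser
import os
from typing import Mapping, Optional

TRUTHY = {"true", "1", "yes", "on"}


def watcher_enabled(cfg_text: str, env: Optional[Mapping[str, str]] = None) -> bool:
    """Return True if the (hypothetical) file-watcher flag is switched on.

    An environment variable, following Airflow's AIRFLOW__SECTION__KEY naming
    convention, takes precedence over the config file contents.
    """
    env = os.environ if env is None else env
    override = env.get("AIRFLOW__SCHEDULER__DAG_FILE_WATCHER_ENABLED")
    if override is not None:
        return override.strip().lower() in TRUTHY
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    # fallback=False keeps the feature off when the section or option is absent.
    return parser.getboolean("scheduler", "dag_file_watcher_enabled", fallback=False)
```

Keeping the default off matches the goal in the comment above: users see no change in behavior until they explicitly enable the watcher.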

> So what's the "solution" for dynamic? Parse those on schedule?

This PR currently adds watchdog alongside the current DAG parser implementation (i.e. periodic reparsing). Introducing watchdog will improve life for the 99% of Airflow users who create static DAGs. The 1% of Airflow users who create dynamic DAGs, which require periodic reparsing, still have the current DAG parser.

We could advise users who don't use any dynamic DAGs to raise min_file_process_interval to a very high number, so that reparsing only happens occasionally. In the more distant future, I think we could introduce some sort of "reparse" toggle on the DAG to indicate, per DAG, whether or not it should be reparsed periodically.

BasPH, Oct 09 '23 07:10
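The hybrid model described in the comment above (watchdog for immediate reaction, periodic reparsing kept for dynamic DAGs) can be expressed as a per-file scheduling rule. This sketch assumes a hypothetical per-file `reparse_periodically` flag standing in for the "reparse" toggle that was floated, which does not exist in Airflow, and treats `min_file_process_interval` as a number of seconds. `FileState` and `due_for_parse` are illustrative names only.

```python
from dataclasses import dataclass


@dataclass
class FileState:
    path: str
    last_parsed: float                 # epoch seconds of the last parse
    reparse_periodically: bool = True  # hypothetical per-DAG "reparse" toggle
    changed_on_disk: bool = False      # set when a filesystem watcher reports a change


def due_for_parse(state: FileState, now: float, min_file_process_interval: float) -> bool:
    """Decide whether a DAG file should be (re)parsed at time `now`."""
    if state.changed_on_disk:
        return True   # watcher events are handled immediately
    if not state.reparse_periodically:
        return False  # static DAG: parse only when the file changes
    return now - state.last_parsed >= min_file_process_interval
```

Raising `min_file_process_interval` to a very large value, as suggested above, roughly approximates setting `reparse_periodically=False` for every file, while watcher events still trigger immediate parses.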

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

github-actions[bot], Dec 15 '23 00:12

Bump

BasPH, Dec 16 '23 06:12

I guess resolving conflicts and rebasing would be good

potiuk, Dec 16 '23 21:12

Yes, I need to find time to sit down and finish this.

BasPH, Dec 18 '23 10:12

@BasPH Just curious where we are - do you think you'll have time to get this in for 2.9?

cmarteepants, Feb 01 '24 15:02

+1, waiting for this nice feature 🙏

Michalosu, Feb 21 '24 08:02

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

github-actions[bot], Apr 08 '24 00:04

@BasPH You presented this at the last Summit and I was eagerly waiting for it. Were you pulled away by other priorities, or were you missing the support needed to get this merged into main? Or were there technical limitations? It would be great to have this!

jscheffl, May 18 '24 10:05

Hi @jscheffl, unfortunately life got in the way of investing time in OSS work the last year.

The essentials are there, but some time ago I was working on integrating this code cleanly with Airflow. My idea was to make the DAG processor a configurable class, so that users could choose between the "current" DAG processor and this "new" DAG processor that uses watchdog to handle DAG code changes. This turned out to be troublesome because the DAG processor doesn't just process DAGs; it also has other responsibilities, such as handling task and SLA callbacks. The DagFileProcessorManager is really a DagFileProcessorManagerAndTaskAndSlaCallbackHandler... I assume this was the result of "hacking" the DAG processor into performing recurring system operations because it's a recurring process, but architecturally I don't think the DAG processor should handle callbacks.

I see two options at the moment:

  1. Extract all the non-DAG-processing logic (handling callbacks, SLAs, etc.) into a new dedicated component so that the DAG processor only processes DAG files. This would allow us to deprecate/remove current settings related to the DAG processor such as dag_dir_list_interval and min_file_process_interval. I think this is architecturally better but requires more work.
  2. Keep the current DAG processor and add watchdog alongside the current implementation. The result is that the current settings such as dag_dir_list_interval and min_file_process_interval are kept, but this requires less work.

Curious about your thoughts.

BasPH, May 20 '24 11:05
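Option 1 in the comment above amounts to separating responsibilities, so that the file-selection strategy (periodic or watcher-driven) can be swapped without touching callback handling. The following rough sketch shows that shape. Every class and function name here is hypothetical and does not correspond to Airflow's actual DagFileProcessorManager API. `load_class` shows how a configurable class might be selected from a dotted path string.

```python
import importlib
from abc import ABC, abstractmethod
from typing import Iterable


class DagFileSource(ABC):
    """Decides which DAG files need parsing; knows nothing about callbacks."""

    @abstractmethod
    def files_to_parse(self, now: float) -> list[str]: ...


class PeriodicDagFileSource(DagFileSource):
    """Today's model: reparse every known file once per interval (seconds)."""

    def __init__(self, files: Iterable[str], interval: float) -> None:
        self.files = list(files)
        self.interval = interval
        self._last_run = float("-inf")

    def files_to_parse(self, now: float) -> list[str]:
        if now - self._last_run < self.interval:
            return []
        self._last_run = now
        return list(self.files)


class WatchedDagFileSource(DagFileSource):
    """Watcher model: only files reported by filesystem events."""

    def __init__(self) -> None:
        self._pending: set[str] = set()

    def notify(self, path: str) -> None:
        """Record a changed file, e.g. from a watchdog event handler."""
        self._pending.add(path)

    def files_to_parse(self, now: float) -> list[str]:
        ready, self._pending = sorted(self._pending), set()
        return ready


class CallbackHandler:
    """Separate component for task/SLA callbacks (left unimplemented here)."""

    def handle(self, callback: object) -> None:
        raise NotImplementedError


def load_class(dotted_path: str) -> type:
    """Import 'package.module.ClassName' and return the class object."""
    module_name, _, class_name = dotted_path.rpartition(".")
    return getattr(importlib.import_module(module_name), class_name)
```

With this split, the periodic settings (such as min_file_process_interval) would only concern `PeriodicDagFileSource`, while callbacks live in their own component regardless of which file source is configured.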

My 2 cents:

For AF 3: Would move the callbacks and stuff to the workers or separate components

For Option (2): How much less work is it? :)

kaxil, May 21 '24 14:05

> For AF 3: Would move the callbacks and stuff to the workers or separate components

Agree.

potiuk, May 21 '24 15:05

> For Option (2): How much less work is it? :)

That will be a lot less work since we don't have to invent a way to handle callbacks etc. separately.

BasPH, Jun 17 '24 09:06