Add support for S3 dag bundle
Implements an S3 DAG bundle. With this, end users can set an S3 bucket and prefix (sub-folder) as a DAG bundle source.
Related wiki: AIP-66: DAG Bundles & Parsing
Related project: AIP-66: DAG Bundles & Parsing
closes #9555
closes #8657
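For context, a minimal sketch of the end-user surface this adds, assuming the bundle lands in the Amazon provider with `bucket_name`/`prefix`/`aws_conn_id` parameters (the module path and exact kwargs here are assumptions, not the merged API):

```python
from airflow.providers.amazon.aws.bundles.s3 import S3DagBundle  # module path assumed

# A bundle rooted at s3://my-dag-bucket/dags/ (bucket and prefix are hypothetical).
bundle = S3DagBundle(
    name="my-s3-dags",
    bucket_name="my-dag-bucket",
    prefix="dags/",               # sub-folder used as the bundle root
    aws_conn_id="aws_default",
)
bundle.refresh()                  # sync the S3 objects down to the local bundle path
print(bundle.path)                # local directory the DAG processor parses (name assumed)
```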
Moved it to the provider package. It's ready for review. Any chance to trigger the test workflows?
Triggered workflows again, will review soon :+1:
Code-related tests are passing now. The remaining failure is related to the Airflow version: the `BaseDagBundle` code is only available in Airflow 3, but the test is running against Airflow 2.9.x. Any idea how to fix it?
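One common way to handle this (a sketch; provider test suites may already ship a compat helper for exactly this) is to skip the whole module whenever the installed Airflow predates `BaseDagBundle`:

```python
import pytest
from packaging.version import Version

from airflow import __version__ as airflow_version

# BaseDagBundle only exists in Airflow 3+, so skip the whole module
# when the compatibility matrix runs these tests against 2.x.
pytestmark = pytest.mark.skipif(
    Version(airflow_version) < Version("3.0.0"),
    reason="S3 DAG bundle tests require Airflow 3.0+ (BaseDagBundle)",
)
```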
@o-nikolas @jedcunningham it's ready for review.
Looks reasonable to me, but I'm no Bundles specialist. I talked to Jed before he left on leave and he mentioned to loop in @dstandish on Bundles stuff. Does this look reasonable to you as well Daniel?
@dstandish @eladkal @ephraimbuddy is it possible to get this in if it looks good overall, and follow up with improvements?
Is it possible to run the following command with podman? It's asking for docker:

```shell
breeze static-checks --all-files --show-diff-on-failure --color always --initialize-environment
```
Did you try https://podman-desktop.io/docs/migrating-from-docker/managing-docker-compatibility
Don’t use caplog in general. I think it’s not too difficult to rewrite the tests.
Would really like to see the pr merged 😍
The branch is 600 commits behind. I rebased it. Let's see what CI says.
I would love to see this merged as well. @ismailsimsek the use of caplog needs to be addressed (i.e. removed). Do you need any help with this?
That would be a great help, thank you! I don't have much time to work on this recently, so I'd really appreciate it.
@ismailsimsek I made some changes to use mocking of the log attribute instead of caplog usage (which was breaking the tests). It's difficult to apply changes to a branch I don't own, so I added three commits with the fixes (and some other stuff that's happened in main since). I don't think it applied cleanly, but you can look at the commits and update your branch accordingly if not!
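For reference, the pattern looks roughly like this (a sketch; the stand-in class and log message are illustrative, not the exact code in the commits):

```python
from unittest import mock

from airflow.utils.log.logging_mixin import LoggingMixin


class FakeBundle(LoggingMixin):
    """Stand-in for S3DagBundle; just logs a warning like the real code might."""

    def refresh(self):
        self.log.warning("bucket is empty")


def test_refresh_logs_warning():
    # Patch the class-level `log` property so the test asserts on the mocked
    # logger directly, instead of capturing log output with caplog.
    with mock.patch.object(FakeBundle, "log") as mock_log:
        FakeBundle().refresh()
        mock_log.warning.assert_called_once_with("bucket is empty")
```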
I have updated my local branch and ran the s3 tests successfully, everything looks good to me. Thank you very much @o-nikolas appreciate it.
Looks like tests are green now (I had to retry one step, but it's passing now). Merging!
Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.
Hi all,
Would it be a good idea to introduce a `RemoteDagBundle` interface (or perhaps extend `BaseDagBundle` or add a common utility in the `bundles.base` module) to support versioning for remote Dag bundles like `S3DagBundle`?
We could support bundle versioning using one of the following approaches (potentially controlled by an `enable_versioning` parameter in `RemoteDagBundle`):
- **Content-based hash**: Calculate a hash of the bundle by hashing each DAG file and incorporating the directory structure, then return the final hash value as the version (see the sketch after this list).
- **Timestamp-based version**: Use the last modified timestamp of the bundle. Most cloud providers support a `LastModified` or `updated` metadata field that we can leverage.
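A minimal sketch of the content-based approach, assuming the bundle has already been synced to a local directory (`bundle_root` and the `*.py` filter are illustrative):

```python
import hashlib
from pathlib import Path


def bundle_content_hash(bundle_root: Path) -> str:
    """Hash every DAG file together with its relative path, so that content
    changes, renames, and moves all produce a new bundle version."""
    digest = hashlib.sha256()
    for dag_file in sorted(bundle_root.rglob("*.py")):
        digest.update(str(dag_file.relative_to(bundle_root)).encode())  # directory structure
        digest.update(dag_file.read_bytes())                            # file contents
    return digest.hexdigest()
```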
If this sounds reasonable, I will open a new issue to track this feature and raise a PR for it.
CC @jedcunningham
I was talking to Jed about something very similar last week. I don't even think the interface needs to enforce the "how" for the versioning, that can be delegated to each Bundle provider. But I agree that the interface should support this, especially from the persistence side of things. IMHO the interface should expect a string payload back from the bundle provider which is the representation of the dag bundle version (which can be whatever mechanism the provider wants/can use to serialize the state of the dags at that time). I'd also vote that the interface creates the version id for that, otherwise each provider will create their own semantics and shape of the id which is going to create inconsistency.
> I was talking to Jed about something very similar last week. I don't even think the interface needs to enforce the "how" for the versioning, that can be delegated to each Bundle provider.
Cool! Glad to hear we’re on the same page. I don’t have a strong opinion on this, my main concern is making sure we align on the overall changes before moving forward.
I agree, this approach might be simpler.
> IMHO the interface should expect a string payload back from the bundle provider which is the representation of the dag bundle version (which can be whatever mechanism the provider wants/can use to serialize the state of the dags at that time).
I’ve considered this scenario, and I agree it should be a mandatory requirement if any provider supports versioning.
However, it seems challenging to get a specific snapshot version at the bucket + subpath level in most object stores. As far as I know, current cloud object stores only support versioning at the object level, not for a combination of bucket + subpath.
One possible solution: the user could provide a special path for storing a metadata file (maybe in JSON or YAML format) that would track the current bundle version ID along with the version IDs of the relevant objects (DAG files) and their prefixes. I'm not sure if this is the best approach, but I'd love to hear your thoughts.
The solution would be similar to the metadata file in Open Table Formats (e.g. Apache Iceberg, Delta Lake); a metadata file supporting bundle versioning might look like:

```yaml
- bundle_version: <bundle version>
  dag_files:
    - object_version_id: <object version id from object store>
      prefix: /path/dag/file
```
> I'd also vote that the interface creates the version id for that, otherwise each provider will create their own semantics and shape of the id which is going to create inconsistency.
Thanks for agreeing that the interface should create the version ID. I’ll continue to gather more ideas and suggestions on this.
> I was talking to Jed about something very similar last week. I don't even think the interface needs to enforce the "how" for the versioning, that can be delegated to each Bundle provider.
>
> Cool! Glad to hear we’re on the same page. I don’t have a strong opinion on this, my main concern is making sure we align on the overall changes before moving forward.
Cool, I'm glad as well! Let's definitely wait for @jedcunningham to weigh in as well.
> I agree, this approach might be simpler.
>
> IMHO the interface should expect a string payload back from the bundle provider which is the representation of the dag bundle version (which can be whatever mechanism the provider wants/can use to serialize the state of the dags at that time).
>
> I’ve considered this scenario, and I agree it should be a mandatory requirement if any provider supports versioning.
>
> However, it seems challenging to get a specific snapshot version at the bucket + subpath level in most object stores. As far as I know, current cloud object stores only support versioning at the object level, not for a combination of bucket + subpath.
>
> One possible solution: the user could provide a special path for storing a metadata file (maybe in JSON or YAML format) that would track the current bundle version ID along with the version IDs of the relevant objects (DAG files) and their prefixes.
>
> The solution would be similar to the metadata file in Open Table Formats (e.g. Apache Iceberg, Delta Lake); a metadata file supporting bundle versioning might look like:
>
> ```yaml
> - bundle_version: <bundle version>
>   dag_files:
>     - object_version_id: <object version id from object store>
>       prefix: /path/dag/file
> ```
Yupp, this is also basically what I already proposed in my email on the dev list for the S3 Bundle :smiley: I'll quote this sentence for a gist:
> The proposed solution involves building a snapshot of the S3 bucket at the time each Bundle version is created, noting the version of all the objects in the bucket (using S3’s native bucket versioning feature) and creating a manifest to store those versions and then giving that whole manifest itself some unique id/version/uuid.
And I think that payload, and ones for other providers (whatever they decide/need to do for their storage), are exactly what can just be serialized into a string payload and returned through the Bundle interface.
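A sketch of building such a manifest with boto3, assuming bucket versioning is enabled on the bucket (the function name and manifest shape are illustrative, mirroring the YAML above):

```python
import json
import uuid

import boto3


def build_bundle_manifest(bucket: str, prefix: str) -> str:
    """Record the current VersionId of every object under the prefix and
    serialize the snapshot as the bundle-version payload."""
    s3 = boto3.client("s3")
    dag_files = []
    for page in s3.get_paginator("list_object_versions").paginate(Bucket=bucket, Prefix=prefix):
        for version in page.get("Versions", []):
            if version["IsLatest"]:  # snapshot only the live object versions
                dag_files.append(
                    {"prefix": version["Key"], "object_version_id": version["VersionId"]}
                )
    return json.dumps({"bundle_version": str(uuid.uuid4()), "dag_files": dag_files})
```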
> I'd also vote that the interface creates the version id for that, otherwise each provider will create their own semantics and shape of the id which is going to create inconsistency.
>
> Thanks for agreeing that the interface should create the version ID. I’ll continue to gather more ideas and suggestions on this.
Sounds good!
> And I think that payload, and ones for other providers (whatever they decide/need to do for their storage), are exactly what can just be serialized into a string payload and returned through the Bundle interface.
Yep. As commented on the devlist. Making it into a generic solution, without special tables for S3 - but leaving the serialization / deserialization to implementation - is a very elegant and nice solution :)
My 2c, giving bundles somewhere to store this extra stuff makes sense. I'd rather we let the bundles own id creation though - if there is one that is meaningful already, we should use that.
> IMHO the interface should expect a string payload back from the bundle provider which is the representation of the dag bundle version (which can be whatever mechanism the provider wants/can use to serialize the state of the dags at that time).
This is what we have today actually, it's just very limited in size, particularly when you consider the "I need to store the version of every object in a bucket" type situation. So, providing somewhere to store it makes sense, but we don't have a natural place for it today (not hard to add though).
@jedcunningham
> My 2c, giving bundles somewhere to store this extra stuff makes sense.
:heart:
> I'd rather we let the bundles own id creation though - if there is one that is meaningful already, we should use that.
I think a consistent version id format is better than a wild west of IDs from each provider, but I'm happy to disagree and commit on this one. I see where you're coming from on the id sometimes being useful (but I really only see git as the example of that).
> This is what we have today actually, it's just very limited in size, particularly when you consider the "I need to store the version of every object in a bucket" type situation. So, providing somewhere to store it makes sense, but we don't have a natural place for it today (not hard to add though).
Yeah, today the version field is both a unique ID and the data actually needed to recreate the bundle version represented by that ID (Git, again, is the hallmark example of this). But in the S3 case we'd want a unique ID that is more of a UUID primary key, which is then used to fetch the manifest needed to recreate the bundle version on the filesystem, since the manifest will be large(ish) and we don't want it displayed in the UI or any other user-accessible location.
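In other words, the interface might hand back two separate things, roughly like this hypothetical shape (names are illustrative, not a proposed API):

```python
from dataclasses import dataclass


@dataclass
class BundleVersion:
    """Hypothetical split between the user-facing id and the bulky payload."""

    version_id: str  # short, UUID-like key; safe to show in the UI
    payload: str     # serialized manifest used to recreate the bundle on disk
```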
@jason810496
I chatted with @jedcunningham last week and we're both strapped for time at the moment. Did you want to take a stab at implementing what we've been discussing here?
> @jason810496
>
> I chatted with @jedcunningham last week and we're both strapped for time at the moment. Did you want to take a stab at implementing what we've been discussing here?
Sure, I would like to help finalize the design and handle the implementation as well.
However, my bandwidth is limited at the moment as I am resolving an issue with the CloudWatch remote logging handler and preparing for my OSS conference presentation next week. I will only be able to start working on the design and implementation after August 11th.
I have already added this issue to my backlog. If anyone else has more availability before August 11th, please feel free to take it over.
@jason810496
I think that will be more than fine for a timeline :)