[FLINK-33694][gs-fs-hadoop] Support overriding GCS root URL
What is the purpose of the change
This change configures the GCS client used by the GSFileSystem's RecoverableWriter code path to use the same GCS root URL as Hadoop.
Without this change, the GSFileSystem behaves inconsistently when this config option is set: for instance, FileSource will use the overridden URL while FileSink will not.
Brief change log
- Update GSFileSystemFactory/GSFileSystemOptions to look for the gs.storage.root.url Flink config option, overriding the GCS client's Host option if it is set (see the sketch below)
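As a rough illustration of the mechanism, the translation looks something like the following sketch. The class and method names here (GcsStorageFactorySketch, createStorage) are hypothetical; the actual Flink code lives in GSFileSystemFactory/GSFileSystemOptions and is structured differently.

```java
import java.util.Optional;

import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.apache.flink.configuration.Configuration;

/** Sketch: translate the Flink config option into the GCS SDK's Host setting. */
final class GcsStorageFactorySketch {

    // The Flink option key discussed in this PR.
    private static final String STORAGE_ROOT_URL = "gs.storage.root.url";

    static Storage createStorage(Configuration flinkConfig) {
        StorageOptions.Builder builder = StorageOptions.newBuilder();

        // If the user overrode the GCS root URL, point the SDK client at it;
        // otherwise the SDK default (https://storage.googleapis.com) applies.
        Optional.ofNullable(flinkConfig.getString(STORAGE_ROOT_URL, null))
                .ifPresent(builder::setHost);

        return builder.build().getService();
    }
}
```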
Verifying this change
- Added one test to ensure the GCS SDK Storage client's Host option is overridden when the gs.storage.root.url Flink config option is set (a sketch of the assertion follows this list)
- Built the plugin, used it with an otherwise-vanilla Flink deployment, and successfully used FileSink to write to an instance of fake-gcs-server
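For illustration only, a minimal sketch of the kind of assertion such a test makes. It reuses the hypothetical createStorage helper from the sketch above; the actual test in the PR targets the real GSFileSystemFactory API.

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.google.cloud.storage.Storage;
import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.Test;

class GcsStorageRootUrlTest {

    @Test
    void hostIsOverriddenWhenRootUrlIsSet() {
        Configuration flinkConfig = new Configuration();
        flinkConfig.setString("gs.storage.root.url", "http://localhost:4443");

        Storage storage = GcsStorageFactorySketch.createStorage(flinkConfig);

        // The SDK exposes the configured endpoint via getHost().
        assertEquals("http://localhost:4443", storage.getOptions().getHost());
    }
}
```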
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with @Public(Evolving): no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: no
- The S3 file system connector: no
Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable (previously-documented behavior for overriding the GCS root URL was not actually supported, but will be with this change)
Once this is accepted, I am ready with backport branches for 1.18 and 1.17.
CI report:
- c60697980c15b0037d4a8dd91167e0a4aee3e1c8 Azure: SUCCESS
Bot commands
The @flinkbot bot supports the following commands:
- @flinkbot run azure: re-run the last Azure build
I considered a slightly different approach, delegating the lookup of the config option to the GCS Hadoop connector itself: https://github.com/apache/flink/compare/master...patricklucas:flink:FLINK-33694_gs_fs_host_v2
If I get some feedback about which of these options is preferred, I can update the PR accordingly.
Edit: I decided to go with this alternate approach since it will guarantee the option stays in sync with both the Hadoop connector and GCS client.
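For comparison, the gist of the alternate approach is to read the GCS Hadoop connector's own key from the Hadoop configuration rather than introducing a separate Flink-side lookup. This is a sketch under that assumption (HadoopDelegatingSketch and applyRootUrl are hypothetical names, and the Hadoop configuration is assumed to be loaded already):

```java
import java.util.Optional;

import com.google.cloud.storage.StorageOptions;
import org.apache.hadoop.conf.Configuration;

/** Sketch: delegate the root-URL lookup to the Hadoop configuration. */
final class HadoopDelegatingSketch {

    static StorageOptions.Builder applyRootUrl(
            Configuration hadoopConfig, StorageOptions.Builder builder) {

        // fs.gs.storage.root.url is the GCS Hadoop connector's own key, so the
        // value automatically stays in sync with the Hadoop-based read path.
        Optional.ofNullable(hadoopConfig.get("fs.gs.storage.root.url"))
                .ifPresent(builder::setHost);

        return builder;
    }
}
```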
@XComp before this becomes a dead PR, do you know who might be a good candidate to ask for a review?
@afedulov do you have capacity to review this PR?
@patricklucas My understanding is that one would typically want to override fs.gs.storage.root.url only in very specific scenarios, such as the one you mentioned with fake-gcs-server. I would like to understand if this change could lead to unexpected consequences for Flink GCS users. It appears that:
- It is generally very unlikely that users have the fs.gs.storage.root.url parameter set
- If it is set, they probably expect both the GCS source and GCS sink to use the URL
Is this assessment accurate?
@afedulov it is quite popular to use alternate endpoint URLs for blob storage services with S3, less so with GCS. I would actually expect it to be more popular, since testing jobs that interact with blob storage without having to read and write over the internet is desirable, but it's clearly not so common with Flink, or others would have hit this issue already.
But yes, the main issue is the inconsistent behavior: Flink relies on Hadoop's filesystem implementations in many cases, and those can be configured using the Hadoop configuration options. However, Flink has chosen to use its own custom implementations in certain cases without ensuring they can be configured the same way. A user would reasonably expect to be able to use the Hadoop configuration options to configure this Flink FS implementation (which has "Hadoop" in its name), but in fact almost no configuration options for the write code path are exposed.
I do not think there is much room for unexpected behavior here. The surface area of the change is quite small and follows the same general approach for translating a Hadoop config option specific to this filesystem implementation into its native SDK equivalent as used for configuring credentials.
> But yes, the main issue is the inconsistent behavior
The purpose is clear and I think it is a good idea to improve it. My only concern is a scenario like this:
- A user has a pipeline that reads data from some local GCS implementation and writes it into the actual cloud GCS (root GCS URL configured via Hadoop). This works because the sink currently ignores this setting and defaults to the Google URL
- Merging the proposed change unexpectedly breaks the pipeline because it loops data back into the local store instead of Google GCS. Can we be confident that such a scenario does not exist in the wild?
My suspicion is that there's an extremely small chance of that but I have no way to quantify it.
In our case it was actually the other way around: we set this option and were reading from and writing to GCS; however, we were writing via a custom sink that used the GCS SDK directly, so everything worked as expected. It was only when we migrated to the normal file sink that this issue arose.
Does the Flink project have guidelines for approaching this sort of fix, where users may depend on definitely-broken behavior? If we cannot fix it directly (before Flink 2.0, I assume), do you have a suggestion for another workaround, perhaps just a new config key altogether?
Unfortunately there are no definitive guidelines for such edge cases in Flink. That said, I believe the current behavior can be justifiably considered a bug that your PR fixes. The risk that someone might currently rely on the broken behavior and gets affected is there, but the chances are pretty low. LGTM, thanks for the contribution, Patrick. @XComp could you please merge? (I am still waiting for my ICLA to get stamped)
Sure. Could you provide backport PRs for 1.18 and 1.17? Then I can merge all the necessary PRs at once.
@XComp sure, coming up shortly.
Thanks for the review, @afedulov.