
[FLINK-33694][gs-fs-hadoop] Support overriding GCS root URL

Open patricklucas opened this issue 1 year ago • 12 comments

What is the purpose of the change

This change configures the GCS client used by the GSFileSystem's RecoverableWriter codepath to use the same GCS root URL as used by Hadoop.

Without this change, the GSFileSystem behaves differently depending on context when this config option is set: for instance, FileSource will use the overridden URL while FileSink will not.

Brief change log

  • Update GSFileSystemFactory/GSFileSystemOptions to look for the gs.storage.root.url Flink config option, overriding the GCS client's Host option if it is set
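The lookup-and-override behavior described above can be sketched as follows. This is an illustrative model only, in plain Python: the real change lives in Flink's Java `GSFileSystemFactory`/`GSFileSystemOptions`, which pass the value to the GCS SDK's client options. The function name and dict-based config here are hypothetical stand-ins; the default host is the standard GCS endpoint.

```python
# Hypothetical sketch of the option lookup: if the Flink config option
# gs.storage.root.url is set, it overrides the GCS client's Host option;
# otherwise the SDK's default endpoint is used.
GS_ROOT_URL_KEY = "gs.storage.root.url"
DEFAULT_GCS_HOST = "https://storage.googleapis.com"

def resolve_storage_host(flink_config: dict) -> str:
    """Return the host the GCS client should use: the overridden root URL
    if the Flink config option is set, otherwise the default endpoint."""
    return flink_config.get(GS_ROOT_URL_KEY, DEFAULT_GCS_HOST)
```

With the option set, both the read path (via Hadoop) and the write path (via the GCS SDK client) resolve to the same endpoint, which is the consistency this change is after.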

Verifying this change

  • Added one test to ensure the GCS SDK Storage client's Host option is overridden when the gs.storage.root.url Flink config option is set
  • Built the plugin and used it with an otherwise-vanilla Flink deployment and successfully used FileSink to write to an instance of fake-gcs-server
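For reference, a deployment like the one described in the second bullet might be configured along these lines. This is a sketch, not taken from the PR: the endpoint below assumes a local fake-gcs-server instance on its default port, so adjust it to wherever your instance actually listens.

```yaml
# flink-conf.yaml (sketch): point the GS filesystem at a local fake-gcs-server
# instead of the real GCS endpoint. The URL is an assumption for illustration.
gs.storage.root.url: https://localhost:4443
```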

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable (previously-documented behavior for overriding the GCS root URL was not actually supported, but will be with this change)

patricklucas avatar Nov 30 '23 09:11 patricklucas

Once this is accepted, I am ready with backport branches for 1.18 and 1.17.

patricklucas avatar Nov 30 '23 09:11 patricklucas

CI report:

  • c60697980c15b0037d4a8dd91167e0a4aee3e1c8 Azure: SUCCESS

Bot commands: The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

flinkbot avatar Nov 30 '23 09:11 flinkbot

@flinkbot run azure

patricklucas avatar Nov 30 '23 15:11 patricklucas

@flinkbot run azure

patricklucas avatar Dec 01 '23 09:12 patricklucas

I considered a slightly different approach, delegating the lookup of the config option to the GCS Hadoop connector itself: https://github.com/apache/flink/compare/master...patricklucas:flink:FLINK-33694_gs_fs_host_v2

If I get some feedback about which of these options is preferred, I can update the PR accordingly.

Edit: I decided to go with this alternate approach since it will guarantee the option stays in sync with both the Hadoop connector and GCS client.

patricklucas avatar Dec 03 '23 15:12 patricklucas

@XComp before this becomes a dead PR, do you know who might be a good candidate to ask for a review?

patricklucas avatar Dec 12 '23 09:12 patricklucas

@afedulov do you have capacity to review this PR?

XComp avatar Dec 12 '23 11:12 XComp

@patricklucas My understanding is that one would typically want to override fs.gs.storage.root.url only in very specific scenarios, such as the one you mentioned with fake-gcs-server. I would like to understand if this change could lead to unexpected consequences for Flink GCS users. It appears that:

  • It is generally very unlikely that users have the fs.gs.storage.root.url parameter set
  • If it is set, they probably expect both the GCS source and GCS sink to use the URL

Is this assessment accurate?

afedulov avatar Dec 12 '23 15:12 afedulov

@afedulov it is quite popular to use alternate endpoint URLs for blob storage services with S3, less so with GCS. I would actually expect it to be more popular, since being able to test jobs that interact with blob storage without reading and writing over the internet is desirable, but it's clearly not common with Flink, or else others would have hit this issue already.

But yes, the main issue is the inconsistent behavior: Flink relies on Hadoop's filesystem implementations in many cases, and they can be configured using the Hadoop configuration options. However, Flink has chosen to use its own custom implementations in certain cases without ensuring they can be configured the same way. A user would reasonably expect to be able to use the Hadoop configuration options to configure this Flink FS impl (which has "Hadoop" in the name), but in fact almost no configuration options for the write codepath are exposed.

patricklucas avatar Jan 02 '24 12:01 patricklucas

I do not think there is much room for unexpected behavior here. The surface area of the change is quite small, and it follows the same general approach already used for configuring credentials: translating a Hadoop config option specific to this filesystem implementation into its native SDK equivalent.

patricklucas avatar Jan 02 '24 15:01 patricklucas

But yes, the main issue is the inconsistent behavior

The purpose is clear and I think it is a good idea to improve it. My only concern is a scenario like this:

  • A user has a pipeline that reads data from some local GCS implementation and writes it into the actual cloud GCS (root GCS URL configured via Hadoop). This works because the sink currently ignores this setting and defaults to the Google URL
  • Merging the proposed change unexpectedly breaks the pipeline because it loops data back into the local store instead of Google GCS. Can we be confident that such a scenario does not exist in the wild?

afedulov avatar Jan 09 '24 12:01 afedulov

My suspicion is that there's an extremely small chance of that, but I have no way to quantify it.

In our case it was actually the other way around: we set this option and were reading and writing to GCS; however, we were writing with a custom sink that used the GCS SDK directly, so everything worked as expected. It was only when we migrated to the normal file sink that this issue arose.

Does the Flink project have guidelines for fixing this sort of issue, where users may depend on definitely-broken behavior? If we cannot fix it directly (before Flink 2.0, I assume), do you have a suggestion for another workaround, perhaps just a new config key altogether?

patricklucas avatar Jan 11 '24 15:01 patricklucas

Unfortunately there are no definitive guidelines for such edge cases in Flink. That said, I believe the current behavior can be justifiably considered a bug that your PR fixes. The risk that someone might currently rely on the broken behavior and get affected is there, but the chances are pretty low. LGTM, thanks for the contribution, Patrick. @XComp could you please merge? (I am still waiting for my ICLA to get stamped)

afedulov avatar Jan 16 '24 18:01 afedulov

Sure. Could you provide backport PRs for 1.18 and 1.17? Then I can merge all the necessary PRs at once.

XComp avatar Jan 17 '24 17:01 XComp

@XComp sure, coming up shortly.

Thanks for the review, @afedulov.

patricklucas avatar Jan 17 '24 17:01 patricklucas