[Improvement] Storage Partition Join

Open tharvey5 opened this issue 5 months ago • 2 comments

What is the purpose of the change

Currently, Apache Flink does not support storage partition join, which can lead to unnecessary data shuffles in batch mode. This PR implements a basic version of it.

Brief change log

This pull request introduces a new optimizer configuration option, table.optimizer.storage-partition-join-enabled, together with query planner changes that detect when both sides of a join are partitioned by the join keys and their partitionings are compatible. In that case the planner applies a storage partition join strategy, which avoids unnecessary shuffles by reusing the source's existing partitioning.
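The detection step described above can be sketched in plain Java, independent of Flink's planner classes. This is a hypothetical illustration, not the PR's code: the class StoragePartitionJoinCheck and the method canUseStoragePartitionJoin are invented names, and the join condition is modeled as two column lists where position i on the left is equated with position i on the right. The sketch also assumes that partitioning on an aligned subset of the join keys is sufficient, since rows that match on all join keys also match on that subset.

```java
import java.util.ArrayList;
import java.util.List;

public class StoragePartitionJoinCheck {

    // Returns, for each partition key, its position in the join-key list,
    // or null if the side is partitioned by a column that is not a join key.
    static List<Integer> joinKeyPositions(List<String> partitionKeys, List<String> joinKeys) {
        List<Integer> positions = new ArrayList<>();
        for (String key : partitionKeys) {
            int idx = joinKeys.indexOf(key);
            if (idx < 0) {
                return null;
            }
            positions.add(idx);
        }
        return positions;
    }

    // Hypothetical planner check: leftJoinKeys.get(i) = rightJoinKeys.get(i) is the
    // i-th equi-join condition. Both sides must be partitioned on the same condition
    // positions, in the same order, so matching rows sit in corresponding partitions.
    static boolean canUseStoragePartitionJoin(
            boolean optionEnabled,
            boolean batchMode,
            List<String> leftJoinKeys,
            List<String> rightJoinKeys,
            List<String> leftPartitionKeys,
            List<String> rightPartitionKeys) {
        if (!optionEnabled || !batchMode) {
            return false; // option off or not a batch job: keep the regular shuffle
        }
        if (leftPartitionKeys.isEmpty() || rightPartitionKeys.isEmpty()) {
            return false; // an unpartitioned input still needs a shuffle
        }
        List<Integer> left = joinKeyPositions(leftPartitionKeys, leftJoinKeys);
        List<Integer> right = joinKeyPositions(rightPartitionKeys, rightJoinKeys);
        return left != null && left.equals(right);
    }

    public static void main(String[] args) {
        // orders.customer_id = customers.id, each table partitioned by its join key
        System.out.println(canUseStoragePartitionJoin(true, true,
                List.of("customer_id"), List.of("id"),
                List.of("customer_id"), List.of("id"))); // true
        // Same tables with table.optimizer.storage-partition-join-enabled = false
        System.out.println(canUseStoragePartitionJoin(false, true,
                List.of("customer_id"), List.of("id"),
                List.of("customer_id"), List.of("id"))); // false
    }
}
```

Comparing key positions rather than column names lets the two tables use different column names for the same join key, while still rejecting a right side partitioned in a different key order.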

Key changes include:

  • Addition of the SupportsPartitioning interface for table sources to expose partitioning information.
  • Implementation of KeyGroupedPartitioning to represent partitioning schemes.
  • Integration of partitioning awareness in the batch physical sort-merge join rule to conditionally use the storage partition join when enabled and applicable.
  • [for Testing] Serialization and deserialization utilities for partitioning metadata.
  • [for Testing] Extension of the test values table factory to support partitioning.
  • Comprehensive unit and integration tests verifying the new join strategy and its configuration.
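To make the role of the two new types more concrete, here is a minimal standalone sketch of how a source could expose key-grouped partitioning and how two such partitionings could be paired split by split. It is an assumption-laden illustration, not the PR's SupportsPartitioning or KeyGroupedPartitioning: PartitionedSource, isCompatibleWith and pairSplits are hypothetical names, and the compatibility rule (same number of key columns, one split per key value, identical key values on both sides) is a simplified rule chosen for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class KeyGroupedPartitioningSketch {

    // Hypothetical stand-in for a source ability such as SupportsPartitioning:
    // the source reports how its splits are grouped by key.
    interface PartitionedSource {
        KeyGroupedPartitioning partitioning();
    }

    // Hypothetical stand-in for KeyGroupedPartitioning: the key columns plus, for
    // each split, the key values shared by every row in that split.
    record KeyGroupedPartitioning(List<String> keys, List<List<Object>> partitionValues) {

        // Assumed rule: same number of key columns, exactly one split per key value
        // on each side, and the same set of key values on both sides.
        boolean isCompatibleWith(KeyGroupedPartitioning other) {
            Set<List<Object>> mine = new HashSet<>(partitionValues);
            Set<List<Object>> theirs = new HashSet<>(other.partitionValues());
            return keys.size() == other.keys().size()
                    && mine.size() == partitionValues.size()
                    && theirs.size() == other.partitionValues().size()
                    && mine.equals(theirs);
        }
    }

    // Pairs each left split with the right split holding the same key values, so each
    // pair can be joined locally without a shuffle. Returns null if incompatible.
    static List<int[]> pairSplits(KeyGroupedPartitioning left, KeyGroupedPartitioning right) {
        if (!left.isCompatibleWith(right)) {
            return null;
        }
        Map<List<Object>, Integer> rightSplitByValue = new HashMap<>();
        for (int i = 0; i < right.partitionValues().size(); i++) {
            rightSplitByValue.put(right.partitionValues().get(i), i);
        }
        List<int[]> pairs = new ArrayList<>();
        for (int i = 0; i < left.partitionValues().size(); i++) {
            pairs.add(new int[] {i, rightSplitByValue.get(left.partitionValues().get(i))});
        }
        return pairs;
    }

    public static void main(String[] args) {
        PartitionedSource orders = () -> new KeyGroupedPartitioning(
                List.of("region"), List.of(List.of("EU"), List.of("US")));
        PartitionedSource customers = () -> new KeyGroupedPartitioning(
                List.of("region"), List.of(List.of("US"), List.of("EU")));
        for (int[] pair : pairSplits(orders.partitioning(), customers.partitioning())) {
            // prints "orders split 0 <-> customers split 1", then "orders split 1 <-> customers split 0"
            System.out.println("orders split " + pair[0] + " <-> customers split " + pair[1]);
        }
    }
}
```

Pairing by key value rather than by split index means the two tables may list their splits in different orders; a real implementation might relax the "identical values" rule, for example by planning empty splits for values missing on one side.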

This enhancement is currently applicable only in batch mode and requires the source tables to be partitioned by the join keys.

Verifying this change

  • Added a test utility for serializing and deserializing partitioning metadata, so that a test table can be created with a KeyGroupedPartitioning.
  • Added integration tests (TestStoragePartitionJoin) that verify the optimizer plan changes when the storage partition join is enabled or disabled.
  • Verified that existing tests pass and that the new join strategy is correctly applied only when the configuration is enabled and partitioning is compatible.
  • Manual verification of execution plans to confirm the absence of unnecessary shuffles when storage partition join is enabled.
  • Verified that the unit test results in the table-planner module are unchanged: [ERROR] Tests run: 8671, Failures: 4, Errors: 0, Skipped: 1. The same 4 failures also occur before the change.
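The serialization utility mentioned in this list is what lets a test table declare its partitioning as plain text, for example as a table option. The following is a minimal round-trip sketch under stated assumptions, not the PR's actual format: the class PartitioningOptionCodec, the record Partitioning and the "keys/values;values" encoding are all hypothetical, values are kept as strings, and they are assumed to be non-empty and free of the ',', ';' and '/' delimiters.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitioningOptionCodec {

    // Hypothetical partitioning metadata: key columns and one value tuple per split.
    record Partitioning(List<String> keys, List<List<String>> partitionValues) {}

    // Encodes keys [region, year] and splits [[EU, 2024], [US, 2024]]
    // as "region,year/EU,2024;US,2024" (assumed format, no escaping).
    static String serialize(Partitioning p) {
        List<String> splits = new ArrayList<>();
        for (List<String> values : p.partitionValues()) {
            splits.add(String.join(",", values));
        }
        return String.join(",", p.keys()) + "/" + String.join(";", splits);
    }

    // Parses the format produced by serialize back into a Partitioning.
    static Partitioning deserialize(String encoded) {
        String[] parts = encoded.split("/", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected '<keys>/<splits>': " + encoded);
        }
        List<String> keys = Arrays.asList(parts[0].split(","));
        List<List<String>> values = new ArrayList<>();
        if (!parts[1].isEmpty()) {
            for (String split : parts[1].split(";")) {
                values.add(Arrays.asList(split.split(",")));
            }
        }
        return new Partitioning(keys, values);
    }

    public static void main(String[] args) {
        Partitioning p = new Partitioning(List.of("region"), List.of(List.of("EU"), List.of("US")));
        String encoded = serialize(p);
        System.out.println(encoded);                        // region/EU;US
        System.out.println(deserialize(encoded).equals(p)); // true
    }
}
```

Because record equality delegates to List.equals, the round trip can be checked with a single equals call regardless of which List implementation each side uses.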

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

tharvey5, Jun 25 '25 21:06

CI report:

  • 8b3932ecd6295c9ee7b96bebba17a735efa06770 Azure: FAILURE
Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

flinkbot, Jun 25 '25 21:06

Hi @tharvey5, I know this is a draft, so this may already be in hand.

  • I notice the change is against Flink 1.18. New changes should be made against master and then backported. Currently the backports go to v2, 1.20 and 1.19.
  • See our process at https://flink.apache.org/how-to-contribute/contribute-code/, which covers getting consensus on the dev list and raising a Jira. The Jira number should be in the PR title for all changes apart from hotfixes.

davidradl, Jun 27 '25 13:06