[Improvement] Storage Partition Join
What is the purpose of the change
Currently, Apache Flink does not support storage partition joins, which can lead to unnecessary data shuffles in batch mode. This PR implements a basic version of that support.
Brief change log
This pull request introduces a new optimizer configuration option, table.optimizer.storage-partition-join-enabled, and query planner changes that detect when both sides of a join are partitioned by the join keys and the two partitionings are compatible. In that case the planner can apply a storage partition join strategy, which avoids unnecessary shuffles by leveraging the sources' existing partitioning.
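The decision described above can be illustrated with a minimal, self-contained sketch. It does not reproduce the planner code in this PR: the class and method names are hypothetical, and "compatible" is assumed here to mean that both sides have the same number of partition keys and the same number of partitions.

```java
import java.util.List;

// Hypothetical sketch; these names are illustrative and are not the classes added by this PR.
final class PartitionJoinCheck {

    /** Minimal stand-in for a source's reported partitioning: key columns plus partition count. */
    static final class Partitioning {
        final List<String> keys;
        final int numPartitions;

        Partitioning(List<String> keys, int numPartitions) {
            this.keys = keys;
            this.numPartitions = numPartitions;
        }
    }

    /** Assumed notion of compatibility: same number of partition keys and same partition count. */
    static boolean compatible(Partitioning left, Partitioning right) {
        return left.keys.size() == right.keys.size()
                && left.numPartitions == right.numPartitions;
    }

    /** True when the shuffle on both join inputs could be skipped under the assumptions above. */
    static boolean canUseStoragePartitionJoin(
            boolean optionEnabled,
            Partitioning left, List<String> leftJoinKeys,
            Partitioning right, List<String> rightJoinKeys) {
        if (!optionEnabled || left == null || right == null) {
            return false; // option disabled, or a source exposes no partitioning
        }
        // Join keys are paired by position, so each side's partition keys
        // must equal its join keys in the same order.
        return left.keys.equals(leftJoinKeys)
                && right.keys.equals(rightJoinKeys)
                && compatible(left, right);
    }
}
```

When the check returns false, the planner would fall back to the regular join with a shuffle on each input.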
Key changes include:
- Addition of the SupportsPartitioning interface for table sources to expose partitioning information.
- Implementation of KeyGroupedPartitioning to represent partitioning schemes.
- Integration of partitioning awareness in the batch physical sort-merge join rule to conditionally use the storage partition join when enabled and applicable.
- [for Testing] Serialization and deserialization utilities for partitioning metadata.
- [for Testing] Extension of the test values table factory to support partitioning.
- Comprehensive unit and integration tests verifying the new join strategy and its configuration.
This enhancement is currently applicable only in batch mode and requires the source tables to be partitioned by the join keys.
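To show why the shuffle becomes unnecessary under that requirement, here is a small sketch of a partition-wise join. It is an illustration only, not the runtime code in this PR: rows are reduced to their integer join keys, the modulo partition function is an assumption, and a nested loop stands in for the sort-merge join.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not the operator implementation from this PR.
final class PartitionWiseJoinSketch {

    /** Assumed partition function shared by both sources. */
    static int partitionOf(int key, int numPartitions) {
        return Math.floorMod(key, numPartitions);
    }

    /** Splits keys into partitions, standing in for data already laid out by the storage system. */
    static List<List<Integer>> partition(List<Integer> keys, int numPartitions) {
        List<List<Integer>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new ArrayList<>());
        }
        for (int k : keys) {
            parts.get(partitionOf(k, numPartitions)).add(k);
        }
        return parts;
    }

    /** Joins partition i of the left side only with partition i of the right side. */
    static List<Integer> joinPartitionWise(List<List<Integer>> left, List<List<Integer>> right) {
        List<Integer> matches = new ArrayList<>();
        for (int i = 0; i < left.size(); i++) {
            for (int l : left.get(i)) {
                for (int r : right.get(i)) {
                    if (l == r) {
                        matches.add(l); // equal keys always land in the same partition index
                    }
                }
            }
        }
        return matches;
    }
}
```

Because equal keys always map to the same partition index on both sides, no row has to move between partitions before the join, which is the property the planner relies on when it drops the shuffle.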
Verifying this change
- Added a test utility for serializing and deserializing partitioning metadata, so that we can create a test table with a KeyGroupPartition.
- Added integration tests (TestStoragePartitionJoin) that verify the optimizer plan changes when the storage partition join is enabled or disabled.
- Verified that existing tests pass and that the new join strategy is correctly applied only when the configuration is enabled and partitioning is compatible.
- Manual verification of execution plans to confirm the absence of unnecessary shuffles when storage partition join is enabled.
- Verified that the unit test results in the table-planner module are the same as before the change:
[ERROR] Tests run: 8671, Failures: 4, Errors: 0, Skipped: 1
We confirmed that the number of test failures before the change was also 4.
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)
Documentation
- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
CI report:
- 8b3932ecd6295c9ee7b96bebba17a735efa06770 Azure: FAILURE
Bot commands
The @flinkbot bot supports the following commands:
- @flinkbot run azure: re-run the last Azure build
Hi @tharvey5, I know this is a draft, so this may already be in hand.
- I notice the change is against Flink 1.18. New changes should be made against master and then backported. Currently, backports go to v2, 1.20, and 1.19.
- See our process (https://flink.apache.org/how-to-contribute/contribute-code/), which covers getting consensus on the dev list and raising a Jira. The Jira number should be in the PR title for all changes apart from hotfixes.