[FLINK-37889] [table-planner] Add JoinToMultiJoinRule
What is the purpose of the change
Introduce FlinkStreamJoinToMultiJoinRule
It also covers FLINK-37890
Brief change log
- Added FlinkStreamJoinToMultiJoinRule
- Added FlinkMultiJoinNode
Verifying this change
- Added rel plan tests
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
Hey @gustavodemorais! Sorry for the wait, I was unexpectedly very busy last week. Please take a look at this. There will be plenty of refactoring, but this version works for now with commonJoinKey checking and right joins enabled.
CI report:
- 5b2fcd843e7581eebfac503cdbe3c02c5b456163 Azure: FAILURE
Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot run azure`: re-run the last Azure build
Hey @SteveStevenpoor, I understand, sorry for the rush because of the deadline. We're far along with https://github.com/apache/flink/pull/26687 and will probably merge it today or tomorrow.
If you have the capacity, you could open a PR against that branch to cover mostly FLINK-37890. It'll be automatically retargeted to master as soon as we merge https://github.com/apache/flink/pull/26687. Some observations from reviewing this PR:
- The absence of a commonJoinKey does not, in general, mean we don't use MultiJoin; it just means we have to break the chain into multiple MultiJoins. This is in the description of FLINK-37890. testInnerJoinChainNoCommonJoinKey, for example, would produce two MultiJoins with two inputs each. We want a test that checks whether we can have, for example, 2 or 3 MultiJoins combined.
- We want to use Calcite's MultiJoin in the rule for now so that it can be combined with the native project and merge rules, as we did in the other PR. commonJoinKeys don't have to be stored. We just need to check that the constraint is respected here.
- The rule shouldn't cover right joins, since we have the other rule for that.
- We'd want e2e table programs to test it as shown in the other PR.
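To illustrate the point about checking the commonJoinKey constraint without storing the keys, here is a minimal, standalone sketch. It is not the planner code from either PR: the class `CommonJoinKeyCheck`, the `Equality` helper, and the `"input.column"` string encoding are all hypothetical and only model the idea. Equi-join predicates are merged into equivalence classes with a small union-find, and the constraint holds if at least one class contains a column from every input.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; not part of Flink or Calcite.
final class CommonJoinKeyCheck {

    /** One equi-join predicate such as A.id = B.a_id, columns written as "input.column". */
    static final class Equality {
        final String left;
        final String right;

        Equality(String left, String right) {
            this.left = left;
            this.right = right;
        }
    }

    /** Union-find lookup with path compression; unknown columns start as their own root. */
    private static String find(Map<String, String> parent, String column) {
        parent.putIfAbsent(column, column);
        String p = parent.get(column);
        if (!p.equals(column)) {
            p = find(parent, p);
            parent.put(column, p);
        }
        return p;
    }

    /** Returns true if some set of mutually equal columns covers every input. */
    static boolean hasCommonJoinKey(Set<String> inputs, List<Equality> equalities) {
        Map<String, String> parent = new HashMap<>();
        for (Equality e : equalities) {
            // Merge the two equivalence classes connected by this predicate.
            parent.put(find(parent, e.left), find(parent, e.right));
        }
        // For every equivalence class, collect the inputs that contribute a column to it.
        Map<String, Set<String>> inputsPerClass = new HashMap<>();
        for (String column : new ArrayList<>(parent.keySet())) {
            String input = column.substring(0, column.indexOf('.'));
            inputsPerClass
                    .computeIfAbsent(find(parent, column), k -> new HashSet<>())
                    .add(input);
        }
        return inputsPerClass.values().stream().anyMatch(s -> s.containsAll(inputs));
    }

    public static void main(String[] args) {
        List<Equality> chain = new ArrayList<>();
        chain.add(new Equality("A.id", "B.a_id"));
        chain.add(new Equality("B.a_id", "C.a_id"));
        System.out.println(hasCommonJoinKey(Set.of("A", "B", "C"), chain)); // true

        // D joins on a different column, so no single key spans all four inputs.
        chain.add(new Equality("C.other", "D.other"));
        System.out.println(hasCommonJoinKey(Set.of("A", "B", "C", "D"), chain)); // false
    }
}
```

In the real rule the predicates would come from the join conditions rather than strings, but the check itself stays a pure function of the inputs and conditions, so nothing needs to be kept on the node.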
If you don't have a lot of time now, that's no problem and I'll get to this probably tomorrow. After the deadline, we can discuss you picking other items for the multi way join without the rush of the deadline. We still have some essential jira tickets to get the feature from an experimental state to production ready here.
> If you have the capacity, you could open a PR against that branch to cover mostly FLINK-37890. It'll be automatically retargeted to master as soon as we merge #26687. Some observations from reviewing this PR:
I'll do it!
> The absence of a commonJoinKey does not, in general, mean we don't use MultiJoin; it just means we have to break the chain into multiple MultiJoins. This is in the description of FLINK-37890. testInnerJoinChainNoCommonJoinKey, for example, would produce two MultiJoins with two inputs each. We want a test that checks whether we can have, for example, 2 or 3 MultiJoins combined.
Say we have A JOIN B JOIN C JOIN D, where A, B, and C have a commonJoinKey but D doesn't. My code will give MJ(A, B, C) JOIN D. But what we actually want is MJ(MJ(A, B, C), D). Am I correct?
> We want to use Calcite's MultiJoin in the rule for now so that it can be combined with the native project and merge rules, as we did in the other PR. commonJoinKeys don't have to be stored. We just need to check that the constraint is respected here.
I needed it to simplify the commonJoinKey check. I can use Calcite's MultiJoin instead.
> The rule shouldn't cover right joins, since we have the other rule for that.
What do you mean by covering right joins? The rule will be applied only for inner and left joins:
```java
// Only INNER and LEFT joins are eligible for the rule.
final Join origJoin = call.rel(0);
if (origJoin.getJoinType() != JoinRelType.INNER
        && origJoin.getJoinType() != JoinRelType.LEFT) {
    return false;
}
```
If you are talking about the check that the right child is a projection/join, that's because I wrote JoinToMultiJoinRule with respect to RightToLeftJoinRule, which swaps inputs and adds a projection. I needed to cover this case.
> We'd want e2e table programs to test it as shown in the other PR.
Affirmative.
> If you don't have a lot of time now, that's no problem and I'll get to this probably tomorrow. After the deadline, we can discuss you picking other items for the multi way join without the rush of the deadline. We still have some essential jira tickets to get the feature from an experimental state to production ready here.
I'll start on 37890 today and hope to open the PR by tomorrow. I will keep you informed.
> Say we have A JOIN B JOIN C JOIN D, where A, B, and C have a commonJoinKey but D doesn't. My code will give MJ(A, B, C) JOIN D. But what we actually want is MJ(MJ(A, B, C), D). Am I correct?
Exactly 👍 Or in a more realistic scenario, if we have A JOIN B JOIN C JOIN D JOIN E, we could have MJ(MJ(A, B, C), D, E). Or even more concatenated multi joins.
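The nesting described above can be sketched with a toy model. This is only an illustration under simplifying assumptions, not the rule's implementation: `MultiJoinNestingSketch` and `JoinStep` are hypothetical names, each join in a left-deep chain is reduced to the input it adds plus a label for the key it joins on, and consecutive joins with the same label are treated as sharing a commonJoinKey. When the label changes, the MultiJoin built so far becomes the first input of the next one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of Flink or Calcite.
final class MultiJoinNestingSketch {

    /** One join in a left-deep chain: the input added on the right and the key it joins on. */
    static final class JoinStep {
        final String input;
        final String key;

        JoinStep(String input, String key) {
            this.input = input;
            this.key = key;
        }
    }

    /** Renders the nested MultiJoin structure for `first JOIN step1 JOIN step2 ...`. */
    static String plan(String first, List<JoinStep> steps) {
        List<String> pending = new ArrayList<>(); // inputs of the MultiJoin being assembled
        pending.add(first);
        String pendingKey = null;
        for (JoinStep step : steps) {
            if (pendingKey != null && !pendingKey.equals(step.key)) {
                // The key changed: close the current MultiJoin and nest it as the
                // first input of a new one.
                String closed = "MJ(" + String.join(", ", pending) + ")";
                pending = new ArrayList<>();
                pending.add(closed);
            }
            pending.add(step.input);
            pendingKey = step.key;
        }
        return pending.size() == 1
                ? pending.get(0)
                : "MJ(" + String.join(", ", pending) + ")";
    }

    public static void main(String[] args) {
        List<JoinStep> steps = new ArrayList<>();
        steps.add(new JoinStep("B", "k1"));
        steps.add(new JoinStep("C", "k1"));
        steps.add(new JoinStep("D", "k2"));
        System.out.println(plan("A", steps)); // MJ(MJ(A, B, C), D)

        steps.add(new JoinStep("E", "k2"));
        System.out.println(plan("A", steps)); // MJ(MJ(A, B, C), D, E)
    }
}
```

A real rule would decide "same key" from the join conditions rather than from a label, but the grouping behaviour is the same: a key change closes one MultiJoin and starts another on top of it.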
> What do you mean by covering right joins? The rule will be applied only for inner and left joins:
I saw you had some tests with right joins. I didn't think it through for project and join nodes playing with the RightJoinToLeftJoin rule. I think that the multijoin check (right instanceof FlinkMultiJoinNode) can be dropped?
> I'll start on 37890 today and hope to open the PR by tomorrow. I will keep you informed.
If you're not 100% done you can also open a draft PR so I can collaborate tomorrow with you. Or else I'll have to start my own. And if you don't have the time, it's ok and we can sync on the next items. Thanks, Stepan.