
[FLINK-37889] [table-planner] Add JoinToMultiJoinRule

Status: Open. SteveStevenpoor opened this issue 6 months ago • 5 comments

What is the purpose of the change

Introduce FlinkStreamJoinToMultiJoinRule

It also covers FLINK-37890

Brief change log

  • Added FlinkStreamJoinToMultiJoinRule
  • Added FlinkMultiJoinNode

Verifying this change

  • Added rel plan tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

SteveStevenpoor, Jun 17 '25 14:06

Hey, @gustavodemorais! Sorry for the wait; I was unexpectedly very busy last week. Please take a look at this. There will be plenty of refactoring, but this version works for now, with commonJoinKey checking and right joins enabled.

SteveStevenpoor, Jun 17 '25 14:06

CI report:

  • 5b2fcd843e7581eebfac503cdbe3c02c5b456163 Azure: FAILURE

Bot commands: The @flinkbot bot supports the following commands:

  • @flinkbot run azure: re-run the last Azure build

flinkbot, Jun 17 '25 14:06

Hey, @gustavodemorais! Sorry for the wait; I was unexpectedly very busy last week. Please take a look at this. There will be plenty of refactoring, but this version works for now, with commonJoinKey checking and right joins enabled.

Hey @SteveStevenpoor, I understand; sorry for the rush because of the deadline. We're far along with https://github.com/apache/flink/pull/26687 and will probably merge it today or tomorrow.

If you have the capacity, you could open a PR against that branch, mainly covering FLINK-37890. It'll be automatically retargeted to master as soon as we merge https://github.com/apache/flink/pull/26687. Some observations from reviewing this PR:

  • Having no commonJoinKey does not, in general, mean we don't use MultiJoin; it just means we have to break the chain into multiple MultiJoins. This is in the description of FLINK-37890. testInnerJoinChainNoCommonJoinKey, for example, would have two MultiJoins with two inputs each. We want a test that checks we can combine, for example, two or three MultiJoins.
  • For now, we want to use Calcite's MultiJoin in the rule so it can be combined with the native project and merge rule, as we did in the other PR. commonJoinKeys doesn't have to be stored; we just need to check here that the constraint is respected.
  • The rule shouldn't cover right joins, since we have the other rule for that.
  • We'd want e2e table programs to test it, as shown in the other PR.
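The following Java sketch illustrates how the constraint could be checked without storing commonJoinKeys. It rests on an assumption that is not Flink's definition: inputs share a common join key when some chain of equi-join equalities links one column from every input. Columns are modeled as plain "input.column" strings. CommonJoinKeyCheck and hasCommonJoinKey are hypothetical names, not Flink or Calcite APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch (not a Flink/Calcite API): derives the common join key on the fly
 * from equi-join equalities instead of storing it on the multi-join node.
 * Assumption: columns are "input.column" strings, e.g. "A.id".
 */
final class CommonJoinKeyCheck {

    /** Union-find parent map over column identifiers. */
    private final Map<String, String> parent = new HashMap<>();

    private String find(String col) {
        parent.putIfAbsent(col, col);
        String root = col;
        while (!root.equals(parent.get(root))) {
            root = parent.get(root);
        }
        // Path compression: point every visited column directly at the root.
        String cur = col;
        while (!cur.equals(root)) {
            String next = parent.get(cur);
            parent.put(cur, root);
            cur = next;
        }
        return root;
    }

    private void union(String a, String b) {
        parent.put(find(a), find(b));
    }

    /** True if one class of equated columns contains a column from every input. */
    static boolean hasCommonJoinKey(List<String> inputs, List<String[]> equalities) {
        CommonJoinKeyCheck uf = new CommonJoinKeyCheck();
        for (String[] eq : equalities) {
            uf.union(eq[0], eq[1]);
        }
        // For each equivalence class, collect which inputs it touches.
        Map<String, Set<String>> inputsPerClass = new HashMap<>();
        for (String col : new ArrayList<>(uf.parent.keySet())) {
            String input = col.substring(0, col.indexOf('.'));
            inputsPerClass.computeIfAbsent(uf.find(col), k -> new HashSet<>()).add(input);
        }
        for (Set<String> touched : inputsPerClass.values()) {
            if (touched.containsAll(inputs)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String[]> abc = List.<String[]>of(
                new String[] {"A.id", "B.id"}, new String[] {"B.id", "C.id"});
        System.out.println(hasCommonJoinKey(List.of("A", "B", "C"), abc)); // true
    }
}
```

In this sketch, adding an input D that joins only via C.name = D.name makes the check return false. That is the point where the chain would have to be split into separate MultiJoins.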

If you don't have much time now, that's no problem, and I'll probably get to this tomorrow. After the deadline, we can discuss you picking up other items for the multi-way join without the rush of the deadline. We still have some essential Jira tickets to get the feature from an experimental state to production-ready here.

gustavodemorais, Jun 18 '25 09:06

If you have the capacity, you could open a PR against that branch, mainly covering FLINK-37890. It'll be automatically retargeted to master as soon as we merge #26687. Some observations from reviewing this PR:

I'll do it!

Having no commonJoinKey does not, in general, mean we don't use MultiJoin; it just means we have to break the chain into multiple MultiJoins. This is in the description of FLINK-37890. testInnerJoinChainNoCommonJoinKey, for example, would have two MultiJoins with two inputs each. We want a test that checks we can combine, for example, two or three MultiJoins.

Say we have A JOIN B JOIN C JOIN D, where A, B, and C have a commonJoinKey but D doesn't. My code will give MJ(A, B, C) JOIN D, but what we actually want is MJ(MJ(A, B, C), D). Am I correct?

For now, we want to use Calcite's MultiJoin in the rule so it can be combined with the native project and merge rule, as we did in the other PR. commonJoinKeys doesn't have to be stored; we just need to check here that the constraint is respected.

I needed it to make the commonJoinKey check simpler. I can use Calcite's MultiJoin instead.

The rule shouldn't cover right joins, since we have the other rule for that.

What do you mean by covering right joins? The rule will be applied only for inner and left joins:

        final Join origJoin = call.rel(0);
        // Only INNER and LEFT joins are candidates for the multi-join rewrite.
        if (origJoin.getJoinType() != JoinRelType.INNER
                && origJoin.getJoinType() != JoinRelType.LEFT) {
            return false;
        }

If you are talking about the check that the right child is a projection/join, that's because I wrote JoinToMultiJoinRule with respect to RightToLeftJoinRule, which swaps the inputs and adds a projection. I needed to cover this case.
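This minimal Java sketch shows the projection that such a swap needs. It assumes the rewrite turns A RIGHT JOIN B into a Project over B LEFT JOIN A, as described above. The swapped join emits B's fields first, so the projection must map each original field position back to its new index. SwapProjection and restoreOrder are hypothetical names used only for illustration.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch: field mapping for A RIGHT JOIN B rewritten as
 * Project(B LEFT JOIN A). The original output is [A fields..., B fields...];
 * the swapped join emits [B fields..., A fields...].
 */
final class SwapProjection {

    /** For each original output position, returns its index in the swapped join's output. */
    static int[] restoreOrder(int leftFieldCount, int rightFieldCount) {
        int[] mapping = new int[leftFieldCount + rightFieldCount];
        // A's fields now come after all of B's fields.
        for (int i = 0; i < leftFieldCount; i++) {
            mapping[i] = rightFieldCount + i;
        }
        // B's fields now come first.
        for (int j = 0; j < rightFieldCount; j++) {
            mapping[leftFieldCount + j] = j;
        }
        return mapping;
    }

    public static void main(String[] args) {
        // A has 2 fields, B has 3 fields.
        System.out.println(Arrays.toString(restoreOrder(2, 3))); // [3, 4, 0, 1, 2]
    }
}
```

Because of this extra projection, a rule that runs after the swap may see a Project on top of a join as an input, rather than the join itself.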

We'd want e2e table programs to test it as shown in the other PR.

Affirmative.

If you don't have much time now, that's no problem, and I'll probably get to this tomorrow. After the deadline, we can discuss you picking up other items for the multi-way join without the rush of the deadline. We still have some essential Jira tickets to get the feature from an experimental state to production-ready here.

I'll start on FLINK-37890 today and hope to open a PR by tomorrow. I'll keep you informed.

SteveStevenpoor, Jun 18 '25 10:06

Say we have A JOIN B JOIN C JOIN D, where A, B, and C have a commonJoinKey but D doesn't. My code will give MJ(A, B, C) JOIN D, but what we actually want is MJ(MJ(A, B, C), D). Am I correct?

Exactly 👍 Or, in a more realistic scenario with A JOIN B JOIN C JOIN D JOIN E, we could have MJ(MJ(A, B, C), D, E), or even more deeply nested MultiJoins.
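The nesting described here can be sketched as a greedy fold over a left-deep join chain. This Java sketch is illustrative only. It assumes each join step is labeled with the key it joins on, so "shares the common key" reduces to "same label as the previous step". A real rule would derive that from the join conditions. MultiJoinNesting and nest are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: folds a left-deep chain into nested multi-joins.
 * Join step i attaches inputs.get(i + 1) using joinKeys.get(i) (assumed labels).
 * Consecutive steps with the same key stay in one MJ(...); a key change closes
 * the current MJ, which becomes the first input of the next one.
 */
final class MultiJoinNesting {

    static String nest(List<String> inputs, List<String> joinKeys) {
        if (joinKeys.size() != inputs.size() - 1) {
            throw new IllegalArgumentException("need exactly one join key per join step");
        }
        List<String> group = new ArrayList<>();
        group.add(inputs.get(0));
        String groupKey = null;
        for (int i = 0; i < joinKeys.size(); i++) {
            String key = joinKeys.get(i);
            if (groupKey != null && !groupKey.equals(key)) {
                // Key changed: close the current group and use it as the first input.
                String closed = "MJ(" + String.join(", ", group) + ")";
                group = new ArrayList<>();
                group.add(closed);
            }
            groupKey = key;
            group.add(inputs.get(i + 1));
        }
        // A single input needs no multi-join wrapper.
        return group.size() == 1 ? group.get(0) : "MJ(" + String.join(", ", group) + ")";
    }

    public static void main(String[] args) {
        System.out.println(nest(List.of("A", "B", "C", "D", "E"), List.of("k1", "k1", "k2", "k2")));
        // MJ(MJ(A, B, C), D, E)
    }
}
```

For A to D with keys k1, k1, k2, the sketch returns MJ(MJ(A, B, C), D), matching the example in the question. With three inputs and two different keys, it returns MJ(MJ(A, B), C), which is two MultiJoins with two inputs each.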

What do you mean by covering right joins? The rule will be applied only for inner and left joins:

I saw you had some tests with right joins. I didn't think through how project and join nodes interact with the RightJoinToLeftJoin rule. Can the multi-join check (right instanceof FlinkMultiJoinNode) be dropped, then?

I'll start on FLINK-37890 today and hope to open a PR by tomorrow. I'll keep you informed.

If you're not 100% done, you can also open a draft PR so I can collaborate with you tomorrow. Otherwise, I'll have to start my own. And if you don't have the time, that's OK, and we can sync on the next items. Thanks, Stepan.

gustavodemorais, Jun 18 '25 14:06