[FLINK-37973][table-planner] Add support for right joins to multijoin operator
What is the purpose of the change
The newly introduced MultiJoin operator currently does not support right joins. Since all right joins are converted to left joins, we need to enable FlinkRightToLeftJoinRule in the MultiJoin rule set and handle the resulting left joins together with the projections that the conversion introduces.
Brief change log
- Introduced FlinkOrderPreservingProjection, a wrapper around the original projection. This class is used to define the projections created by FlinkRightToLeftJoinRule.
- Patched JoinToMultiJoinRule to support FlinkOrderPreservingProjection (i.e., right joins).
- Other minor adjustments related to JoinToMultiJoinRule.
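For context on why a projection appears at all: rewriting `A RIGHT JOIN B` as `B LEFT JOIN A` swaps the join inputs, so the join emits B's fields before A's, and a projection on top is needed to restore the original (A, B) field order. The sketch below computes that restoring projection as a plain index array. It is a minimal illustration under that assumption; `RightToLeftProjection` and `restoringProjection` are hypothetical names and do not correspond to Flink or Calcite APIs.

```java
import java.util.Arrays;

/**
 * Hypothetical illustration, not Flink or Calcite code. After "A RIGHT JOIN B"
 * is rewritten to "B LEFT JOIN A", the join emits B's fields before A's.
 * restoringProjection computes the projection that puts them back in (A, B) order.
 */
public final class RightToLeftProjection {

    private RightToLeftProjection() {}

    /**
     * @param leftCount  number of fields of the original left input A
     * @param rightCount number of fields of the original right input B
     * @return for each field position of the original join output, the index of
     *         that field in the swapped join's output (B's fields, then A's)
     */
    public static int[] restoringProjection(int leftCount, int rightCount) {
        int[] mapping = new int[leftCount + rightCount];
        // A's fields now follow B's fields in the swapped output.
        for (int i = 0; i < leftCount; i++) {
            mapping[i] = rightCount + i;
        }
        // B's fields now come first in the swapped output.
        for (int j = 0; j < rightCount; j++) {
            mapping[leftCount + j] = j;
        }
        return mapping;
    }

    public static void main(String[] args) {
        // A has 2 fields, B has 3; the swapped output is [b0, b1, b2, a0, a1].
        System.out.println(Arrays.toString(restoringProjection(2, 3)));
        // prints [3, 4, 0, 1, 2]
    }
}
```

Because the projection is a pure reordering, it does not change row contents; that is presumably what makes it safe to look through when flattening joins, but the exact handling in JoinToMultiJoinRule is defined by the PR itself.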
Verifying this change
- Added the following test cases to MultiJoinTest:
  - testThreeWayRightInnerJoin
  - testThreeWayRightRightJoin
  - testThreeWayInnerRightJoin
TODO: Semantic and restore tests will be added once StreamExecMultiJoin is fixed to support right joins.
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with @Public(Evolving): no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
CI report:
- 0ceca08705c94b4237b1df865bd5298c8bd21d7e Azure: FAILURE
@SteveStevenpoor can you provide a quick description of the tests/changes that are still pending?
Let's start with an example:
    SELECT u.user_id_0, u.name, o.order_id, p.payment_id, s.location
    FROM Users u
    RIGHT JOIN Orders o ON u.user_id_0 = o.user_id_1
    RIGHT JOIN Payments p ON u.user_id_0 = p.user_id_2
    INNER JOIN Shipments s ON p.user_id_2 = s.user_id_3
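To make the rewrite concrete, the sketch below models this query's join tree, swaps the inputs of every RIGHT join (the core idea of a right-to-left rewrite), and lists the tables left to right, which is the input order of the flattened multi join. It is a simplified, hypothetical model: `RightToLeftExample`, `rightToLeft`, and `leaves` are illustrative names, not Calcite RelNodes or Flink rules, and join conditions and projections are omitted.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the example's join tree (not Calcite RelNodes; join
 * conditions are omitted). rightToLeft swaps the inputs of every RIGHT join,
 * and leaves lists the tables in left-to-right order, which is the order of
 * the resulting multi-join inputs.
 */
public final class RightToLeftExample {

    public enum JoinType { INNER, LEFT, RIGHT }

    public interface Node {}

    public record Table(String name) implements Node {
        @Override
        public String toString() {
            return name;
        }
    }

    public record Join(JoinType type, Node left, Node right) implements Node {
        @Override
        public String toString() {
            return "(" + left + " " + type + " " + right + ")";
        }
    }

    /** Users RIGHT JOIN Orders RIGHT JOIN Payments INNER JOIN Shipments. */
    public static Node exampleQuery() {
        return new Join(JoinType.INNER,
                new Join(JoinType.RIGHT,
                        new Join(JoinType.RIGHT, new Table("U"), new Table("O")),
                        new Table("P")),
                new Table("S"));
    }

    /** Rewrites every RIGHT join into a LEFT join by swapping its inputs. */
    public static Node rightToLeft(Node node) {
        if (node instanceof Join j) {
            Node left = rightToLeft(j.left());
            Node right = rightToLeft(j.right());
            return j.type() == JoinType.RIGHT
                    ? new Join(JoinType.LEFT, right, left)
                    : new Join(j.type(), left, right);
        }
        return node;
    }

    /** Collects the tables in left-to-right order. */
    public static List<String> leaves(Node node) {
        List<String> out = new ArrayList<>();
        collect(node, out);
        return out;
    }

    private static void collect(Node node, List<String> out) {
        if (node instanceof Join j) {
            collect(j.left(), out);
            collect(j.right(), out);
        } else if (node instanceof Table t) {
            out.add(t.name());
        }
    }

    public static void main(String[] args) {
        Node converted = rightToLeft(exampleQuery());
        System.out.println(converted);                // ((P LEFT (O LEFT U)) INNER S)
        System.out.println("MJ" + leaves(converted)); // MJ[P, O, U, S]
    }
}
```

Note that the converted tree is no longer left-deep: the (O LEFT U) join sits on the right input of the join with P, which is exactly the shape the discussion below is concerned with.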
So the original AST looks like this:
After the right-join-to-left-join conversion:
In this MR, I get rid of these projections when converting to a multi join (the inputs are no longer only INNER or LEFT joins; there may also be projections). So it will look like this:
So what's the problem with the tests? StreamExecMultiJoin, together with StreamingMultiJoinOperator, relies heavily on the assumption that the original AST has joins only on the left side. However, in the example above, MultiJoin(P, O, U, S) needs to first merge O and U, then P with the joined row of O and U, and then the joined row of P, O, U with S. Currently, though, it merges P and O first, as if the query had a different structure.
Why can't we construct MJ(O, U, P, S)? Because P must be on the right side of O and U for the join types to be correct.
What I'm trying to do: after adding support for constructing the optimized rel plan for right joins, I added semantic tests. Then I fixed StreamExecMultiJoin to generate proper join conditions without relying on a specific AST structure. Now I'm adapting StreamingMultiJoinOperator to handle the cases described above. I'm going to construct something like this:

Before:
When a record arrives, we start constructing the resulting record from the leftmost table. With the example above, MJ(P, O, U, S): when a record from Users arrives, we merge P and O, then PO and U, then POU and S, which leads to an incorrect result.
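The "before" behaviour amounts to a flat left-to-right fold over the multi-join inputs. The hypothetical sketch below (`FlatMergeOrder` is an illustrative name, not operator code) only prints the merge order; it does not model operator state or join conditions. It shows why the result is wrong for this query: the ON clauses relate O to U and P to U, but never P to O directly, yet the fold merges P and O first.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the "before" behaviour: fold inputs left to right. */
public final class FlatMergeOrder {

    private FlatMergeOrder() {}

    /** Returns the merge steps of a left-to-right fold over the inputs. */
    public static List<String> mergeSteps(List<String> inputs) {
        List<String> steps = new ArrayList<>();
        String acc = inputs.get(0);
        for (int i = 1; i < inputs.size(); i++) {
            // Always merge the accumulated row with the next input.
            steps.add(acc + " + " + inputs.get(i));
            acc = acc + inputs.get(i);
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(mergeSteps(List.of("P", "O", "U", "S")));
        // prints [P + O, PO + U, POU + S]
    }
}
```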
After:
When a record arrives, we start constructing the resulting record from the leftmost table of the proper level:
Example: MJ(P, O, U, S)
This way, we merge O and U, then P with OU, and then POU with S. We can also keep the original join type for each level.
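One way to picture the level-based order is a post-order walk over the converted tree: a level is merged only after both of its inputs are ready, and each step carries that level's join type. This is a hypothetical sketch of the merge order only (`LevelMergeOrder` and its methods are illustrative names); it does not reproduce the incremental, per-record state handling of StreamingMultiJoinOperator.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not StreamingMultiJoinOperator code): merge order
 * derived from the converted join tree instead of a flat left-to-right fold.
 * Each step records the join type of its level.
 */
public final class LevelMergeOrder {

    public enum JoinType { INNER, LEFT }

    public interface Node {
        /** Concatenated names of the tables below this node, e.g. "POU". */
        String label();
    }

    public record Table(String name) implements Node {
        public String label() {
            return name;
        }
    }

    public record Join(JoinType type, Node left, Node right) implements Node {
        public String label() {
            return left.label() + right.label();
        }
    }

    /** (P LEFT (O LEFT U)) INNER S, i.e. MJ(P, O, U, S) after the rewrite. */
    public static Node exampleMultiJoin() {
        return new Join(JoinType.INNER,
                new Join(JoinType.LEFT, new Table("P"),
                        new Join(JoinType.LEFT, new Table("O"), new Table("U"))),
                new Table("S"));
    }

    /** Post-order walk: a level is merged only after both of its inputs. */
    public static List<String> mergeSteps(Node root) {
        List<String> steps = new ArrayList<>();
        walk(root, steps);
        return steps;
    }

    private static void walk(Node node, List<String> steps) {
        if (node instanceof Join j) {
            walk(j.left(), steps);
            walk(j.right(), steps);
            steps.add(j.left().label() + " " + j.type() + " " + j.right().label());
        }
    }

    public static void main(String[] args) {
        System.out.println(mergeSteps(exampleMultiJoin()));
        // prints [O LEFT U, P LEFT OU, POU INNER S]
    }
}
```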
Also, it does not matter whether we convert right joins to left joins before or after JoinToMultiJoinRule. Either way, we will need to add a projection on top of the multi join and handle right-side joins in StreamingMultiJoinOperator. Since FlinkRightToLeftJoinRule already exists, I think it's fine to assume it will be applied before JoinToMultiJoinRule.
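The projection on top of the multi join has to map each field of the original output order (U, O, P, S) back to its position in the multi-join output order (P, O, U, S). The sketch below computes that mapping from per-input field counts. It is an illustration under assumptions: the class and method names are hypothetical, and the field counts in `main` are made up rather than taken from the example's schemas.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch (not Flink code): the projection placed on top of a
 * multi join whose inputs were reordered by the right-to-left rewrite. It maps
 * each field of the original output order back to its position in the
 * multi-join output.
 */
public final class MultiJoinOutputProjection {

    private MultiJoinOutputProjection() {}

    public static int[] restoringProjection(
            List<String> originalOrder,
            List<String> multiJoinOrder,
            Map<String, Integer> fieldCounts) {
        // Offset of each input's first field in the multi-join output.
        Map<String, Integer> offsets = new HashMap<>();
        int offset = 0;
        for (String input : multiJoinOrder) {
            offsets.put(input, offset);
            offset += fieldCounts.get(input);
        }
        // Emit the fields in the original order, pointing into the MJ output.
        int[] mapping = new int[offset];
        int pos = 0;
        for (String input : originalOrder) {
            int start = offsets.get(input);
            for (int k = 0; k < fieldCounts.get(input); k++) {
                mapping[pos++] = start + k;
            }
        }
        return mapping;
    }

    public static void main(String[] args) {
        // Field counts are made up for illustration.
        Map<String, Integer> counts = Map.of("U", 2, "O", 3, "P", 1, "S", 2);
        int[] projection = restoringProjection(
                List.of("U", "O", "P", "S"), List.of("P", "O", "U", "S"), counts);
        System.out.println(Arrays.toString(projection));
        // prints [4, 5, 1, 2, 3, 0, 6, 7]
    }
}
```

The mapping depends only on the final input order and the field counts, which is consistent with the point above that the projection is needed regardless of when the right-to-left conversion runs.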
@gustavodemorais let me know your thoughts on it, my friend.
I've spent some time on this, but I need to make sure I understand everything so we don't accidentally introduce bugs into the other rules. I'll be able to get to this in about 1-2 weeks. More eyes on this wouldn't hurt.
I'm almost done with it. I'll update the MR by the end of the week and ask you and someone else to review it.
TODO:
- [ ] Change java docs
- [x] A lot of refactoring
- [ ] Add restore tests
- [x] Resolve merge conflicts
- [ ] Benchmark versus binary joins
- [ ] Benchmark versus unflattened right joins