[FLINK-36808][table-planner] Fix LookupJoin bug when used with filterable table source
What is the purpose of the change
- Fix a bug in LookupJoin that occurs when it is used with a filterable table source (one that supports filter pushdown) inside a UNION ALL query
Brief description of the bug
Given a UNION ALL query:
- that combines the results of two lookup joins
- where the lookup joins use different filter conditions
SELECT
s.id,
s.name,
s.txn_time,
d.status
FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d`
ON
`s`.`id` = `d`.`id`
WHERE
`d`.`status` = 'OK'
UNION ALL
SELECT
s.id,
s.name,
s.txn_time,
d.status
FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d`
ON
`s`.`id` = `d`.`id`
WHERE
`d`.`status` = 'NOT_EXISTS';
In this situation the planner pushes the filter condition down into the table part of each lookup join, but the structure of the lookup joins stays the same, i.e., they have the same digest even though the underlying table / temporal table differs.
This is a problem because the Calcite Volcano optimizer treats relation nodes as equivalent when their digests match, so it registers the two lookup joins as the same node.
As a result, when the Union is optimized, both branches of the query are treated as identical (even though their WHERE clauses differ), and the same cheapest plan is chosen for both lookup joins.
The effect is also visible if you put the non-existing filter first: the result is then empty, because the first lookup join is reused for the second branch of the union.
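The collision described above can be illustrated with a toy model. This is only a sketch under stated assumptions, not Calcite's or Flink's actual code: `ToyLookupJoin`, `ToyPlanner` and `register` are hypothetical names, and the digest string merely imitates the format shown in this PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only: none of these classes exist in Calcite or Flink.
class DigestCollisionSketch {

    /** A lookup join whose digest omits the filter pushed down into the table. */
    static final class ToyLookupJoin {
        final String table;
        final String lookupKey;
        final String pushedDownFilter;

        ToyLookupJoin(String table, String lookupKey, String pushedDownFilter) {
            this.table = table;
            this.lookupKey = lookupKey;
            this.pushedDownFilter = pushedDownFilter;
        }

        String digest() {
            // The pushed-down filter is deliberately left out, mirroring the bug.
            return "LookupJoin(table=[" + table + "], lookup=[" + lookupKey + "])";
        }
    }

    /** Registers nodes by digest; a node with an already known digest is replaced by the existing one. */
    static final class ToyPlanner {
        private final Map<String, ToyLookupJoin> byDigest = new LinkedHashMap<>();

        ToyLookupJoin register(ToyLookupJoin node) {
            ToyLookupJoin existing = byDigest.putIfAbsent(node.digest(), node);
            return existing != null ? existing : node;
        }
    }

    public static void main(String[] args) {
        ToyPlanner planner = new ToyPlanner();
        ToyLookupJoin first = planner.register(
                new ToyLookupJoin("dim", "id=id", "status = 'OK'"));
        ToyLookupJoin second = planner.register(
                new ToyLookupJoin("dim", "id=id", "status = 'NOT_EXISTS'"));
        // Both branches of the UNION ALL resolve to the first registered join.
        System.out.println(first == second);
        System.out.println(second.pushedDownFilter);
    }
}
```

In this toy model the second branch silently inherits the filter of the first one, which matches the symptom that swapping the branch order changes the result.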
Alternative Solutions
A better solution would be to extend the LookupJoin digest so that the table part also includes the pushed-down filter conditions. For example,
LookupJoin(table=[default_catalog.default_database.dim, filter=[<PUSHED-DOWN-FILTERS>]], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id], upsertKey=[[0]])
But this would require a lot of refactoring, mainly in the tests.
In this PR, I have opted to add another filter containing the pushed-down filter conditions when the LookupJoin contains a table with filter pushdown.
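To make the alternative concrete (it is not the fix implemented in this PR), here is a minimal sketch of a digest that embeds the pushed-down filters. The `digest` helper and the class name are hypothetical; the string layout only imitates the example above and is not produced by any real Flink method.

```java
import java.util.List;

// Hypothetical helper; the digest format only imitates the example in this PR description.
class FilterAwareDigestSketch {

    static String digest(String table, List<String> pushedDownFilters, String lookupKey) {
        // Append the filters to the table part only when something was pushed down.
        String tablePart = pushedDownFilters.isEmpty()
                ? table
                : table + ", filter=[" + String.join(", ", pushedDownFilters) + "]";
        return "LookupJoin(table=[" + tablePart + "], joinType=[InnerJoin], lookup=["
                + lookupKey + "])";
    }

    public static void main(String[] args) {
        String table = "default_catalog.default_database.dim";
        String first = digest(table, List.of("status = 'OK'"), "id=id");
        String second = digest(table, List.of("status = 'NOT_EXISTS'"), "id=id");
        System.out.println(first);
        System.out.println(second);
        // The two digests now differ, so a digest-keyed registry would keep both joins.
        System.out.println(first.equals(second));
    }
}
```

With the filters in the digest, the two branches of the union would no longer be considered equivalent, at the cost of changing every expected plan string that contains a LookupJoin with pushed-down filters.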
Brief change log
- Add unit test to reproduce the bug
- Add (one alternative) fix to resolve the bug
Verifying this change
The change adds a test case that reproduces the bug and passes once the fix is applied.
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with @Public(Evolving): no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
CI report:
- 11a51dc1523064dc07f93567d791c6af8afd7741 Azure: FAILURE
Bot commands
The @flinkbot bot supports the following commands:
- @flinkbot run azure: re-run the last Azure build
nit: the Jira number has an extra 0 at the end in the PR title
@morazow If I am understanding this correctly, the filtered pushed down joins cause the Volcano planner to uniquely identify each look up join.
I wonder what happens if one of the lookup joins supports filter pushdown but the other doesn't? Does the <PUSHED-DOWN-FILTERS> include the name of the source as well as the filter? It would be good to have tests for when both sources support filter pushdown, when neither does, and when only one does.
Thanks @davidradl, addressed your findings, please have another look 🤝
the filtered pushed down joins cause the Volcano planner to uniquely identify each look up join.
Yes, but based on the getDigests method, which results in a string like below:
LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id], upsertKey=[[0]])
Here we have the table name, but no information on the pushed-down filter conditions. For this issue to happen, the lookup tables must be the same (since the table name is part of the digest), so we cannot write a test case where one part supports filter pushdown and the other doesn't.
Additionally, if the source doesn't support filter pushdown, the condition is represented in the digest with the where keyword. This also will not reproduce the bug, because the digests are different.
Hello @xuyangzhong ,
Thanks for the review! I have addressed your suggestions, please have a look. I put them in a separate commit for now; I'll squash it later 👍