[FLINK-35716] Adds a rule which splits out async calls from join cond…
What is the purpose of the change
When an AsyncScalarFunction is used in a join, the UDF call becomes part of the join condition. This is not currently handled, since the existing rules expect every use of the UDF to be in a calc. This PR therefore introduces a logical rule that factors the UDF call out of the join condition into a calc that filters the result of the join, which the existing rules can handle.
This is similar to SplitPythonConditionFromJoinRule and could theoretically be shared, though this is enabled for all join types rather than just inner joins.
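As a rough illustration of the idea (not the rule's actual implementation), the split can be viewed as partitioning the conjuncts of the join condition: those containing an async call move into a calc above the join, and the rest stay in the join. In this sketch the conjuncts are plain strings, and the class name `SplitSketch` and the `containsAsyncCall` predicate are hypothetical stand-ins; the real rule works on the planner's expression nodes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class SplitSketch {

    /** Outcome of the split: what stays in the join and what moves to a calc above it. */
    public static final class Split {
        public final List<String> joinConjuncts = new ArrayList<>();
        public final List<String> calcConjuncts = new ArrayList<>();
    }

    /**
     * Partitions the conjuncts of a join condition.
     * containsAsyncCall is a hypothetical check standing in for the planner's own detection.
     */
    public static Split split(List<String> conjuncts, Predicate<String> containsAsyncCall) {
        Split result = new Split();
        for (String conjunct : conjuncts) {
            if (containsAsyncCall.test(conjunct)) {
                result.calcConjuncts.add(conjunct); // evaluated after the join, in a calc
            } else {
                result.joinConjuncts.add(conjunct); // remains part of the join condition
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Split s = split(
                Arrays.asList("a = a2", "func6(a, a2) > 10"),
                c -> c.contains("func6("));
        System.out.println("join: " + s.joinConjuncts); // join: [a = a2]
        System.out.println("calc: " + s.calcConjuncts); // calc: [func6(a, a2) > 10]
    }
}
```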
Verifying this change
This change added tests and can be verified as follows:
Call the UDF within join condition and verify that the UDF is not part of the join logical node:
SELECT a FROM MyTable RIGHT JOIN MyTable2 ON a = a2
WHERE a = a2 AND func6(a, a2) > 10
- Added AsyncCalcSplitRuleTest.java test which verifies this.
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with @Public(Evolving): no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
Documentation
- Does this pull request introduce a new feature? no
CI report:
- 64bc8a45f5be397f21ef29bc60e3b60cbf1a9f7d Azure: SUCCESS
Hi @HuangXingBo, I see you did the original PR for the Python version. I was wondering about the reasoning for enabling this only for inner joins. I would like to enable this for AsyncScalarFunction for more join types, but I'm trying to understand whether I'm missing some SQL semantic reasoning.
I haven't spent much thought on this, but here is a pointer for verifying that the rewrite is actually correct: https://stackoverflow.com/questions/55893354/predicate-pushdown-vs-on-clause We should definitely add more tests for the ON clause as well.
Thanks for that link. I read through it in some detail, and here is my understanding:
- ON clauses are evaluated "before the join", so if for example, there's a LEFT JOIN and the right row doesn't match the ON clause, the left row can still join with null. These clauses show up at the join operator or can be pushed down to a side if possible.
- WHERE clauses are evaluated "after the join", and are safe to pull up right after a join operator.
The main thing we do not want is to take all ON clauses and pull them up after the join, because that effectively turns every join type into an INNER JOIN. For this reason, I put in a check that ensures this logic runs for INNER JOINs only; otherwise a TableException is thrown if an AsyncScalarFunction call ends up in the ON clause of a join that is not an INNER JOIN.
This is consistent with the Python rule and explains why that rule is restricted to inner joins.
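To make the ON versus WHERE distinction concrete, here is a minimal sketch in plain Java (no Flink involved) that simulates a LEFT JOIN over two small integer columns. The class `JoinOnVsWhere` and the predicate `func` are hypothetical; `func` merely stands in for a UDF that reads columns from both tables. It compares keeping the predicate in the ON clause with pulling it up into a filter after the join.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

public class JoinOnVsWhere {

    /** LEFT JOIN: every left row is emitted; a left row with no match is padded with null. */
    static List<List<Integer>> leftJoin(
            List<Integer> left, List<Integer> right, BiPredicate<Integer, Integer> on) {
        List<List<Integer>> out = new ArrayList<>();
        for (Integer a : left) {
            boolean matched = false;
            for (Integer a2 : right) {
                if (on.test(a, a2)) {
                    out.add(Arrays.asList(a, a2));
                    matched = true;
                }
            }
            if (!matched) {
                out.add(Arrays.asList(a, null));
            }
        }
        return out;
    }

    /** Filter applied after the join; a null operand makes the predicate fail, as in SQL. */
    static List<List<Integer>> filter(
            List<List<Integer>> rows, BiPredicate<Integer, Integer> predicate) {
        List<List<Integer>> out = new ArrayList<>();
        for (List<Integer> row : rows) {
            Integer a = row.get(0);
            Integer a2 = row.get(1);
            if (a != null && a2 != null && predicate.test(a, a2)) {
                out.add(row);
            }
        }
        return out;
    }

    /** Hypothetical stand-in for a UDF over columns from both sides. */
    static boolean func(int a, int a2) {
        return a + a2 > 2;
    }

    /** ... LEFT JOIN ... ON a = a2 AND func(a, a2) */
    static List<List<Integer>> udfInOn(List<Integer> left, List<Integer> right) {
        return leftJoin(left, right, (a, a2) -> a.equals(a2) && func(a, a2));
    }

    /** ... LEFT JOIN ... ON a = a2, with func(a, a2) pulled up into a filter after the join. */
    static List<List<Integer>> udfPulledUp(List<Integer> left, List<Integer> right) {
        return filter(leftJoin(left, right, (a, a2) -> a.equals(a2)), (a, a2) -> func(a, a2));
    }

    public static void main(String[] args) {
        List<Integer> left = Arrays.asList(1, 2);
        List<Integer> right = Arrays.asList(1, 2);
        System.out.println(udfInOn(left, right));     // [[1, null], [2, 2]]
        System.out.println(udfPulledUp(left, right)); // [[2, 2]]
    }
}
```

With the predicate in the ON clause, the left row 1 survives padded with null; once the predicate is pulled up, that row is dropped, which is exactly the result an INNER JOIN would produce.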
I added more test cases for the combinations of:
- inner join
- right join
- left join
And for function call types:
- Single param (will try to push down if in ON)
- Params from both tables (will keep as join condition if in ON)
And for call location:
- WHERE clause
- ON clause
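The combinations above can be enumerated mechanically. The following is only a sketch of what such a matrix might look like, not the queries used in the actual tests: the class `AsyncJoinTestMatrix` and the single-parameter function name `func1` are hypothetical, while `MyTable`, `MyTable2`, and `func6` follow the example query in the PR description.

```java
import java.util.ArrayList;
import java.util.List;

public class AsyncJoinTestMatrix {

    /** Builds one query per (join type, function call, call location) combination. */
    static List<String> queries() {
        String[] joinTypes = {"INNER", "LEFT", "RIGHT"};
        String[] calls = {
            "func1(a) > 10",     // single param; func1 is a hypothetical name
            "func6(a, a2) > 10"  // params from both tables
        };
        List<String> out = new ArrayList<>();
        for (String joinType : joinTypes) {
            for (String call : calls) {
                String join = "SELECT a FROM MyTable " + joinType + " JOIN MyTable2 ON a = a2";
                out.add(join + " AND " + call);   // call in the ON clause
                out.add(join + " WHERE " + call); // call in the WHERE clause
            }
        }
        return out;
    }

    public static void main(String[] args) {
        for (String query : queries()) {
            System.out.println(query);
        }
    }
}
```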