[Flink 16024][Connector][JDBC] Support FilterPushdown
What is the purpose of the change
Implement filter pushdown in the JDBC connector source. This is an optimization that avoids scanning the whole table into Flink when we only need a subset of it.
Brief change log
- Implement a SQL visitor that converts a Flink SQL filter expression (AST) into a JDBC SQL string (a minimal sketch of the idea follows after this list)
- Implement FilterPushdown in JdbcDynamicTableSource using the visitor
- Add tests for both the visitor and JdbcDynamicTableSource
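As a hedged illustration of the first two items (the class name and the choice to handle only integer literals are invented for this sketch, not the PR's actual code), a visitor over Flink's resolved expressions can render simple comparisons and signal "unsupported" with an empty result, so those predicates stay filtered in memory:

```java
import java.util.Optional;

import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionVisitor;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.TypeLiteralExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;

// Walks a resolved filter expression and renders a WHERE-clause fragment.
// Optional.empty() means "cannot push down"; the caller keeps that
// predicate in Flink instead.
public class SqlFragmentVisitor implements ExpressionVisitor<Optional<String>> {

    @Override
    public Optional<String> visit(CallExpression call) {
        // Only two binary comparisons are sketched; a real visitor would
        // cover more functions (AND, OR, <>, <=, ...).
        String op;
        if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
            op = "=";
        } else if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
            op = ">";
        } else {
            return Optional.empty();
        }
        Optional<String> left = call.getChildren().get(0).accept(this);
        Optional<String> right = call.getChildren().get(1).accept(this);
        if (!left.isPresent() || !right.isPresent()) {
            return Optional.empty();
        }
        return Optional.of(left.get() + " " + op + " " + right.get());
    }

    @Override
    public Optional<String> visit(ValueLiteralExpression literal) {
        // Only integer literals here; rendering other literal types as
        // strings is exactly what the review below calls into question.
        return literal.getValueAs(Integer.class).map(Object::toString);
    }

    @Override
    public Optional<String> visit(FieldReferenceExpression field) {
        return Optional.of(field.getName());
    }

    @Override
    public Optional<String> visit(TypeLiteralExpression typeLiteral) {
        return Optional.empty();
    }

    @Override
    public Optional<String> visit(Expression other) {
        return Optional.empty();
    }
}
```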
Verifying this change
This change added tests and can be verified as follows:
- Added an integration test in JdbcDynamicTableSourceITCase to make sure there is no regression in SQL filtering
- Added JdbcFilterPushdownVisitorTest to verify that we generate SQL correctly
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)
Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not documented)
CI report:
- 4780c64c8a7bafdd767f21190fca1b11ad9dabb0 UNKNOWN
- c7bab56eaa252b832613bc015fcf8a51366de49b Azure: FAILURE
Bot commands
The @flinkbot bot supports the following commands:
- @flinkbot run azure: re-run the last Azure build
@qingwei91 Thanks for the PR. Can you make sure that your PR title and especially your commit messages conform to the Code Contribution guide? https://flink.apache.org/contributing/contribute-code.html
For example, your commit message should start with [FLINK-16024][Connector][JDBC] in this case.
@flinkbot run azure
@flinkbot run azure
@flinkbot run azure
@qingwei91 I can't review this since this is not my area of expertise. I would like to ask @hailin0 to help with a review, given https://github.com/apache/flink/pull/20304. Otherwise I would like to ask @leonardBang if he can help out.
@qingwei91 Thanks for your contribution. For the current design, I see you use `ValueLiteralExpression#toString()` to generate the string for literals. This may work for some cases, but not for all cases IMHO.
Consider the following cases:
- `ValueLiteralExpression#toString()` uses the Flink dialect to represent strings, and it hard-codes the quote as `'`. However, in many DBMSs `'` is not the only choice.
- `ValueLiteralExpression#toString()` only handles the special character `'` via escaping. Many DBMSs need more special-character handling, e.g. the MySQL driver.
- Other types, e.g. `TIMESTAMP`, `DATE`, `TIME`, `INTERVAL` and so on, may suffer from this too, because we cannot assume that all DB dialects handle them in the same way.

Another, more general way to handle this is to use `PreparedStatement#setXXX`, just like we already do in `TableSimpleStatementExecutor` and `JdbcRowDataLookupFunction`. WDYT?
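To make the quoting concern concrete, here is a small hedged sketch (the table and column names are made up): string-rendering a literal bakes one dialect's escaping rules into the generated SQL, while a `PreparedStatement` placeholder delegates the value to the driver:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class LiteralPushdownSketch {

    static ResultSet filterByName(Connection conn, String name) throws SQLException {
        // Fragile: concatenating the literal means knowing this dialect's
        // quoting rules. name = "O'Brien" needs 'O''Brien' in standard SQL,
        // and MySQL additionally treats backslashes specially.
        // String sql = "SELECT * FROM people WHERE name = '" + name + "'";

        // Robust: the placeholder behaves the same in every JDBC driver,
        // and setTimestamp/setDate/setTime cover the temporal types too.
        PreparedStatement ps =
                conn.prepareStatement("SELECT * FROM people WHERE name = ?");
        ps.setString(1, name);
        return ps.executeQuery();
    }
}
```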
Hi @libenchao, thanks for the review!
Thanks for pointing out the flaw, you're right. 🙇
On your recommended approach: I believe `JdbcFilterPushdownVisitor` needs to produce strings or some equivalent data so that it can be used by `JdbcDynamicTableSource::getScanRuntimeProvider`. How should I make use of `PreparedStatement` here? Maybe I am missing something?
I think you're pointing out a fundamental issue with this PR: SQL statement generation has to be dialect-specific, and my trying to provide a default implementation might be a lost cause here.
If we cannot go down the prepared-statement route, I can think of two ideas:
- Implement a dialect-specific method that converts `ValueLiteralExpression` to a SQL string literal, and have `JdbcFilterPushdownVisitor` make use of it (a rough sketch follows after this list). We don't have to support all dialects in one go, and we can simply fall back to in-memory filtering where a dialect is not supported.
- Let `JdbcFilterPushdownVisitor` itself be dialect-specific, so every implementation deals with dialect-specific concerns, including literal stringification. This is similar to the current approach; the only difference is that this PR currently provides a default implementation, whereas this option would stop doing that. My implementation is tested and used in production for SQL Server, so maybe I can rename it and make it specific to the SQL Server JDBC dialect? (The SQL Server dialect PR is still open: #20235, so this would depend on it.) Likewise, we can fall back to in-memory filtering for dialects without an implementation.
I think option 1 is less code, but probably more fiddly and easier to break. Option 2 will likely be more code, but the separation is cleaner and less likely to break.
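For what it's worth, option 1 might look roughly like this (a hypothetical sketch: the interface name and method are invented for illustration):

```java
import java.util.Optional;

import org.apache.flink.table.expressions.ValueLiteralExpression;

// Hypothetical shape of option 1: a per-dialect literal renderer that the
// shared JdbcFilterPushdownVisitor would call into. Returning empty means
// "this dialect cannot safely render the literal", and the predicate falls
// back to in-memory filtering.
public interface DialectLiteralRenderer {
    Optional<String> renderLiteral(ValueLiteralExpression literal);
}
```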
Let me know your thoughts. 😄
> I believe `JdbcFilterPushdownVisitor` needs to produce strings or some equivalent data so that it can be used by `JdbcDynamicTableSource::getScanRuntimeProvider`. How should I make use of `PreparedStatement` here? Maybe I am missing something?
@qingwei91 Currently `JdbcRowDataInputFormat` already uses `PreparedStatement`, and 'scan.partition' is implemented using it, hence we can do it. `JdbcFilterPushdownVisitor` does not necessarily need to return `String`; it can return anything we need.
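In other words (a hedged sketch; the class below is illustrative, not necessarily the connector's eventual API), the visitor could return a fragment with `?` placeholders plus the literal values, and the input format binds them through `PreparedStatement`:

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// A predicate fragment with placeholders, e.g. predicate = "amount > ? AND
// name = ?" with params = [100, "O'Brien"]. The scan then prepares the
// statement and calls setObject(i + 1, params.get(i)) for each parameter.
public class ParameterizedPredicate implements Serializable {

    private final String predicate;
    private final List<Serializable> params;

    public ParameterizedPredicate(String predicate, List<Serializable> params) {
        this.predicate = predicate;
        this.params = params;
    }

    public String getPredicate() {
        return predicate;
    }

    public List<Serializable> getParameters() {
        return params;
    }

    // Merging two predicates, e.g. when visiting an AND call.
    public static ParameterizedPredicate and(
            ParameterizedPredicate left, ParameterizedPredicate right) {
        List<Serializable> merged = new ArrayList<>(left.params);
        merged.addAll(right.params);
        return new ParameterizedPredicate(
                "(" + left.predicate + ") AND (" + right.predicate + ")", merged);
    }
}
```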
We may need to rethink the design, especially on the following points:
- What functions should we support, e.g. `IN`, `BETWEEN`?
- Are these functions supported in all JDBC dialects, and do they use the same operator? If not, how should we abstract this out to make it dialect-specific?
Hi @libenchao, oh I see, I didn't spot that `JdbcRowDataInputFormat` is using `PreparedStatement` under the hood.
Thanks for pointing it out. This will be a larger change than I expected; I will give it a go.
> What functions should we support, e.g. `IN`, `BETWEEN`?

I think we can tackle this incrementally? I believe IN is supported out of the box, because Flink compiles IN into multiple X = Y conditions chained together by OR. I never looked into BETWEEN, though.
> Are these functions supported in all JDBC dialects, and do they use the same operator? If not, how should we abstract this out to make it dialect-specific?

I think ultimately we need to allow dialect-specific behavior. Right now I have designed it so that the query generator (i.e. `JdbcFilterPushdownVisitor`) is part of `JdbcDialect`, so each dialect can provide its own instance to deal with this (see the sketch below). Do you think this design is okay? Or is there a better way?
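Roughly, the hook could look like this (a sketch under the assumption that the dialect owns the visitor; the interface and method names are invented, though `ExpressionVisitor` is Flink's real type):

```java
import java.util.Optional;

import org.apache.flink.table.expressions.ExpressionVisitor;

// Hypothetical mix-in for JdbcDialect: dialects that cannot (yet) push
// filters down keep the default, and the source falls back to filtering
// rows inside Flink.
public interface DialectFilterSupport {
    default Optional<ExpressionVisitor<Optional<String>>> getFilterPushdownVisitor() {
        return Optional.empty();
    }
}
```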
> I believe IN is supported out of the box, because Flink compiles IN into multiple X = Y conditions chained together by OR

Not exactly, we have a threshold (default 4): https://github.com/apache/flink/blob/208f08b406a7fd48890cda16d317a30ee892a2e7/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/ConvertToNotInOrInRule.scala#L47 (roughly: a short list like `x IN (1, 2, 3)` reaches the connector as OR-ed equalities, while a value list at or above the threshold stays an `IN` call that a purely equality/OR-based visitor would not handle).
> I think we can tackle this incrementally?

I agree, we can start from some common functions, such as `=`, `<>`, `<`, `>`, `<=`, `>=`, `IS NULL`, `IS NOT NULL`, `IN`, and leave others to the future.
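As a sketch of that starting set (the class name is invented; the `BuiltInFunctionDefinitions` constants are Flink's real ones), the common operators could live in one mapping that individual dialects override where their spelling differs:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;

public class PushdownOperators {

    // Default SQL spellings shared by most JDBC dialects.
    public static Map<FunctionDefinition, String> defaults() {
        Map<FunctionDefinition, String> ops = new HashMap<>();
        ops.put(BuiltInFunctionDefinitions.EQUALS, "=");
        ops.put(BuiltInFunctionDefinitions.NOT_EQUALS, "<>");
        ops.put(BuiltInFunctionDefinitions.LESS_THAN, "<");
        ops.put(BuiltInFunctionDefinitions.GREATER_THAN, ">");
        ops.put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, "<=");
        ops.put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, ">=");
        // Unary postfix predicates, no parameter to bind:
        ops.put(BuiltInFunctionDefinitions.IS_NULL, "IS NULL");
        ops.put(BuiltInFunctionDefinitions.IS_NOT_NULL, "IS NOT NULL");
        return ops;
    }
}
```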
Wow, thanks, it works for me~
@flinkbot run azure
Hi @libenchao, I finally got around to implementing your suggestion. Do you mind taking a look again?
I didn't implement additional operators like IN and BETWEEN because I have been quite busy of late. Given that they can be implemented incrementally later and won't break anything, I wonder if we can defer them to a different ticket?
@qingwei91 Thanks for the update, I'll review this PR in a few days.
> I didn't implement additional operators like IN and BETWEEN because I have been quite busy of late. Given that they can be implemented incrementally later and won't break anything, I wonder if we can defer them to a different ticket?
I'm ok with this.
@flinkbot run azure
@flinkbot run azure
Hi @libenchao, thank you very much for your review 👍
I've addressed all of your concerns. On the TablePlanTest, do you mind checking whether that's how it is supposed to work? I don't think I understand the internals well enough to judge.
One small tip: do not rebase/force-push before the reviewer asks you to, because this makes it hard for the reviewer to do an incremental review.
Sorry, my bad. I was advised to squash in a previous PR, but of course that should only be done right before merging.
@flinkbot run azure
@flinkbot run azure
@flinkbot run azure
@libenchao
Thanks for the update. Please do not use 'squash' or 'force-push' unless you must or the reviewer asks you to. (I went through all the code again and left several minor comments.)
Sorry, in the last commit I meant to bundle all the changes I've made since your last review into https://github.com/apache/flink/pull/20140/commits/55d5227fe040890f0c89e366a5ae19c6c75c1d21, so that you could review just that without the noise. I won't do it again.
I will go through the comments and fix them
Hi @libenchao, this is the new commit I added to address your comments:
https://github.com/apache/flink/pull/20140/commits/42078a44864b74e66a64fa69b8b46cf717be279d
I also added support for IS NULL and IS NOT NULL, as these two are quite common (a small sketch of the unary handling follows below).
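For reference, a hedged sketch of how the unary cases differ from the binary comparisons (the class and method names are invented): they take a single child, render as a postfix predicate, and bind no parameter:

```java
import java.util.Optional;

import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;

public class NullPredicateSketch {

    static Optional<String> renderIsNull(CallExpression call) {
        boolean isNull =
                BuiltInFunctionDefinitions.IS_NULL.equals(call.getFunctionDefinition());
        boolean isNotNull =
                BuiltInFunctionDefinitions.IS_NOT_NULL.equals(call.getFunctionDefinition());
        if (!isNull && !isNotNull) {
            return Optional.empty();
        }
        // Only push down when the single operand is a plain column reference.
        if (!(call.getChildren().get(0) instanceof FieldReferenceExpression)) {
            return Optional.empty();
        }
        String column =
                ((FieldReferenceExpression) call.getChildren().get(0)).getName();
        return Optional.of(column + (isNull ? " IS NULL" : " IS NOT NULL"));
    }
}
```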