
[FLINK-16024][Connector][JDBC] Support FilterPushdown

Open · qingwei91 opened this pull request 2 years ago • 12 comments

What is the purpose of the change

Implement filter pushdown in the JDBC connector source. This is an optimization that avoids scanning the whole table into Flink when we only need a subset of it.

Brief change log

  • Implement a SQL visitor that converts the Flink SQL expression AST to a JDBC SQL string
  • Implement filter pushdown in JdbcDynamicTableSource using the visitor
  • Add tests for both the visitor and JdbcDynamicTableSource
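As a rough sketch of the visitor idea described above (all class and method names here are invented for illustration; the actual PR visits Flink's ResolvedExpression tree), the core pattern is to walk an expression tree and render each node as a SQL WHERE-clause fragment:

```java
// Hypothetical sketch: render a tiny expression tree as a SQL fragment.
// The naive literal quoting here is exactly what the review discussion
// later flags as dialect-sensitive.
class FilterSketch {
    interface Expr { String toSql(); }

    record Column(String name) implements Expr {
        public String toSql() { return name; }
    }

    record Literal(Object value) implements Expr {
        public String toSql() {
            // naive: hard-codes single quotes and '' escaping
            return value instanceof String s
                    ? "'" + s.replace("'", "''") + "'"
                    : String.valueOf(value);
        }
    }

    record BinaryOp(String op, Expr left, Expr right) implements Expr {
        public String toSql() {
            return "(" + left.toSql() + " " + op + " " + right.toSql() + ")";
        }
    }

    public static void main(String[] args) {
        Expr filter = new BinaryOp("AND",
                new BinaryOp("=", new Column("name"), new Literal("O'Brien")),
                new BinaryOp(">", new Column("age"), new Literal(30)));
        System.out.println("SELECT * FROM t WHERE " + filter.toSql());
        // SELECT * FROM t WHERE ((name = 'O''Brien') AND (age > 30))
    }
}
```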

Verifying this change

This change added tests and can be verified as follows:

  • Added an integration test in JdbcDynamicTableSourceITCase to make sure there is no regression in SQL filtering
  • Added JdbcFilterPushdownVisitorTest to verify that we generate SQL correctly

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not documented)

qingwei91 avatar Jul 03 '22 08:07 qingwei91

CI report:

  • 4780c64c8a7bafdd767f21190fca1b11ad9dabb0 UNKNOWN
  • c7bab56eaa252b832613bc015fcf8a51366de49b Azure: FAILURE
Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-runs the last Azure build

flinkbot avatar Jul 03 '22 08:07 flinkbot

@qingwei91 Thanks for the PR. Can you make sure that your PR title and especially your commit messages conform to the Code Contribution guide? https://flink.apache.org/contributing/contribute-code.html

For example, your commit message should start with [FLINK-16024][Connector][JDBC] in this case.

MartijnVisser avatar Jul 05 '22 11:07 MartijnVisser

@flinkbot run azure

qingwei91 avatar Jul 13 '22 16:07 qingwei91

@flinkbot run azure

qingwei91 avatar Jul 13 '22 18:07 qingwei91

@flinkbot run azure

qingwei91 avatar Jul 14 '22 07:07 qingwei91

@qingwei91 I can't review this since this is not my expertise. I would like to ask @hailin0 to help with a review due to https://github.com/apache/flink/pull/20304 - Else I would like to ask @leonardBang if he can help out

MartijnVisser avatar Jul 29 '22 09:07 MartijnVisser

@qingwei91 Thanks for your contribution. In the current design, I see you use ValueLiteralExpression#toString() to generate the string for literals. This may work for some cases, but not for all cases IMHO. Consider the following cases:

  1. ValueLiteralExpression#toString() uses the Flink dialect to represent strings, and it hard-codes the quote character as '. However, in many DBMSs, ' is not the only choice.
  2. ValueLiteralExpression#toString() only handles the special character ' via escaping. Many DBMSs need more special-character handling, e.g. the MySQL driver.
  3. Other types, e.g. TIMESTAMP, DATE, TIME, INTERVAL and so on, may suffer from this too, because we cannot assume that all DB dialects handle them in the same way.

A more general way to handle this is to use PreparedStatement.setXXX, just like we already do in TableSimpleStatementExecutor and JdbcRowDataLookupFunction. WDYT?
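The PreparedStatement-based approach suggested above can be sketched like this (class and method names are illustrative, not the PR's actual API): instead of serializing literals into the SQL text, emit `?` placeholders and keep the values on the side, letting the JDBC driver handle all quoting and escaping.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical builder: accumulates a WHERE clause with '?' placeholders
// plus the parameter values to bind later via PreparedStatement.setObject.
class PushdownClause {
    final StringBuilder sql = new StringBuilder();
    final List<Object> params = new ArrayList<>();

    PushdownClause eq(String column, Object value) {
        if (sql.length() > 0) sql.append(" AND ");
        sql.append(column).append(" = ?");
        params.add(value); // driver handles quoting/escaping at bind time
        return this;
    }

    public static void main(String[] args) {
        PushdownClause c = new PushdownClause().eq("name", "O'Brien").eq("age", 30);
        System.out.println(c.sql);    // name = ? AND age = ?
        System.out.println(c.params); // [O'Brien, 30]
        // At runtime the connector would then do something like:
        //   PreparedStatement ps = conn.prepareStatement("SELECT ... WHERE " + c.sql);
        //   for (int i = 0; i < c.params.size(); i++) ps.setObject(i + 1, c.params.get(i));
    }
}
```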

libenchao avatar Aug 08 '22 03:08 libenchao

Hi @libenchao , thanks for the review!

Thanks for pointing out the flaw, you're right. 🙇

On your recommended approach: I believe JdbcFilterPushdownVisitor needs to produce strings or some equivalent data so that it can be used by JdbcDynamicTableSource::getScanRuntimeProvider. How should I make use of PreparedStatement here? Maybe I am missing something?

I think you're pointing out a fundamental issue with this PR: SQL statement generation has to be dialect-specific, and my trying to provide a default implementation might be a lost cause here.

If we cannot go down the prepared statement route, I can think of 2 ideas:

  1. Implement a dialect-specific method that converts a ValueLiteralExpression to a SQL string literal, and have JdbcFilterPushdownVisitor make use of it. We don't have to support all dialects in one go; we can simply fall back to in-memory filtering when a dialect isn't supported.
  2. Make JdbcFilterPushdownVisitor itself dialect-specific, so that every implementation deals with dialect-specific concerns, including literal stringification. This is similar to the current approach; the only difference is that this PR currently provides a default implementation, whereas this option would drop it. My implementation is tested and used in production for SQL Server. Maybe I can rename it and make it specific to the SQL Server JDBC dialect? (The SQL Server dialect PR is still open, #20235, so this would depend on it.) Likewise, we can fall back to in-memory filtering for dialects without an implementation.

I think option 1 is less code, but probably more fiddly and easier to break. Option 2 will likely be more code, but the separation is cleaner and less likely to break.

Let me know your thoughts. 😄

qingwei91 avatar Aug 08 '22 20:08 qingwei91

I believe JdbcFilterPushdownVisitor needs to produce strings or some equivalent data so that it can be used by JdbcDynamicTableSource::getScanRuntimeProvider. How should I make use of PreparedStatement here? Maybe I am missing something?

@qingwei91 JdbcRowDataInputFormat already uses PreparedStatement, and 'scan.partition' is implemented using it, hence we can do it here too. JdbcFilterPushdownVisitor does not necessarily need to return String; it can return anything we need. We may need to rethink the design, especially on the following points:

  1. What functions should we support, e.g. IN, BETWEEN?
  2. Are these functions supported in all JDBC dialects, and do they use the same operator? If not, how should we abstract it out to make it dialect-specific?
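One possible shape for the dialect abstraction asked about in point 2 (interface and names here are hypothetical, purely for illustration): each dialect maps a Flink function name to its own SQL operator, and returns empty for anything it cannot push down, so that unsupported filters simply stay in Flink.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical per-dialect hook: translate a Flink function name into the
// dialect's SQL operator, or empty if the dialect cannot push it down.
interface DialectOperators {
    Optional<String> sqlOperator(String flinkFunction);
}

class AnsiOperators implements DialectOperators {
    private static final Map<String, String> OPS = Map.of(
            "equals", "=", "notEquals", "<>",
            "lessThan", "<", "greaterThan", ">",
            "isNull", "IS NULL", "isNotNull", "IS NOT NULL");

    @Override
    public Optional<String> sqlOperator(String flinkFunction) {
        return Optional.ofNullable(OPS.get(flinkFunction));
    }

    public static void main(String[] args) {
        DialectOperators dialect = new AnsiOperators();
        System.out.println(dialect.sqlOperator("equals"));  // Optional[=]
        System.out.println(dialect.sqlOperator("between")); // Optional.empty: keep filtering in Flink
    }
}
```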

libenchao avatar Aug 09 '22 02:08 libenchao

Hi @libenchao, oh I see, I didn't spot that JdbcRowDataInputFormat is using PreparedStatement under the hood.

Thanks for pointing it out. This will be a larger change than I expected; I will give it a go.

  1. What functions should we support, e.g. IN, BETWEEN?

I think we can tackle this incrementally? I believe IN is supported out of the box, because Flink compiles IN into multiple X = Y conditions chained together by OR. I never looked into BETWEEN, though.

  2. Are these functions supported in all JDBC dialects, and do they use the same operator? If not, how should we abstract it out to make it dialect-specific?

I think ultimately we need to allow dialect-specific behavior. Right now I've designed it so that the query generator (i.e. JdbcFilterPushdownVisitor) is part of JdbcDialect, so each dialect can provide its own instance. Do you think this design is okay? Or is there a better way?

qingwei91 avatar Aug 09 '22 08:08 qingwei91

I believe IN is supported out of the box, because Flink compiles IN into multiple X = Y conditions chained together by OR

Not exactly, we have a threshold (default 4): https://github.com/apache/flink/blob/208f08b406a7fd48890cda16d317a30ee892a2e7/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/ConvertToNotInOrInRule.scala#L47

I think we can tackle this incrementally?

I agree, we can start from some common functions, such as =, <>, <, >, <=, >=, IS NULL, IS NOT NULL, IN, and leave others to the future.
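The threshold behavior libenchao points to can be illustrated with a toy rewrite (the exact semantics live in the linked ConvertToNotInOrInRule; this sketch only shows the idea): a short IN list becomes OR-chained equalities, which a visitor that only understands `=` and `OR` can already push down, while longer lists stay as IN.

```java
import java.util.List;
import java.util.stream.Collectors;

// Toy illustration of the planner's IN-vs-OR rewrite around a threshold.
// The threshold value and direction here mirror the discussion above, not
// Flink's actual rule, which is more nuanced.
class InRewrite {
    static final int IN_THRESHOLD = 4; // "default 4" per the linked rule

    static String rewrite(String column, List<String> values) {
        if (values.size() < IN_THRESHOLD) {
            // small list: OR-chained equalities, pushable with just '=' and 'OR'
            return values.stream()
                    .map(v -> column + " = " + v)
                    .collect(Collectors.joining(" OR ", "(", ")"));
        }
        // large list: keep as an IN predicate
        return column + " IN (" + String.join(", ", values) + ")";
    }

    public static void main(String[] args) {
        System.out.println(rewrite("id", List.of("1", "2", "3")));
        // (id = 1 OR id = 2 OR id = 3)
        System.out.println(rewrite("id", List.of("1", "2", "3", "4")));
        // id IN (1, 2, 3, 4)
    }
}
```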

libenchao avatar Aug 09 '22 09:08 libenchao

Wow, Thanks, It works for me~

HeChuanXUPT avatar Sep 06 '22 06:09 HeChuanXUPT

@flinkbot run azure

qingwei91 avatar Oct 09 '22 10:10 qingwei91

Hi @libenchao, I finally got around to implementing your suggestion. Do you mind taking a look again?

I didn't implement additional operators like IN and BETWEEN because I've been quite busy of late. Given that they can be implemented incrementally later and it's not going to break anything, I wonder if we can defer those to a different ticket?

qingwei91 avatar Oct 11 '22 19:10 qingwei91

@qingwei91 Thanks for the update, I'll review this PR in a few days.

I didn't implement additional operators like IN and BETWEEN because I've been quite busy of late. Given that they can be implemented incrementally later and it's not going to break anything, I wonder if we can defer those to a different ticket?

I'm ok with this.

libenchao avatar Oct 12 '22 02:10 libenchao

@flinkbot run azure

qingwei91 avatar Oct 23 '22 15:10 qingwei91

@flinkbot run azure

qingwei91 avatar Oct 23 '22 20:10 qingwei91

Hi @libenchao , thank you very much for your review 👍

I've addressed all of your concerns. On the TablePlanTest, do you mind checking whether that's how it's supposed to work? I don't think I understand the internals well enough to judge.

qingwei91 avatar Oct 24 '22 18:10 qingwei91

One small tip: do not rebase/force-push before the reviewer asks you to, because this makes it hard for the reviewer to do an incremental review.

Sorry, my bad. I was advised to squash in a previous PR, but of course that should only be done right before merging.

qingwei91 avatar Oct 28 '22 21:10 qingwei91

@flinkbot run azure

qingwei91 avatar Oct 28 '22 21:10 qingwei91

@flinkbot run azure

qingwei91 avatar Oct 30 '22 14:10 qingwei91

@flinkbot run azure

qingwei91 avatar Oct 30 '22 21:10 qingwei91

@libenchao

Thanks for the update. Please do not use 'squash' or 'force-push' unless you must or the reviewer asks you to. (I went through all the code again and left several minor comments.)

Sorry, in the last commit I meant to bundle all the changes I've made since your last review into https://github.com/apache/flink/pull/20140/commits/55d5227fe040890f0c89e366a5ae19c6c75c1d21, so that you could review just that without noise. I won't do it again.

I will go through the comments and fix them

qingwei91 avatar Oct 31 '22 18:10 qingwei91

Hi @libenchao, this is the new commit I added to address your comments.

https://github.com/apache/flink/pull/20140/commits/42078a44864b74e66a64fa69b8b46cf717be279d

I also added support for IS NULL and IS NOT NULL, as these two are quite common.

qingwei91 avatar Nov 01 '22 18:11 qingwei91