
[FLINK-36165][source-connector/mysql] Support capturing snapshot data with conditions

Open • uicosp opened this issue 1 year ago • 12 comments

Hey, this PR adds support for capturing snapshot data with filtering conditions.

For example, by specifying `scan.snapshot.filters: db.user_table:id > 200;`, we can synchronize only the rows of `db.user_table` whose `id` is greater than 200.
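To make the flat option format concrete, here is a minimal sketch of how such a value could be parsed. The grammar (entries separated by `;`, each entry `<db>.<table>:<condition>`, split on the first `:`) is inferred from the single example above, and `SnapshotFilterParser` is a hypothetical name, not the PR's code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for the flat option value, e.g. "db.user_table:id > 200;".
// ASSUMED grammar (inferred from one example, not taken from the PR):
//   entries separated by ';', each entry "<db>.<table>:<condition>",
//   split on the FIRST ':' so that conditions may themselves contain ':'.
final class SnapshotFilterParser {

    static Map<String, String> parse(String option) {
        Map<String, String> filters = new LinkedHashMap<>(); // keeps declaration order
        for (String entry : option.split(";")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerates the trailing ';' in the example
            }
            int sep = trimmed.indexOf(':');
            if (sep <= 0 || sep == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected <table>:<condition> but got: " + trimmed);
            }
            filters.put(trimmed.substring(0, sep).trim(), trimmed.substring(sep + 1).trim());
        }
        return filters;
    }
}
```

Splitting on the first `:` keeps a condition such as `city != 'China:beijing'` intact, but a condition containing `;` (for example inside a string literal) would still be cut in two, which is one argument for a structured list format.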

issue link: https://issues.apache.org/jira/browse/FLINK-36165

uicosp • Dec 04 '24

Thanks for @uicosp's great work! It's indeed a long-awaited feature.

Seems Debezium has a similar option called snapshot.select.statement.overrides, which allows users to project out unwanted columns and filter rows based on custom predicates.

As these options aren't available in the incremental framework, it would be nice if we could support both row- and column-level pushdown with similar syntax, since they're both about tweaking the snapshot query SQL. WDYT?
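For comparison, a sketch of how the Debezium option is usually spelled out for the MySQL connector: one property listing the overridden tables, plus one property per table holding the full SELECT, which can project columns and filter rows at once. `DebeziumOverrides` is only an illustrative helper; in Flink CDC SQL these keys would carry the `debezium.` prefix and, as noted later in this thread, they only take effect for Debezium's lock-based snapshot reading, not the incremental snapshot framework.

```java
import java.util.Map;
import java.util.Properties;

// Sketch of Debezium's snapshot.select.statement.overrides convention:
//   snapshot.select.statement.overrides = <db>.<table>[,<db>.<table>...]
//   snapshot.select.statement.overrides.<db>.<table> = <SELECT statement>
// DebeziumOverrides itself is a hypothetical helper, not a Debezium class.
final class DebeziumOverrides {

    static Properties build(Map<String, String> selectByTable) {
        Properties props = new Properties();
        // The list property names which tables have an override.
        props.setProperty("snapshot.select.statement.overrides", String.join(",", selectByTable.keySet()));
        // One property per table holds the actual snapshot query.
        selectByTable.forEach((table, select) ->
                props.setProperty("snapshot.select.statement.overrides." + table, select));
        return props;
    }
}
```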

yuxiqian • Dec 06 '24

It also supports the Table API and DataStream API with the incremental framework.

ThorneANN • Dec 12 '24

Should we consider supporting this PR only for the snapshot startup option and the backfill phase?

ThorneANN • Dec 12 '24

Column pruning is certainly a feature worth considering. In theory, it would only require minor adjustments to the current design, such as implementing a syntax like scan.snapshot.filters: db.user:id>200:id,name,age.
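A minimal sketch of how the extended `db.user:id>200:id,name,age` form could be parsed and rendered into a snapshot SELECT. `SnapshotSpec` is hypothetical. Because conditions may themselves contain `:` (for example `city != 'China:beijing'`), the sketch treats the text after the last `:` as a projection only when it looks like a plain column list, which is a heuristic, not a grammar.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical parser for the proposed "<table>:<filter>[:<col>,<col>,...]" entry,
// e.g. "db.user:id>200:id,name,age". Not an existing Flink CDC API.
final class SnapshotSpec {
    // A trailing segment counts as a projection only if it is a plain identifier list.
    private static final Pattern COLUMN_LIST =
            Pattern.compile("\\s*[A-Za-z_][A-Za-z0-9_]*(\\s*,\\s*[A-Za-z_][A-Za-z0-9_]*)*\\s*");

    final String table;
    final String filter;        // empty: no row filter
    final List<String> columns; // empty: select all columns

    private SnapshotSpec(String table, String filter, List<String> columns) {
        this.table = table;
        this.filter = filter;
        this.columns = columns;
    }

    static SnapshotSpec parse(String entry) {
        int first = entry.indexOf(':');
        if (first <= 0) {
            throw new IllegalArgumentException("Missing table name in: " + entry);
        }
        String table = entry.substring(0, first).trim();
        String rest = entry.substring(first + 1);
        int last = rest.lastIndexOf(':');
        // Only split off a projection when the text after the last ':' is a column list;
        // otherwise that ':' is part of the condition, e.g. city != 'China:beijing'.
        if (last >= 0 && COLUMN_LIST.matcher(rest.substring(last + 1)).matches()) {
            List<String> columns = new ArrayList<>();
            for (String column : rest.substring(last + 1).split(",")) {
                columns.add(column.trim());
            }
            return new SnapshotSpec(table, rest.substring(0, last).trim(), columns);
        }
        return new SnapshotSpec(table, rest.trim(), List.of());
    }

    String toSelect() {
        String projection = columns.isEmpty() ? "*" : String.join(", ", columns);
        String sql = "SELECT " + projection + " FROM " + table;
        return filter.isEmpty() ? sql : sql + " WHERE " + filter;
    }
}
```

A real implementation would also need to quote identifiers for MySQL; the sketch leaves them bare for readability.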

Taking this a step further, since the pipeline config already includes a transform option, we could leverage it to automatically extract filter conditions and projection fields, dynamically generating scan.snapshot.filters and thereby simplifying the config.
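A minimal sketch of that derivation, taking the `source-table`, `projection` and `filter` fields of a pipeline transform rule as plain strings and producing an entry in the `db.user:id>200:id,name,age` form proposed above. `TransformToSnapshotFilter` and its `keyColumns` parameter are hypothetical. Two assumptions are baked in: the transform filter is taken to be valid MySQL that only references source columns (functions specific to the transform expression language, or references to computed projection columns, would need translation or must not be pushed down), and key columns are always kept, on the assumption that the incremental snapshot reader needs them for chunk splitting and for merging snapshot rows with binlog events.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical: turn a transform rule (source-table / projection / filter) into a
// snapshot filter entry of the form "<table>:<filter>[:<col>,<col>,...]".
final class TransformToSnapshotFilter {
    private static final Pattern IDENTIFIER = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    static String derive(String sourceTable, String projection, String filter, List<String> keyColumns) {
        if (filter == null || filter.isBlank()) {
            throw new IllegalArgumentException("Only rules with a filter are pushed down in this sketch");
        }
        // ASSUMPTION: the transform filter is also valid MySQL and can be pushed down verbatim.
        String entry = sourceTable + ":" + filter.trim();
        // Key columns first, so pruning never drops what chunk splitting / merging relies on.
        Set<String> columns = new LinkedHashSet<>(keyColumns);
        for (String item : projection.split(",")) {
            String column = item.trim();
            if (!IDENTIFIER.matcher(column).matches()) {
                // '*', expressions like UPPER(name), or aliases: skip column pruning entirely.
                return entry;
            }
            columns.add(column);
        }
        return entry + ":" + String.join(",", columns);
    }
}
```

Falling back to "no pruning" whenever the projection is not a plain column list keeps the derived config conservative: the snapshot may read more columns than needed, but never fewer.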

That said, I’m currently focused on other tasks and don't have time to work on this. Once I have time, I’d be happy to revisit and enhance this feature.

uicosp • Dec 12 '24

Does this feature also apply to the Oracle CDC connector?

mageniao2 • Apr 15 '25

No, this feature only works with the MySQL connector.

uicosp • Apr 15 '25

Thanks for the reply. Does the Oracle connector have a similar capability? I tried configuring debezium.snapshot.select.statement.overrides, but it had no effect.

mageniao2 • Apr 15 '25

I haven't used Oracle, so I'm not sure. You could look for the answer in the official Flink CDC docs or the Oracle connector source code.

uicosp • Apr 15 '25

The debezium.snapshot.select.statement.overrides option only applies when data is read with Debezium's lock-based snapshot algorithm, whereas this connector captures snapshot data with the incremental snapshot framework. You can implement the same feature for other incremental snapshot connectors by following the snapshot filtering in the MySQL connector.
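In the incremental snapshot framework the reader issues its own query per chunk, so a row filter has to be combined with the chunk-boundary predicates rather than replacing the whole SELECT. A minimal sketch of where the filter plugs in, assuming a single split-key column and JDBC placeholders for the chunk bounds; `ChunkQueryBuilder` is a hypothetical name, not the connector's class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not the MySQL connector's actual code): combine a user filter
// with the chunk-boundary predicates an incremental snapshot reader issues per split.
final class ChunkQueryBuilder {

    static String build(String table, String splitKey, boolean hasLowerBound, boolean hasUpperBound, String filter) {
        List<String> predicates = new ArrayList<>();
        if (hasLowerBound) {
            predicates.add(splitKey + " >= ?"); // value supplied later as a JDBC parameter
        }
        if (hasUpperBound) {
            predicates.add(splitKey + " < ?");
        }
        if (filter != null && !filter.isBlank()) {
            // Parentheses keep a filter containing OR from escaping the chunk bounds.
            predicates.add("(" + filter.trim() + ")");
        }
        String sql = "SELECT * FROM " + table;
        return predicates.isEmpty() ? sql : sql + " WHERE " + String.join(" AND ", predicates);
    }
}
```

Without the parentheses, `id >= ? AND id < ? AND city = 'a' OR city = 'b'` would return every `city = 'b'` row in every chunk, because `AND` binds tighter than `OR`.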

ThorneANN • Apr 15 '25

Thank you. I'll follow your suggestion and try implementing a filtering feature for an incremental snapshot connector.

mageniao2 • Apr 15 '25

@yuxiqian @loserwang1024 PTAL

ThorneANN • Apr 16 '25

This version adds support for configuring scan.snapshot.filters in the pipeline using a YAML list format. Example:

```yaml
scan.snapshot.filters:
  - table: db1.user_table_[0-9]+
    filter: id > 100
  - table: db[1-2].[app|web]_order_\.*
    filter: city != 'China:beijing'
```
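A minimal sketch of how a captured table could be resolved to its filter from a list like the one above. `SnapshotFilterRules` is hypothetical, and it assumes two things: table patterns follow the Flink CDC pipeline convention where a bare `.` separates database and table while `\.` stands for the regex "any character", and the first matching rule wins. Note that in standard regex `[app|web]` is a character class matching a single character (`a`, `p`, `|`, `w`, `e` or `b`), so the second pattern matches `db2.w_order_2024` but not `db1.app_order_1`; `(app|web)` expresses the alternation, provided the connector's pattern handling accepts parentheses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical resolver for the YAML list above. ASSUMPTIONS: table patterns follow the
// Flink CDC pipeline convention (bare '.' separates database and table, "\." means
// "any character"), and the first rule whose pattern matches wins.
final class SnapshotFilterRules {
    private final List<Map.Entry<Pattern, String>> rules = new ArrayList<>();

    SnapshotFilterRules add(String tablePattern, String filter) {
        rules.add(Map.entry(Pattern.compile(toJavaRegex(tablePattern)), filter));
        return this;
    }

    Optional<String> filterFor(String databaseDotTable) {
        for (Map.Entry<Pattern, String> rule : rules) {
            if (rule.getKey().matcher(databaseDotTable).matches()) {
                return Optional.of(rule.getValue());
            }
        }
        return Optional.empty();
    }

    static String toJavaRegex(String pattern) {
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < pattern.length(); i++) {
            char c = pattern.charAt(i);
            boolean hasNext = i + 1 < pattern.length();
            if (c == '\\' && hasNext && pattern.charAt(i + 1) == '.') {
                regex.append('.'); // "\." -> regex "any character"
                i++;
            } else if (c == '\\' && hasNext) {
                regex.append(c).append(pattern.charAt(++i)); // keep other escapes such as \d
            } else if (c == '.') {
                regex.append("\\."); // bare '.' -> literal database/table separator
            } else {
                regex.append(c);
            }
        }
        return regex.toString();
    }
}
```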

Note: the parameter type of Configuration.fromMap has changed from Map<String, String> to Map<String, ?>. @yuxiqian, please confirm that this change doesn't break existing design conventions.
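On the signature question: `Map<String, String>` is a subtype of `Map<String, ?>`, so existing call sites keep compiling, and both types erase to `Map`, so the compiled method descriptor is unchanged as well. The sketch below uses a hypothetical `describe` method as a stand-in for a `fromMap`-style factory and accepts both kinds of caller. What does need checking is the method body (and anything it passes the map to), since values can no longer be assumed to be strings.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a fromMap-style factory whose parameter was widened
// from Map<String, String> to Map<String, ?>.
final class FromMapCompat {

    static String describe(Map<String, ?> options) {
        StringBuilder out = new StringBuilder();
        options.forEach((key, value) -> {
            // Values can no longer be assumed to be strings: YAML lists arrive as List.
            String kind = value instanceof List ? "list" : "scalar";
            out.append(key).append('=').append(kind).append(';');
        });
        return out.toString();
    }
}
```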

uicosp • Sep 05 '25