[FLINK-36165][source-connector/mysql] Support capturing snapshot data with conditions
Hey, this is an implementation designed to capture snapshot data with filtering conditions.
For example, by specifying scan.snapshot.filters: db.user_table:id > 200;, we can synchronize only the user data where the id is greater than 200.
issue link: https://issues.apache.org/jira/browse/FLINK-36165
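For clarity, here is a minimal Flink SQL sketch of how the option from this PR would be set on the MySQL CDC table connector. The connection values below are placeholders; only the scan.snapshot.filters option and its value format come from this change:

CREATE TABLE user_source (
    id BIGINT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flink_user',
    'password' = 'flink_pw',
    'database-name' = 'db',
    'table-name' = 'user_table',
    -- proposed option: only rows matching the predicate are read during the snapshot phase
    'scan.snapshot.filters' = 'db.user_table:id > 200;'
);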
Thanks for @uicosp's great work! It's indeed a long-awaited feature.
Seems Debezium has a similar option called snapshot.select.statement.overrides, which allows users to project out unwanted columns and filter rows based on custom predicates.
As these options aren't available in the incremental framework, it would be nice if we could support both row- and column-level pushdown with similar syntax, since both are about tweaking the snapshot query SQL. WDYT?
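For reference, this is roughly how the Debezium override is wired through the table connector today (a sketch based on my understanding, not verified here, with placeholder connection values): the debezium.* pass-through only takes effect on the legacy, lock-based snapshot path, which is exactly why nothing equivalent exists in the incremental framework yet.

CREATE TABLE user_source (
    id BIGINT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flink_user',
    'password' = 'flink_pw',
    'database-name' = 'db',
    'table-name' = 'user_table',
    -- the override is only honored by the legacy (lock-based) snapshot reader
    'scan.incremental.snapshot.enabled' = 'false',
    -- Debezium style: list the tables to override, then give the custom snapshot query per table
    'debezium.snapshot.select.statement.overrides' = 'db.user_table',
    'debezium.snapshot.select.statement.overrides.db.user_table' =
        'SELECT id, name FROM db.user_table WHERE id > 200'
);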
It also supports the Table API and the DataStream API with the incremental framework.
Should we consider this PR with only the snapshot startup option and backfill?
Column pruning is certainly a feature worth considering. In theory, it would only require minor adjustments to the current design, such as implementing a syntax like scan.snapshot.filters: db.user:id>200:id,name,age.
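Purely as an illustration of that proposed syntax (nothing here is implemented yet), the combined predicate-plus-projection string would slot into the same option; connection values are again placeholders:

CREATE TABLE user_source (
    id BIGINT,
    name STRING,
    age INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flink_user',
    'password' = 'flink_pw',
    'database-name' = 'db',
    'table-name' = 'user',
    -- hypothetical extension: filter rows with id > 200 and snapshot only the id, name, age columns
    'scan.snapshot.filters' = 'db.user:id>200:id,name,age'
);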
Taking this a step further, since the pipeline config already includes a transform option, we could leverage it to automatically extract filter conditions and projection fields, dynamically generating scan.snapshot.filters and thereby simplifying the config.
That said, I’m currently focused on other tasks and don't have time to work on this. Once I have time, I’d be happy to revisit and enhance this feature.
Does this function also apply to oracle.cdc ?
no, this function only works for mysql connector.
Thanks for the reply, does oracle connector have similar capabilities? I tried to configure debezium.snapshot.select.statement.overrides but it doesn't work.
I haven't used Oracle, so I'm not sure. You can try to find the answer in the official Flink CDC docs or the Oracle connector source code.
The debezium.snapshot.select.statement.overrides option is only applicable when data is read with the lock-based snapshot algorithm; this PR captures data through the incremental snapshot framework. You can implement the same snapshot filtering for other incremental snapshot connectors by following the MySQL connector's approach.
Thank you. I will follow your suggestion and attempt to implement a filtering feature based on an incremental snapshot connector.
@yuxiqian @loserwang1024 PTAL
This version adds support for configuring scan.snapshot.filters in the pipeline using a YAML list format. Example:
scan.snapshot.filters:
  - table: db1.user_table_[0-9]+
    filter: id > 100
  - table: db[1-2].[app|web]_order_\.*
    filter: city != 'China:beijing'
Note:
The parameter type of Configuration.fromMap has been changed from Map<String, String> to Map<String, ?>.
@yuxiqian Please confirm that this change does not break existing design conventions.