[FLINK-36813][cdc-connectors][mysql] Support synchronizing only part of the columns in MySQL
**Background**

In some scenarios, a MySQL synchronization job should only synchronize specified fields instead of all fields in the table, for example:

- The user only has permission on some of the fields in MySQL.
- The table has too many fields and the user only wants to synchronize some of them, see https://github.com/apache/flink-cdc/discussions/3058

**Current situation**

For the incremental stage, configuring the Debezium `column.include.list` property is enough to synchronize only some of the fields, refer to: https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-column-include-list

For the full snapshot stage, `*` is currently used in `MySqlSnapshotSplitReadTask`, refer to:

```java
if (isScanningData) {
    return buildSelectWithRowLimits(
            tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
}
```
**Solution**

We can follow Debezium's `RelationalSnapshotChangeEventSource`: the user configures `column.include.list`, `MySqlSnapshotSplitReadTask` captures the matching columns, and the column list is spliced into the scan SQL when it is constructed.
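A minimal sketch of the idea: build the snapshot scan SQL from a configured column list instead of `*`. The method name `buildProjectedSelect` and its signature are illustrative, not the actual flink-cdc API.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class SnapshotScanSqlSketch {

    /** Hypothetical sketch: splice a column list into the scan SQL, falling back to "*". */
    static String buildProjectedSelect(
            String tableId, List<String> columns, Optional<String> condition, long limit) {
        String columnList =
                columns.isEmpty()
                        ? "*"
                        : columns.stream()
                                .map(c -> "`" + c + "`")
                                .collect(Collectors.joining(", "));
        StringBuilder sql =
                new StringBuilder("SELECT ").append(columnList).append(" FROM ").append(tableId);
        condition.ifPresent(c -> sql.append(" WHERE ").append(c));
        sql.append(" LIMIT ").append(limit);
        return sql.toString();
    }

    public static void main(String[] args) {
        // With a projection configured, only the listed columns are scanned.
        System.out.println(
                buildProjectedSelect("db1.table1", List.of("id", "name"), Optional.empty(), 1000));
        // -> SELECT `id`, `name` FROM db1.table1 LIMIT 1000
    }
}
```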
@leonardBang @ruanhang1993 PTAL, Thanks
@JNSimba Thanks for your PR.
I have some questions about the usage of `column.include.list`.
Consider that there are 3 tables db1.table1, db1.table2 and db1.table3 to read, whose schemas are the same:
| name | type | note |
|---|---|---|
| id | varchar | primary key |
| name | varchar | |
| age | int | |
| address | varchar | |
For a table source as follows, will the change events contain all columns for db1.table2 and db1.table3 in the incremental phase?

```sql
CREATE TABLE test_tbl (
    `id` STRING NOT NULL,
    name STRING,
    age INT,
    address STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'database-name' = 'db1',
    'table-name' = 'table[0-9]+',
    'debezium.column.include.list' = 'db1.table1.id,db1.table1.name',
    ......
);
```
- How do we handle the Debezium setting `column.exclude.list`? If the table source is as follows, what will the change data be?

```sql
CREATE TABLE test_tbl (
    `id` STRING NOT NULL,
    name STRING,
    age INT,
    address STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'database-name' = 'db1',
    'table-name' = 'table[0-9]+',
    'debezium.column.include.list' = 'db1.table1.id,db1.table1.name',
    'debezium.column.exclude.list' = 'db1.table2.address,db1.table2.name',
    ......
);
```
Hi @JNSimba, could you please rebase this PR onto the latest master when available?
The code style verifier has been updated to enforce the JUnit 5 + AssertJ framework, and these classes might need to be migrated:

- JUnit 4 style test annotations should be changed to JUnit 5 equivalents:
  - `org.junit.Test` => `org.junit.jupiter.api.Test`
  - `@Before`, `@BeforeClass` => `@BeforeEach`, `@BeforeAll`
  - `@After`, `@AfterClass` => `@AfterEach`, `@AfterAll`
- JUnit Assertions / Hamcrest Assertions are not allowed, including:
  - `org.junit.Assert`
  - `org.junit.jupiter.api.Assertions`
  - `org.hamcrest.*`

  Use `org.assertj.core.api.Assertions` instead.

Running `mvn verify -DskipTests` locally could check if all these requirements have been satisfied.
@JNSimba Could we provide this feature via `SupportsProjectionPushDown`?
`debezium.column.include.list` and `debezium.column.exclude.list` are hard for users to understand and use. We could use the `SupportsProjectionPushDown` interface to automatically generate the Debezium setting. That way we don't need to validate whether there is a conflict between the provided schema and the Debezium setting.
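To illustrate the suggestion, the projected field indices handed to a table source by `SupportsProjectionPushDown#applyProjection` could be translated into a `column.include.list` value. This is a plain-Java sketch of that mapping, not the actual flink-cdc code; `toIncludeList` is a hypothetical helper.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ProjectionToIncludeListSketch {

    /** Map projected field indices onto physical column names, Debezium-style. */
    static String toIncludeList(
            String tableId, List<String> physicalColumns, int[] projectedFields) {
        return IntStream.of(projectedFields)
                .mapToObj(i -> tableId + "." + physicalColumns.get(i))
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        List<String> columns = List.of("id", "name", "age", "address");
        // A projection keeping only id and name, as in the example table above.
        System.out.println(toIncludeList("db1.table1", columns, new int[] {0, 1}));
        // -> db1.table1.id,db1.table1.name
    }
}
```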
Thanks @ruanhang1993, this is a good idea. I will modify the PR to be based on `SupportsProjectionPushDown`.
@ruanhang1993 `SupportsProjectionPushDown` is indeed an elegant approach, but some of the CDC logic seems to have problems in the push-down phase. For example, if you run the `MySqlConnectorITCase.testConsumingAllEvents` case, the chunk key is evaluated here; if the projected columns do not contain the chunk key, an error will be reported:
https://github.com/apache/flink-cdc/blob/b437a49e67a2ef84c59cde09f2bfc372f45b31fa/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java#L267C1-L274C37
```java
Object[] chunkKey =
        RecordUtils.getSplitKey(
                splitKeyType, statefulTaskContext.getSchemaNameAdjuster(), target);
for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
    if (RecordUtils.splitKeyRangeContains(
                    chunkKey, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
            && position.isAfter(splitInfo.getHighWatermark())) {
        return true;
    }
}
```
@JNSimba If the chunk key column is not contained in the projection, the snapshot phase will fail because the chunk key column is lost. In this case, I think we need to include these columns in both the snapshot phase and the incremental phase, and apply the projection in the emitter.
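A sketch of the idea in this comment, with hypothetical helper names: physically read the projected columns plus the chunk key column, then drop the extra column in the emitter so that downstream only sees the user's projection.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

public class ChunkKeyProjectionSketch {

    /** Columns to actually read: the projection plus the chunk key, de-duplicated. */
    static List<String> readColumns(List<String> projected, String chunkKeyColumn) {
        LinkedHashSet<String> cols = new LinkedHashSet<>(projected);
        cols.add(chunkKeyColumn);
        return new ArrayList<>(cols);
    }

    /** In the emitter, keep only the user-projected positions of the physical row. */
    static Object[] applyProjection(Object[] row, List<String> readCols, List<String> projected) {
        return projected.stream().map(c -> row[readCols.indexOf(c)]).toArray();
    }

    public static void main(String[] args) {
        List<String> projected = List.of("name", "age");
        // Chunk key "id" is read in addition to the projection: [name, age, id]
        List<String> read = readColumns(projected, "id");
        Object[] physicalRow = {"alice", 30, "k1"};
        // The emitter drops "id" again, emitting only [name, age].
        Object[] emitted = applyProjection(physicalRow, read, projected);
        System.out.println(emitted.length); // 2
    }
}
```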
@ruanhang1993 But it seems that in `MySqlTableSource` the chunk key column cannot be obtained, because `chunkKeyColumn` is null when the chunk key column is the primary key.
@JNSimba In the backfill task, there may be a problem for the projection. See `RecordUtils.upsertBinlog` in `SnapshotSplitReader#pollSplitRecords`.
For tables with a PK, we use all PK columns to deduplicate the records. For tables without a PK, we use all columns to deduplicate the records.
So the limit for the projection is as follows: for tables with a PK, we need to keep all PK columns; for tables without a PK, we need to keep all columns.
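The projection limit described above could be expressed as a small rule for computing the columns that must stay in the physical read so the deduplication still works. This is an illustrative sketch; `requiredReadColumns` is a hypothetical helper, not flink-cdc code.

```java
import java.util.LinkedHashSet;
import java.util.List;

public class DedupColumnsSketch {

    /** Columns that must be physically read so dedup in the backfill task still works. */
    static List<String> requiredReadColumns(
            List<String> projected, List<String> allColumns, List<String> primaryKeys) {
        if (primaryKeys.isEmpty()) {
            // No PK: dedup uses every column, so nothing can be dropped.
            return allColumns;
        }
        // With a PK: the projection plus all PK columns.
        LinkedHashSet<String> cols = new LinkedHashSet<>(projected);
        cols.addAll(primaryKeys);
        return List.copyOf(cols);
    }

    public static void main(String[] args) {
        // PK table: "id" is pulled back in even though the user only projected "name".
        System.out.println(
                requiredReadColumns(
                        List.of("name"), List.of("id", "name", "age"), List.of("id")));
        // No-PK table: all columns must be kept.
        System.out.println(
                requiredReadColumns(List.of("name"), List.of("id", "name", "age"), List.of()));
    }
}
```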
@ruanhang1993 Thanks for your suggestions. I implemented a version based on them, but found some new problems.
Take the `MySqlTimezoneITCase.testMySqlServerInBerlin` case: `id` is the primary key, so the columns actually read (date_c, time_c, datetime3_c, datetime6_c, timestamp_c, id) differ from the columns projected in the SELECT (date_c, time_c, datetime3_c, datetime6_c, timestamp_c), which may cause some problems.
In the non-incremental snapshot path, `RowDataSerializer.copy` checks that the row arity matches the serializer's types, so an error is reported: https://github.com/apache/flink/blob/release-1.20.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java#L122-L125
```java
// from.getArity() = 6 and types.length = 5
if (from.getArity() != types.length) {
    throw new IllegalArgumentException(
            "Row arity: " + from.getArity() + ", but serializer arity: " + types.length);
}
```
For this case, it works when incrementalSnapshot=true, but fails when it is false.
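The arity mismatch above can be reproduced in isolation: a 6-field physical row handed to a check expecting the 5-field projected type fails, while projecting the row first makes the arities agree. This is a plain-Java sketch of the failure mode, not the Flink code itself.

```java
public class AritySketch {

    /** Keep only the given field positions of a physical row. */
    static Object[] project(Object[] row, int[] keep) {
        Object[] out = new Object[keep.length];
        for (int i = 0; i < keep.length; i++) {
            out[i] = row[keep[i]];
        }
        return out;
    }

    /** Mimics the RowDataSerializer.copy arity check quoted above. */
    static void copyCheck(Object[] from, int serializerArity) {
        if (from.length != serializerArity) {
            throw new IllegalArgumentException(
                    "Row arity: " + from.length + ", but serializer arity: " + serializerArity);
        }
    }

    public static void main(String[] args) {
        // Physical row: date_c ... timestamp_c plus the extra primary key "id".
        Object[] physical = new Object[6];
        // Passing it through unprojected would throw; projecting first succeeds.
        copyCheck(project(physical, new int[] {0, 1, 2, 3, 4}), 5);
        System.out.println("arity check passed");
    }
}
```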
This pull request has been automatically marked as stale because it has not had recent activity for 120 days. It will be closed in 60 days if no further activity occurs.
This pull request has been closed because it has not had recent activity. You can reopen it if you want to continue your work, and anyone who is interested is encouraged to continue working on this pull request.