
[FLINK-36813][cdc-connectors][mysql] Support MySQL synchronization of a subset of columns

Open JNSimba opened this issue 1 year ago • 12 comments

Background: In some scenarios, a MySQL synchronization job should synchronize only specified fields rather than all fields in the table, e.g.:

  1. The user only has permission on some fields of the MySQL table
  2. The table has many fields and the user only wants to synchronize some of them, for example https://github.com/apache/flink-cdc/discussions/3058

Current situation: For the incremental stage, it is enough to configure Debezium's column.include.list property to synchronize a subset of fields; refer to: https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-column-include-list

For the full snapshot stage, * is currently hard-coded in MySqlSnapshotSplitReadTask; refer to:

    if (isScanningData) {
        return buildSelectWithRowLimits(
                tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
    }

Solution: We can follow Debezium's RelationalSnapshotChangeEventSource: the user configures column.include.list, MySqlSnapshotSplitReadTask captures the specified columns, and the column list is spliced into the scan SQL it constructs.
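A minimal sketch of that splicing in plain Java (the method and class names here are illustrative, not the actual connector API; the real change would live in MySqlSnapshotSplitReadTask's scan-SQL construction):

```java
import java.util.List;
import java.util.Optional;

public class SnapshotScanSql {

    // Build the snapshot scan statement, splicing in the captured column
    // names (resolved from column.include.list) instead of hard-coding "*".
    static String buildSelect(String tableId, List<String> capturedColumns,
                              Optional<String> condition, long limit) {
        // Fall back to "*" when no column projection is configured.
        String projection = capturedColumns.isEmpty()
                ? "*"
                : String.join(", ", capturedColumns);
        StringBuilder sql = new StringBuilder("SELECT ")
                .append(projection).append(" FROM ").append(tableId);
        condition.ifPresent(c -> sql.append(" WHERE ").append(c));
        sql.append(" LIMIT ").append(limit);
        return sql.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildSelect("db1.table1",
                List.of("id", "name"), Optional.empty(), 1000));
        // SELECT id, name FROM db1.table1 LIMIT 1000
    }
}
```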

JNSimba avatar Nov 28 '24 03:11 JNSimba

@leonardBang @ruanhang1993 PTAL, Thanks

JNSimba avatar Dec 03 '24 02:12 JNSimba

@JNSimba Thanks for your PR.

I have some questions about the usage of column.include.list.

Consider that there are 3 tables to read, db1.table1, db1.table2 and db1.table3, whose schemas are identical:

| name    | type    | note        |
|---------|---------|-------------|
| id      | varchar | primary key |
| name    | varchar |             |
| age     | int     |             |
| address | varchar |             |

For a table source as follows, will the change events contain all columns for db1.table2 and db1.table3 in the incremental phase?

CREATE TABLE test_tbl (
    `id` STRING NOT NULL,
    name STRING,
    age INT,
    address STRING,
    primary key (`id`) not enforced
) WITH (
  'connector' = 'mysql-cdc',
  'database-name' = 'db1',
  'table-name' = 'table[0-9]+',
  'debezium.column.include.list' = 'db1.table1.id,db1.table1.name',
   ......
);
Another question: how do we handle the Debezium setting column.exclude.list? If the table source is as follows, what will the change data be?
CREATE TABLE test_tbl (
    `id` STRING NOT NULL,
    name STRING,
    age INT,
    address STRING,
    primary key (`id`) not enforced
) WITH (
  'connector' = 'mysql-cdc',
  'database-name' = 'db1',
  'table-name' = 'table[0-9]+',
  'debezium.column.include.list' = 'db1.table1.id,db1.table1.name',
  'debezium.column.exclude.list' = 'db1.table2.address,db1.table2.name',
   ......
);

ruanhang1993 avatar Mar 25 '25 07:03 ruanhang1993

Hi @JNSimba, could you please rebase this PR onto the latest master when available?

The code style verifier has been updated to enforce the JUnit 5 + AssertJ framework, and these classes might need to be migrated:

  • JUnit 4 style test annotations should be changed to JUnit 5 equivalents

    • org.junit.Test => org.junit.jupiter.api.Test
    • @Before, @BeforeClass => @BeforeEach, @BeforeAll
    • @After, @AfterClass => @AfterEach, @AfterAll
  • JUnit Assertions / Hamcrest Assertions are not allowed, including:

    • org.junit.Assert
    • org.junit.jupiter.api.Assertions
    • org.hamcrest.*

Use org.assertj.core.api.Assertions instead.

Running mvn verify -DskipTests locally could check if all these requirements have been satisfied.
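As an illustration (not taken from the PR itself), a typical migration looks like the following; it assumes JUnit 5 and AssertJ are on the test classpath:

```java
// Before (JUnit 4 + JUnit assertions):
//
// import org.junit.Before;
// import org.junit.Test;
// import static org.junit.Assert.assertEquals;
//
// @Before
// public void setUp() { ... }
//
// @Test
// public void testSomething() { assertEquals(3, 1 + 2); }

// After (JUnit 5 + AssertJ):
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class ExampleMigratedTest {
    @BeforeEach
    void setUp() { /* ... */ }

    @Test
    void testSomething() {
        assertThat(1 + 2).isEqualTo(3);
    }
}
```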

yuxiqian avatar Mar 26 '25 09:03 yuxiqian

@JNSimba Could we provide this feature via SupportsProjectionPushDown? debezium.column.include.list and debezium.column.exclude.list are hard for users to understand and use. We could use the SupportsProjectionPushDown interface to automatically generate the Debezium setting. In this way we don't need to validate whether there is a conflict between the provided schema and the Debezium setting.
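The core of this idea, deriving column.include.list from the projected columns, can be sketched in plain Java (toIncludeList is a hypothetical helper; the real implementation would hook into the connector's applyProjection):

```java
import java.util.List;
import java.util.stream.Collectors;

public class IncludeListFromProjection {

    // Derive the Debezium column.include.list value from the columns that
    // the planner projected, instead of asking users to write it by hand.
    // Debezium expects fully-qualified regexes: <db>.<table>.<column>.
    static String toIncludeList(String database, String tableRegex,
                                List<String> projectedColumns) {
        return projectedColumns.stream()
                .map(col -> database + "." + tableRegex + "." + col)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        System.out.println(toIncludeList("db1", "table[0-9]+", List.of("id", "name")));
        // db1.table[0-9]+.id,db1.table[0-9]+.name
    }
}
```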

ruanhang1993 avatar Mar 27 '25 05:03 ruanhang1993

Thanks @ruanhang1993 , This is a good idea, I will modify based on SupportsProjectionPushDown

JNSimba avatar Apr 01 '25 10:04 JNSimba

@ruanhang1993 SupportsProjectionPushDown is indeed an elegant approach, but some of the CDC logic seems to have problems in the push-down phase. For example:

If you run the MySqlConnectorITCase.testConsumingAllEvents case, the chunk key is matched here; if my projected columns do not contain the chunk key, an error is reported:

https://github.com/apache/flink-cdc/blob/b437a49e67a2ef84c59cde09f2bfc372f45b31fa/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java#L267C1-L274C37

                Object[] chunkKey =
                        RecordUtils.getSplitKey(
                                splitKeyType, statefulTaskContext.getSchemaNameAdjuster(), target);
                for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
                    if (RecordUtils.splitKeyRangeContains(
                                    chunkKey, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
                            && position.isAfter(splitInfo.getHighWatermark())) {
                        return true;

JNSimba avatar Apr 07 '25 14:04 JNSimba

@JNSimba If the chunk key column is not contained in the projection, the snapshot phase will fail because the chunk key column is lost. In this case, I think we need to keep these columns in both the snapshot phase and the incremental phase, and apply the projection in the emitter.
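The "project in the emitter" idea can be sketched like this (a minimal illustration, not the connector's actual emitter code; column positions are made up):

```java
import java.util.Arrays;

public class EmitterProjection {

    // Read the full row so the chunk key is still available for split
    // matching, then apply the user's projection only when emitting.
    static Object[] project(Object[] fullRow, int[] projectedIndexes) {
        return Arrays.stream(projectedIndexes)
                .mapToObj(i -> fullRow[i])
                .toArray();
    }

    public static void main(String[] args) {
        // Full row: id (chunk key), name, age, address.
        Object[] row = {"pk-1", "Alice", 30, "Berlin"};
        // The user projected (name, address); id was still read internally.
        System.out.println(Arrays.toString(project(row, new int[] {1, 3})));
        // [Alice, Berlin]
    }
}
```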

ruanhang1993 avatar Apr 08 '25 07:04 ruanhang1993

@ruanhang1993 But it seems that chunkKeyColumn cannot be obtained in MySqlTableSource, because when the chunk key column is the primary key, chunkKeyColumn is null.

JNSimba avatar Apr 08 '25 15:04 JNSimba

@JNSimba In the backfill task, there may be a problem with projection. See RecordUtils.upsertBinlog in SnapshotSplitReader#pollSplitRecords.

For tables with a PK, we use all PK columns to deduplicate the records. For tables without a PK, we use all columns to deduplicate the records.

So the constraints on the projection are as follows: for tables with a PK, we need to keep all PK columns; for tables without a PK, we need to keep all columns.
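That constraint can be expressed as a small helper that widens the projection to cover the deduplication key (a sketch with illustrative names, not connector code):

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

public class RetainedColumns {

    // The effective set of columns to read must cover the dedup key:
    // all PK columns when a PK exists, otherwise every column.
    static List<String> effectiveReadColumns(List<String> allColumns,
                                             List<String> pkColumns,
                                             List<String> projected) {
        if (pkColumns.isEmpty()) {
            // No PK: all columns are needed to deduplicate backfill records.
            return allColumns;
        }
        LinkedHashSet<String> cols = new LinkedHashSet<>(projected);
        cols.addAll(pkColumns); // PK columns must always be read
        return new ArrayList<>(cols);
    }

    public static void main(String[] args) {
        System.out.println(effectiveReadColumns(
                List.of("id", "name", "age", "address"),
                List.of("id"),
                List.of("name", "address")));
        // [name, address, id]
    }
}
```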

ruanhang1993 avatar Apr 10 '25 06:04 ruanhang1993

@ruanhang1993 Thanks for your suggestions. I implemented a version based on this, but found some new problems.

Take this case: MySqlTimezoneITCase.testMySqlServerInBerlin

Here id is the primary key. The mismatch between the fields actually read (date_c, time_c, datetime3_c, datetime6_c, timestamp_c, id) and the fields projected in the SELECT (date_c, time_c, datetime3_c, datetime6_c, timestamp_c) may cause problems.

In the non-incremental snapshot path, RowDataSerializer.copy strictly checks that the row arity matches the serializer's types, so an error is reported: https://github.com/apache/flink/blob/release-1.20.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java#L122-L125

    // from.getArity() == 6 while types.length == 5
    if (from.getArity() != types.length) {
        throw new IllegalArgumentException(
                "Row arity: " + from.getArity() + ", but serializer arity: " + types.length);
    }

For this case, it works when incrementalSnapshot=true, but fails when it is false.

JNSimba avatar Apr 11 '25 02:04 JNSimba

This pull request has been automatically marked as stale because it has not had recent activity for 120 days. It will be closed in 60 days if no further activity occurs.

github-actions[bot] avatar Aug 10 '25 00:08 github-actions[bot]

This pull request has been closed because it has not had recent activity. You can reopen it if you want to continue your work, and anyone interested is encouraged to continue working on this pull request.

github-actions[bot] avatar Oct 11 '25 00:10 github-actions[bot]