flink-cdc
[MySQL CDC Connector] A piece of data appears in both full data and binlog data.
Describe the bug (Please use English): While the full (snapshot) data is being read and a row is inserted, that row appears in both the full data and the binlog data.
Environment:
- Flink version: 1.14.4
- Flink CDC version: 2.2.0
- Database and version: MySQL 5.6.51
To Reproduce
Steps to reproduce the behavior:
- Metadata of the table in MySQL:
mysql> show create table test_0718;
+-----------+----------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+-----------+----------------------------------------------------------------------------------------------------------------------------------------------------+
| test_0718 | CREATE TABLE `test_0718` (
`id` int(11) NOT NULL,
`col1` varchar(30) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1 |
+-----------+----------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
- Constantly insert rows into MySQL:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

final String host = "192.168.4.234";
final int port = 3306;
final String userName = "root";
final String password = "mysql";
final String database = "TPCC";
final String url = String.format("jdbc:mysql://%s:%s/%s", host, port, database);
final String table = "test_0718";
// Autocommit is on by default, so every execute() is committed immediately.
try (Connection connection = DriverManager.getConnection(url, userName, password);
     PreparedStatement preparedStatement =
             connection.prepareStatement("insert into " + table + " values (?, '7')")) {
    for (int i = 1; i < Integer.MAX_VALUE; i++) {
        preparedStatement.setInt(1, i);
        preparedStatement.execute();
        if (i % 1000 == 0) {
            System.out.printf("inserted %d rows%n", i);
        }
    }
}
- When about 8,000 rows have been inserted, start the connector. I implement the deserialize method of DebeziumDeserializationSchema&lt;T&gt;:
@Override
public void deserialize(SourceRecord sourceRecord, Collector<ChangeEvent> collector)
        throws SyncException {
    if (!(sourceRecord.value() instanceof Struct)) {
        LOG.error("sourceRecord.value() is not a Struct: {}", sourceRecord);
        throw new IllegalArgumentException("sourceRecord.value() is not a Struct: " + sourceRecord);
    }
    Struct valueStruct = (Struct) sourceRecord.value();
    Struct sourceStruct = valueStruct.getStruct(SOURCE);
    String catalogName = deserializeCatalogName(sourceStruct);
    String schemaName = deserializeSchemaName(sourceStruct);
    String tableName = deserializeTableName(sourceStruct);
    LogPosition logPosition = deserializeLogPosition(sourceStruct);
    TransactionInfo transactionInfo = deserializeTransactionInfo(sourceStruct);
    Struct beforeStruct = valueStruct.getStruct(BEFORE);
    Struct afterStruct = valueStruct.getStruct(AFTER);
    Optional<List<Object>> beforeOptionalList = getOptionalList(beforeStruct);
    Optional<List<Object>> afterOptionalList = getOptionalList(afterStruct);
    Date changedAt = new Date(sourceStruct.getInt64(TS_MS));
    if (LOG.isDebugEnabled()) {
        LOG.debug(
                "[flinkcdc source] CRUD:\ndatabase.schema.table = {}.{}.{}\nbefore = {}\nafter = {}",
                catalogName,
                schemaName,
                tableName,
                beforeOptionalList,
                afterOptionalList);
    }
    switch (valueStruct.getString(OP)) {
        case "c":
            // insert captured from the binlog
            if (LOG.isDebugEnabled()) {
                LOG.debug("[flinkcdc source] insert");
            }
            collector.collect(
                    new DataChangeEvent(transactionInfo, logPosition, DmlOperation.INSERT, changedAt,
                            catalogName, schemaName, tableName, beforeOptionalList, afterOptionalList, false));
            break;
        case "u":
            // update
            if (LOG.isDebugEnabled()) {
                LOG.debug("[flinkcdc source] update");
            }
            collector.collect(
                    new DataChangeEvent(transactionInfo, logPosition, DmlOperation.UPDATE, changedAt,
                            catalogName, schemaName, tableName, beforeOptionalList, afterOptionalList, false));
            break;
        case "r":
            // read / snapshot record (the trailing flag marks it as snapshot data)
            if (LOG.isDebugEnabled()) {
                LOG.debug("[flinkcdc source] read / snapshot");
            }
            collector.collect(
                    new DataChangeEvent(transactionInfo, logPosition, DmlOperation.INSERT, changedAt,
                            catalogName, schemaName, tableName, beforeOptionalList, afterOptionalList, true));
            break;
        case "d":
            // delete
            if (LOG.isDebugEnabled()) {
                LOG.debug("[flinkcdc source] delete");
            }
            collector.collect(
                    new DataChangeEvent(transactionInfo, logPosition, DmlOperation.DELETE, changedAt,
                            catalogName, schemaName, tableName, beforeOptionalList, afterOptionalList, false));
            break;
        default:
            // never reached
            throw new UnsupportedDmlException("unsupported op: " + valueStruct.getString(OP));
    }
}
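For completeness, here is a minimal sketch of how such a deserializer can be wired up with the flink-cdc 2.2 DataStream API. MyChangeEventDeserializationSchema stands in for the implementation above (a hypothetical class name), and host, port, and credentials are taken from the insert snippet:
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySqlCdcRepro {
    public static void main(String[] args) throws Exception {
        // MySqlSource is the 2.x incremental-snapshot source for the DataStream API.
        MySqlSource<ChangeEvent> source =
                MySqlSource.<ChangeEvent>builder()
                        .hostname("192.168.4.234")
                        .port(3306)
                        .databaseList("TPCC")
                        .tableList("TPCC.test_0718")
                        .username("root")
                        .password("mysql")
                        .deserializer(new MyChangeEventDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000); // checkpointing is required by the incremental snapshot
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
                .print();
        env.execute("mysql-cdc-duplicate-repro");
    }
}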
- A piece of data appears in both the full data and the binlog data.
Additional Description
Same problem here.
I am quite sure the incremental snapshot caused this bug. I downgraded the MySQL CDC connector to version 1.4.0, in which the incremental snapshot had not yet been implemented, and the connector works well.
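For context, here is an illustrative sketch of what the incremental snapshot (DBLog-style) algorithm is supposed to do when reading a chunk. This is not the connector's actual code, and primaryKeyOf and asSnapshotRecord are hypothetical helpers:
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;

// Illustrative only: merge a chunk's snapshot view with the binlog events
// captured between the chunk's low and high watermark, keyed by primary key.
static List<SourceRecord> mergeChunk(
        List<SourceRecord> chunkSnapshotRecords,
        List<SourceRecord> binlogRecordsBetweenWatermarks) {
    Map<Object, SourceRecord> merged = new LinkedHashMap<>();
    for (SourceRecord snapshotRecord : chunkSnapshotRecords) {
        merged.put(primaryKeyOf(snapshotRecord), snapshotRecord); // hypothetical helper
    }
    for (SourceRecord binlogRecord : binlogRecordsBetweenWatermarks) {
        // A binlog event inside the watermark window replaces the snapshot
        // view of the same key, so each row should be emitted exactly once.
        merged.put(primaryKeyOf(binlogRecord), asSnapshotRecord(binlogRecord)); // hypothetical helper
    }
    return new ArrayList<>(merged.values());
}
If that merge misses a row (for example because of a mis-positioned watermark), the same insert can surface once as an 'r' snapshot record and again as a 'c' binlog record, which matches what is observed here.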
Could you please provide the parameters for the MySQL CDC source that you used? @MaoHJ-Official I can't reproduce the problem in my own test. Maybe we are using different settings.
Thanks~
'scan.incremental.snapshot.enabled' = 'true'. During the snapshot synchronization stage, insert some new rows into the table being synchronized, and the issue can be reproduced.
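For reference, here is a minimal Flink SQL sketch of a mysql-cdc table with that option set; this is an assumed setup reusing the host and credentials from the insert snippet above, not necessarily the reporter's exact job:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MySqlCdcSqlRepro {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The incremental snapshot algorithm relies on checkpointing.
        env.enableCheckpointing(3000);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        tEnv.executeSql(
                "CREATE TABLE test_0718 (\n"
                        + "  id INT,\n"
                        + "  col1 STRING,\n"
                        + "  PRIMARY KEY (id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'mysql-cdc',\n"
                        + "  'hostname' = '192.168.4.234',\n"
                        + "  'port' = '3306',\n"
                        + "  'username' = 'root',\n"
                        + "  'password' = 'mysql',\n"
                        + "  'database-name' = 'TPCC',\n"
                        + "  'table-name' = 'test_0718',\n"
                        + "  'scan.incremental.snapshot.enabled' = 'true'\n"
                        + ")");
        // Duplicate ids in this output would confirm the reported behavior.
        tEnv.executeSql("SELECT * FROM test_0718").print();
    }
}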
@ruanhang1993
Closing this issue because it was created before version 2.3.0 (2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under project Flink with component tag Flink CDC. Thank you!