
[MySQL CDC Connector] A piece of data appears in both the full data and the binlog data.


Describe the bug (Please use English)
When the full data is being read and a new row is inserted, that row appears in both the full data and the binlog data.

Environment :

  • Flink version : 1.14.4
  • Flink CDC version: 2.2.0
  • Database and version: MySQL 5.6.51

To Reproduce
Steps to reproduce the behavior:

  1. Metadata of the table in MySQL:
mysql> show create table test_0718;
+-----------+----------------------------------------------------------------------------------------------------------------------------------------------------+
| Table     | Create Table                                                                                                                                       |
+-----------+----------------------------------------------------------------------------------------------------------------------------------------------------+
| test_0718 | CREATE TABLE `test_0718` (
  `id` int(11) NOT NULL,
  `col1` varchar(30) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1 |
+-----------+----------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
  2. Constantly insert rows into MySQL (col1 is always set to the literal 7, which MySQL coerces to a varchar):
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

final String host = "192.168.4.234";
final int port = 3306;
final String userName = "root";
final String password = "mysql";
final String database = "TPCC";
final String url = String.format("jdbc:mysql://%s:%s/%s", host, port, database);
final String table = "test_0718";

try (Connection connection = DriverManager.getConnection(url, userName, password);
     PreparedStatement preparedStatement =
             connection.prepareStatement("insert into test_0718 values (?, 7)")) {
    for (int i = 1; i < Integer.MAX_VALUE; i++) {
        preparedStatement.setInt(1, i);
        preparedStatement.execute();
        // Log progress every 1000 inserts.
        if (i % 1000 == 0) {
            System.out.printf("inserted %d rows%n", i);
        }
    }
}
  3. When about 8,000 rows have been inserted, start the connector. I implement the deserialize method of DebeziumDeserializationSchema<T> (a sketch of how it is wired into the source follows this list):
  @Override
  public void deserialize(SourceRecord sourceRecord, Collector<ChangeEvent> collector)
      throws SyncException {
    if (!(sourceRecord.value() instanceof Struct)) {
      LOG.error("sourceRecord.value() is not a Struct: {}", sourceRecord);
      throw new IllegalArgumentException("sourceRecord.value() is not a Struct: " + sourceRecord);
    }
    Struct valueStruct = (Struct) sourceRecord.value();

    Struct sourceStruct = valueStruct.getStruct(SOURCE);

    String catalogName = deserializeCatalogName(sourceStruct);

    String schemaName = deserializeSchemaName(sourceStruct);

    String tableName = deserializeTableName(sourceStruct);

    LogPosition logPosition = deserializeLogPosition(sourceStruct);
    TransactionInfo transactionInfo = deserializeTransactionInfo(sourceStruct);

    Struct beforeStruct = valueStruct.getStruct(BEFORE);
    Struct afterStruct = valueStruct.getStruct(AFTER);

    Optional<List<Object>> beforeOptionalList = getOptionalList(beforeStruct);
    Optional<List<Object>> afterOptionalList = getOptionalList(afterStruct);

    Date changedAt = new Date(sourceStruct.getInt64(TS_MS));

    if (LOG.isDebugEnabled()) {
      LOG.debug(
          "[flinkcdc source] CRUD:\ndatabase.schema.table = {}.{}.{}\nbefore = {}\nafter = {}",
          catalogName,
          schemaName,
          tableName,
          beforeOptionalList,
          afterOptionalList);
    }

    switch (valueStruct.getString(OP)) {
      case "c":
        // insert
        if (LOG.isDebugEnabled()) {
          LOG.debug("[flinkcdc source] insert");
        }
        collector.collect(
            new DataChangeEvent(transactionInfo, logPosition, DmlOperation.INSERT, changedAt, catalogName, schemaName, tableName, beforeOptionalList, afterOptionalList, false));
        break;
      case "u":
        // update
        if (LOG.isDebugEnabled()) {
          LOG.debug("[flinkcdc source] update");
        }
        collector.collect(
            new DataChangeEvent(transactionInfo, logPosition, DmlOperation.UPDATE, changedAt, catalogName, schemaName, tableName, beforeOptionalList, afterOptionalList, false));
        break;
      case "r":
        // read / snapshot
        if (LOG.isDebugEnabled()) {
          LOG.debug("[flinkcdc source] read / snapshot");
        }
        collector.collect(
            new DataChangeEvent(transactionInfo, logPosition, DmlOperation.INSERT, changedAt, catalogName, schemaName, tableName, beforeOptionalList, afterOptionalList, true));
        break;
      case "d":
        // delete
        if (LOG.isDebugEnabled()) {
          LOG.debug("[flinkcdc source] delete");
        }
        collector.collect(
            new DataChangeEvent(transactionInfo, logPosition, DmlOperation.DELETE, changedAt, catalogName, schemaName, tableName, beforeOptionalList, afterOptionalList, false));
        break;
      default:
        // unreachable: Debezium emits only c / u / r / d operations
        throw new UnsupportedDmlException("unsupported op: " + valueStruct.getString(OP));
    }
  }
  4. The same row appears in both the full data and the binlog data.
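
For context, here is a minimal sketch of how a deserializer like the one above is typically plugged into the incremental-snapshot source in Flink CDC 2.2. The job class and the ChangeEventDeserializer name are hypothetical; the MySqlSource builder API is the connector's own:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class ReproduceJob {
    public static void main(String[] args) throws Exception {
        // Incremental-snapshot source (the default in Flink CDC 2.x);
        // connection details mirror the reproduction setup above.
        MySqlSource<ChangeEvent> source =
                MySqlSource.<ChangeEvent>builder()
                        .hostname("192.168.4.234")
                        .port(3306)
                        .databaseList("TPCC")
                        .tableList("TPCC.test_0718")
                        .username("root")
                        .password("mysql")
                        .deserializer(new ChangeEventDeserializer()) // the schema shown above
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source").print();
        env.execute("mysql-cdc-duplicate-row-repro");
    }
}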

Additional Description
Screenshots of the duplicated row in the full data and in the binlog data (images omitted).

MaoHJ-Official avatar Jul 27 '22 08:07 MaoHJ-Official

same problem

JesseAtSZ avatar Jul 28 '22 13:07 JesseAtSZ

I am quite sure the incremental snapshot caused this bug. I downgraded the MySQL CDC connector to 1.4.0, in which the incremental snapshot had not yet been implemented, and the connector works well.
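
For reference, a sketch of what the 1.4.0-style source looks like, assuming the legacy com.alibaba.ververica artifact of that release; it is a single Debezium-based SourceFunction with no incremental snapshot:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;

public class LegacyCdcJob {
    public static void main(String[] args) throws Exception {
        // 1.4.0 exposes a single-reader Debezium SourceFunction: the snapshot
        // and the binlog are consumed by one reader, with no incremental snapshot.
        DebeziumSourceFunction<String> source =
                MySQLSource.<String>builder()
                        .hostname("192.168.4.234")
                        .port(3306)
                        .databaseList("TPCC")
                        .tableList("TPCC.test_0718")
                        .username("root")
                        .password("mysql")
                        .deserializer(new StringDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(source).print();
        env.execute("legacy-mysql-cdc");
    }
}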

MaoHJ-Official avatar Aug 01 '22 02:08 MaoHJ-Official

Could you please provide the parameters of the MySQL CDC source that you used, @MaoHJ-Official? I can't reproduce the problem in my own test; maybe we are using different settings.

Thanks~

ruanhang1993 avatar Aug 17 '22 06:08 ruanhang1993

With 'scan.incremental.snapshot.enabled' = 'true', the issue can be reproduced: during the snapshot synchronization stage, insert some new rows into the table being synchronized. (A minimal job sketch follows.)

@ruanhang1993
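
For anyone trying to reproduce this, a minimal Table API sketch with that option set. Connection details mirror the setup described above; the WITH clause uses the connector's documented SQL options:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SnapshotOptionRepro {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // The duplicate row was observed with the incremental snapshot enabled;
        // setting the option to 'false' falls back to the legacy snapshot reader.
        tEnv.executeSql(
                "CREATE TABLE test_0718 (\n"
                        + "  id INT,\n"
                        + "  col1 STRING,\n"
                        + "  PRIMARY KEY (id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'mysql-cdc',\n"
                        + "  'hostname' = '192.168.4.234',\n"
                        + "  'port' = '3306',\n"
                        + "  'username' = 'root',\n"
                        + "  'password' = 'mysql',\n"
                        + "  'database-name' = 'TPCC',\n"
                        + "  'table-name' = 'test_0718',\n"
                        + "  'scan.incremental.snapshot.enabled' = 'true'\n"
                        + ")");

        // While this job is still in the snapshot phase, run the insert loop from
        // the reproduction steps; the same row then shows up twice downstream.
        tEnv.executeSql("SELECT * FROM test_0718").print();
    }
}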

leesenlen avatar Jun 08 '23 04:06 leesenlen

Closing this issue because it was created before version 2.3.0 (released 2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under the Flink project with the component tag Flink CDC. Thank you!

PatrickRen avatar Feb 28 '24 15:02 PatrickRen