flink-cdc
Flink Tidb CDC records loss
Describe the bug (Please use English) We are trying to use Flink TiDB CDC to synchronize data from TiDB to TiDB. Because of data encryption issues, we chose to use the DataStream API. It works well for tables with fewer than 500k records. For tables with more than 500k records, however, the record counts of the source and target tables don't match.
Environment :
- Flink version : 1.13.2
- Flink CDC version: 2.2.0
- Database and version: tidb version v5.4
To Reproduce We initialized RowDataTiKVSnapshotEventDeserializationSchema and RowDataTiKVChangeEventDeserializationSchema with the table schema fetched from PD, so that records are emitted as RowData. For example:

    RowDataTiKVChangeEventDeserializationSchema changeEventDeserializationSchema =
            new RowDataTiKVChangeEventDeserializationSchema(
                    tiConfig, SourceDbName, SourceTableName, typeInfo, metadataConverters, physicalDataType);

Finally, we convert the stream to a Table and use the Flink Table API to sink it to TiDB:

    Table table = tableEnv.fromChangelogStream(
            dataStream,
            Schema.newBuilder().fromSchema(schemaWithPrimaryKey).build(),
            ChangelogMode.upsert());
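For anyone comparing row counts on a pipeline like this: with ChangelogMode.upsert(), the sink keeps one row per primary key as declared in schemaWithPrimaryKey, so the target count follows the number of distinct keys rather than the number of events. Below is a minimal plain-Java sketch of that behavior, with no Flink dependency. UpsertSketch, ChangeEvent and Kind are hypothetical names made up for illustration, not Flink or Flink CDC classes, and this is not confirmed as the cause of the gap reported here. It is only a cheap thing to rule out by checking that the declared key matches the real primary key of the source table.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: none of these types exist in Flink or Flink CDC.
final class UpsertSketch {

    enum Kind { UPSERT, DELETE }

    // Simplified stand-in for one changelog row.
    static final class ChangeEvent {
        final Kind kind;
        final String primaryKey;
        final String payload;

        ChangeEvent(Kind kind, String primaryKey, String payload) {
            this.kind = kind;
            this.primaryKey = primaryKey;
            this.payload = payload;
        }
    }

    // Applies events in order, keeping at most one row per primary key.
    static Map<String, String> materialize(List<ChangeEvent> events) {
        Map<String, String> table = new LinkedHashMap<>();
        for (ChangeEvent event : events) {
            if (event.kind == Kind.DELETE) {
                table.remove(event.primaryKey);
            } else {
                // A later event with the same key overwrites the earlier row.
                table.put(event.primaryKey, event.payload);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<ChangeEvent> events = new ArrayList<>();
        events.add(new ChangeEvent(Kind.UPSERT, "1", "a"));
        events.add(new ChangeEvent(Kind.UPSERT, "2", "b"));
        events.add(new ChangeEvent(Kind.UPSERT, "1", "a2")); // same key as the first event
        events.add(new ChangeEvent(Kind.DELETE, "2", null));
        events.add(new ChangeEvent(Kind.UPSERT, "3", "c"));

        Map<String, String> table = materialize(events);
        System.out.println(events.size() + " events -> " + table.size() + " rows");
    }
}
```

If the key declared on the Flink side were coarser than the real primary key, distinct source rows would overwrite each other in the same way the two events for key "1" do in this sketch.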
Additional Description The source table has 90000000 records, but only 2000000 were written to the target table. The Flink job runs without any exceptions, and we can't find any exceptions in the logs either. I believe the issue occurs while the table snapshot is being synchronized. Thanks for the help!
Me too.
Me too. Did the owner solve it?
Closing this issue because it was created before version 2.3.0 (2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under project Flink with component tag Flink CDC. Thank you!