
[Bug] [Module Name] TiDB-CDC cdc event lose data

Open CapitalMr opened this issue 10 months ago • 6 comments

Search before asking

  • [x] I had searched in the issues and found no similar issues.

What happened

Data is lost during the CDC phase. Not every table is affected; the loss shows up in only a few tables.

regionId: 12819225739 startKey: t_656992_5f72800000000000eadd endKey: t_657156_5f698000000000000001038000000008f76ec203814512fa84f0092c038000000000370de0
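
The region boundaries above can be read with a small decoder. This is a minimal sketch, assuming the log prints each key as `t_<table id>_<hex of the remaining bytes>` and that the remaining bytes follow TiDB's documented key layout: `_r` plus a row ID for record keys, `_i` plus an index ID for index keys, each ID stored as an 8-byte big-endian integer with the sign bit flipped. The name `decode_region_key` is hypothetical and is not part of SeaTunnel or the TiKV client.

```python
SIGN_MASK = 0x8000000000000000  # comparable int64 encoding flips the sign bit


def decode_region_key(printed: str) -> dict:
    """Decode a key printed as 't_<table id>_<hex tail>' (assumed log format)."""
    prefix, table_id, hex_tail = printed.split("_", 2)
    if prefix != "t":
        raise ValueError(f"not a table key: {printed!r}")
    tail = bytes.fromhex(hex_tail)
    kind = {b"_r": "record", b"_i": "index"}.get(tail[:2])
    if kind is None or len(tail) < 10:
        raise ValueError(f"unexpected key tail: {hex_tail!r}")
    # Row ID for a record key, index ID for an index key.
    ident = int.from_bytes(tail[2:10], "big") ^ SIGN_MASK
    if ident >= 1 << 63:  # map back to a signed 64-bit value
        ident -= 1 << 64
    return {
        "table_id": int(table_id),
        "kind": kind,
        "id": ident,
        "rest": tail[10:].hex(),  # encoded index column values, left undecoded
    }


start = decode_region_key("t_656992_5f72800000000000eadd")
end = decode_region_key(
    "t_657156_5f698000000000000001038000000008f76ec2"
    "03814512fa84f0092c038000000000370de0"
)
print(start)
print(end)
```

Under these assumptions the start key is a record key in table 656992 and the end key is an index key in table 657156, so the region would cover key ranges belonging to more than one table.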

SeaTunnel Version

2.3.8

SeaTunnel Config

{
    "env": {
        "parallelism": 1,
        "job.mode": "STREAMING",
        "job.name": "tidb=>Kafka : cxxx",
        "checkpoint.interval": "60000",
        "checkpoint.timeout": "6000",
        "flush.timeout.ms": "6000"
    },
    "source" : [
        {
            "plugin_name": "TiDB-CDC-MIGRATE",
            "pd-addresses": "xxx:2379",
            "driver": "com.mysql.cj.jdbc.Driver",
            "base-url" : "jdbc:mysql://xxx?useSSL=false",
            "database-name" : "xxx",
            "username": "xxx",
            "password": "xxx",
            "table-name": "xxx",
            "tikv.grpc.timeout_in_ms": "260000"
        }
    ],
    "transform" : [
        
    ],
    "sink" : [
        {
            "plugin_name" : "Kafka",
            "bootstrap.servers" : "xxx:xx",
            "topic": "short_video_log_ext.epis_watch_log",
            "kafka.config": {
                "acks": "all",
                "retries": 128,
                "retry.backoff.ms": 500,
                "request.timeout.ms": 240000,
                "batch.size": 10240,
                "buffer.memory": 67108864,
                "send.buffer.bytes": 262144,
                "compression.gzip.level": 6,
                "compression.type": "gzip"
            },
            "semantics" : "AT_LEAST_ONCE",
            "format": "debezium_json"
        }
    ]
}
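
One thing the configuration above makes easy to check is how its timeouts relate to each other. The sketch below only compares values copied from that config. The rule it applies (flag any client timeout longer than `checkpoint.timeout`) is an assumption made for illustration, not a documented SeaTunnel requirement, and `timeouts_exceeding_checkpoint` is a hypothetical helper. It is a sanity check, not a diagnosis of the data loss.

```python
import json

# Only the timing-related values from the reported job config.
CONFIG = json.loads("""
{
  "env": {"checkpoint.interval": "60000", "checkpoint.timeout": "6000"},
  "source": [{"plugin_name": "TiDB-CDC-MIGRATE",
              "tikv.grpc.timeout_in_ms": "260000"}],
  "sink": [{"plugin_name": "Kafka",
            "kafka.config": {"request.timeout.ms": 240000}}]
}
""")


def timeouts_exceeding_checkpoint(config: dict) -> dict:
    """Return client timeouts (ms) that are longer than checkpoint.timeout."""
    limit = int(config["env"]["checkpoint.timeout"])
    found = {}
    for source in config.get("source", []):
        value = source.get("tikv.grpc.timeout_in_ms")
        if value is not None:
            found["source tikv.grpc.timeout_in_ms"] = int(value)
    for sink in config.get("sink", []):
        value = sink.get("kafka.config", {}).get("request.timeout.ms")
        if value is not None:
            found["sink request.timeout.ms"] = int(value)
    return {name: ms for name, ms in found.items() if ms > limit}


flagged = timeouts_exceeding_checkpoint(CONFIG)
for name, ms in flagged.items():
    print(f"{name} = {ms} ms exceeds checkpoint.timeout")
```

With the reported values, both the TiKV gRPC timeout and the Kafka request timeout are far longer than the 6000 ms checkpoint timeout, which may be worth ruling out while investigating.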

Running Command

web http commit

Error Exception

no error message

Zeta or Flink or Spark Version

zeta

Java or Scala Version

jdk21

Screenshots

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

CapitalMr avatar Feb 24 '25 12:02 CapitalMr

cc @sunxiaojian

hailin0 avatar Feb 25 '25 02:02 hailin0

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

github-actions[bot] avatar Mar 28 '25 00:03 github-actions[bot]

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

github-actions[bot] avatar Jul 12 '25 00:07 github-actions[bot]

This issue has been closed because it has not received response for too long time. You could reopen it if you encountered similar problems in the future.

github-actions[bot] avatar Jul 20 '25 00:07 github-actions[bot]

Would anybody like to take on this issue? Please leave a message, and then you can submit a PR within 4 weeks.

Here are some helpful resources to get started:

  • Contribution Setup Guide
  • Code Submission Guide

davidzollo avatar Nov 08 '25 11:11 davidzollo

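I ran into the same problem. It happened when deleting data. I created the simplest possible table in TiDB, as follows (the column comment means "primary key"; the table comment means "batch event, import template, arrears"):

CREATE TABLE IF NOT EXISTS abcd (
    fld_guid char(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL comment '主键',
    PRIMARY KEY(fld_guid)
) comment '批量事件-导入模板-欠费';

Then I inserted two rows:

INSERT INTO abcd (fld_guid) VALUES('0000137c-2e9a-4ed3-92e7-d9cc7e0267f2');
INSERT INTO abcd (fld_guid) VALUES('0000204f-6b6f-44b3-bbe6-97c095cbbb01');

Steps:

  1. Start SeaTunnel. Both rows are synced to the Doris database correctly.
  2. Run delete from abcd; in TiDB to delete the data.
  3. Only one row is deleted in Doris, although both rows should have been deleted.

Conclusion: some IDs cannot be deleted in the target data source, while most IDs are deleted correctly.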
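
To see which IDs are affected after a reproduction like the one above, the primary keys on both sides can be compared. This is a minimal sketch: `undeleted_keys` is a hypothetical helper, and the key lists are typed in by hand here, whereas in practice they would come from running `SELECT fld_guid FROM abcd` against TiDB and Doris. The comment does not say which of the two rows survived, so the example arbitrarily assumes the second one.

```python
def undeleted_keys(source_keys, sink_keys):
    """Return keys still present in the sink but no longer in the source."""
    return sorted(set(sink_keys) - set(source_keys))


# TiDB after `delete from abcd;`: no rows remain.
tidb_keys = []
# Doris after the delete was replicated. Which row remained is NOT stated
# in the report; this value is an assumption for illustration only.
doris_keys = ["0000204f-6b6f-44b3-bbe6-97c095cbbb01"]

leftover = undeleted_keys(tidb_keys, doris_keys)
print(leftover)
```

Collecting the leftover IDs over a larger table would show whether the undeleted keys share a pattern.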

SeaTunnel config:

env {
    parallelism = 1
    job.mode = "STREAMING"
    checkpoint.interval = 2000
}

source {
    TiDB-CDC {
        plugin_output = "abcd"
        url = "jdbc:mysql://192.168.1.7:4000/local1"
        driver = "com.mysql.cj.jdbc.Driver"
        tikv.grpc.timeout_in_ms = 20000
        pd-addresses = "192.168.1.5:2379"
        username = "xxxx"
        password = "xxxxx"
        database-name = "test"
        table-name = "abcd"
        batch-size-per-scan = "5000"
    }
}

sink {
    Doris {
        plugin_input = "abcd"
        fenodes = "192.168.1.7:8030"
        username = "xxxx"
        password = "xxxx"
        database = "test"
        sink.enable-2pc = "true"
        sink.enable-delete = "true"
        sink.label-prefix = "abcd_prefix"
        doris.config = {
            line_delimiter = "&&@&&"
            format = "json"
            read_json_by_line = "true"
        }
    }
}

yangjie397 avatar Dec 09 '25 09:12 yangjie397