
Real-time CDC sync for Kingbase V8R6 via TIS cannot be made to work

Open CherryRum opened this issue 5 months ago • 10 comments

The version I pulled is 4.2.0.

[screenshot: TIS 4.2.0 setup]

Kingbase version: KingbaseES V008R006C008B0014 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-28), 64-bit. The docker-compose file is as follows:

services:
  kingbase:
    container_name: kingbasev8r6
    image: huas/kingbase_v008r006c008b0014_single_x86:v1-gis
    privileged: true
    volumes:
      - ./data:/home/kingbase/userdata/data
      - ./etc:/home/kingbase/install/kingbase/etc
    command: /usr/sbin/init
    environment:
      - DB_USER=xxxxx
      - PASSWORD=12345678
      - DB_MODE=mysql
      - ENABLE_CI=no
      - TZ=Asia/Shanghai
      - LANG=en_US.UTF-8
      - LC_ALL=en_US.UTF-8
    ports:
      - "4321:54321"
    restart: unless-stopped

Database compatibility mode: mysql

1. Running the DataX full (batch) sync works fine.

[screenshot: DataX full-sync result]

2. Setting up the real-time sync pipeline fails as soon as it runs. For the primary key's data type I tried both int8 and int4; neither works. The table structure is as follows:

[screenshot: table structure]

Flink log:

2025-06-14 23:25:31
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:126)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:328)
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:642)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
	at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: 192.168.31.141:4321_test_db -> Process -> test_001_dto2Rowdata -> skipUpdateBeforeEventOrSpecEvent -> Sink: test_001' (operator cbc357ccb763df2852fee8c4fc7d55f2).
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:624)
	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:248)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:395)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:483)
	at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkRuntimeException: Generate Splits for table public.test_001 error
	at org.apache.flink.cdc.connectors.postgres.source.PostgresChunkSplitter.generateSplits(PostgresChunkSplitter.java:125)
	at org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.getNext(SnapshotSplitAssigner.java:284)
	at org.apache.flink.cdc.connectors.base.source.assigner.HybridSplitAssigner.getNext(HybridSplitAssigner.java:156)
	at org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.assignSplits(IncrementalSourceEnumerator.java:217)
	at org.apache.flink.cdc.connectors.postgres.source.enumerator.PostgresSourceEnumerator.assignSplits(PostgresSourceEnumerator.java:81)
	at org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.handleSplitRequest(IncrementalSourceEnumerator.java:115)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleRequestSplitEvent(SourceCoordinator.java:568)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$3(SourceCoordinator.java:295)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:469)
	... 7 more
Caused by: java.lang.UnsupportedOperationException: Doesn't support Postgres type 'bigint' yet
	at org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils.convertFromColumn(PostgresTypeUtils.java:160)
	at org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils.fromDbzColumn(PostgresTypeUtils.java:66)
	at org.apache.flink.cdc.connectors.postgres.source.PostgresChunkSplitter.fromDbzColumn(PostgresChunkSplitter.java:172)
	at org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter.isEvenlySplitColumn(JdbcSourceChunkSplitter.java:119)
	at org.apache.flink.cdc.connectors.postgres.source.PostgresChunkSplitter.splitTableIntoChunks(PostgresChunkSplitter.java:195)
	at org.apache.flink.cdc.connectors.postgres.source.PostgresChunkSplitter.generateSplits(PostgresChunkSplitter.java:95)

CherryRum · Jun 14 '25 15:06

Error with an int4 primary key:

2025-06-14 23:18:19
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:126)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:328)
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:642)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
	at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: 192.168.31.141:4321_test_db -> Process -> test_001_dto2Rowdata -> skipUpdateBeforeEventOrSpecEvent -> Sink: test_001' (operator cbc357ccb763df2852fee8c4fc7d55f2).
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:624)
	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:248)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:395)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:483)
	at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkRuntimeException: Generate Splits for table public.test_001 error
	at org.apache.flink.cdc.connectors.postgres.source.PostgresChunkSplitter.generateSplits(PostgresChunkSplitter.java:125)
	at org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.getNext(SnapshotSplitAssigner.java:284)
	at org.apache.flink.cdc.connectors.base.source.assigner.HybridSplitAssigner.getNext(HybridSplitAssigner.java:156)
	at org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.assignSplits(IncrementalSourceEnumerator.java:217)
	at org.apache.flink.cdc.connectors.postgres.source.enumerator.PostgresSourceEnumerator.assignSplits(PostgresSourceEnumerator.java:81)
	at org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.handleSplitRequest(IncrementalSourceEnumerator.java:115)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleRequestSplitEvent(SourceCoordinator.java:568)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$3(SourceCoordinator.java:295)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:469)
	... 7 more
Caused by: java.lang.UnsupportedOperationException: Doesn't support Postgres type 'int' yet
	at org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils.convertFromColumn(PostgresTypeUtils.java:160)
	at org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils.fromDbzColumn(PostgresTypeUtils.java:66)
	at org.apache.flink.cdc.connectors.postgres.source.PostgresChunkSplitter.fromDbzColumn(PostgresChunkSplitter.java:172)
	at org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter.isEvenlySplitColumn(JdbcSourceChunkSplitter.java:119)
	at org.apache.flink.cdc.connectors.postgres.source.PostgresChunkSplitter.splitTableIntoChunks(PostgresChunkSplitter.java:195)
	at org.apache.flink.cdc.connectors.postgres.source.PostgresChunkSplitter.generateSplits(PostgresChunkSplitter.java:95)
	... 15 more
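
Both stack traces bottom out in the same frame, PostgresTypeUtils.convertFromColumn, which dispatches on the column type name reported by the database. The upstream Flink CDC Postgres connector appears to recognize only the PG-native spellings such as int2/int4/int8, while Kingbase in MySQL compatibility mode evidently reports the MySQL-style names 'bigint' and 'int', which fall through to the unsupported-type branch; that would also explain why the DataX batch sync, which reads over plain JDBC, is unaffected. A minimal probe to confirm which names the server actually reports, assuming Kingbase keeps PostgreSQL's pg_catalog (the dump later in this thread claims PG 12 compatibility):

-- Diagnostic sketch (untested on Kingbase): list the type names the
-- server reports for the columns of public.test_001. If 'id' comes
-- back as 'bigint'/'int' rather than 'int8'/'int4', that matches the
-- UnsupportedOperationException above.
SELECT a.attname AS column_name,
       t.typname AS reported_type_name,
       format_type(a.atttypid, a.atttypmod) AS formatted_type
FROM pg_attribute a
JOIN pg_type t ON t.oid = a.atttypid
JOIN pg_class c ON c.oid = a.attrelid
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'public'
  AND c.relname = 'test_001'
  AND a.attnum > 0
  AND NOT a.attisdropped;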

CherryRum · Jun 14 '25 15:06

Could you provide:

  1. the DDL used to create the table,
  2. an INSERT statement for that table; one representative row is enough.

Also, my testing used the V9R1 version; have you tested that version? The docker startup command is as follows:

docker run -tid --privileged -p 4321:54321 \
  -v /root/kingbase/data_mysql:/home/kingbase/userdata/ \
  -e ENABLE_CI=yes -e NEED_START=yes \
  -e DB_USER=kingbase -e DB_PASSWORD=123456 -e DB_MODE=mysql \
  --name kingbase_mysql kingbase_v009r001c002b0014_single_x86:v1

baisui1981 · Jun 15 '25 03:06

I'll send it to you tonight. The main thing is that what's still being pushed here is the V8R6-line versions (the client's procurement side feels the V9 versions aren't stable), so all my testing is against 8.6. I'll export the table DDL for you tonight; it's really simple, just an int4 or int8 primary key.

CherryRum · Jun 15 '25 05:06

> I'll send it to you tonight. The main thing is that what's still being pushed here is the V8R6-line versions (the client's procurement side feels the V9 versions aren't stable), so all my testing is against 8.6. I'll export the table DDL for you tonight; it's really simple, just an int4 or int8 primary key.

OK. Once you provide the SQL script I'll set up a local V9 environment and test it.

baisui1981 · Jun 15 '25 05:06

Thanks, much appreciated. From what I've dug into these past few days, it seems every Kingbase minor version has its own small differences, and the V9 line splits things even more finely.

/*
 Navicat Premium Dump SQL

 Source Server         : nas-kingbase
 Source Server Type    : PostgreSQL
 Source Server Version : 120001 (120001)
 Source Host           : mynas.oldletter.cn:4321
 Source Catalog        : test_db
 Source Schema         : public

 Target Server Type    : PostgreSQL
 Target Server Version : 120001 (120001)
 File Encoding         : 65001

 Date: 15/06/2025 19:09:03
*/


-- ----------------------------
-- Table structure for test_001
-- ----------------------------
DROP TABLE IF EXISTS "public"."test_001";
CREATE TABLE "public"."test_001" (
  "id" int8 NOT NULL,
  "name" varchar(250) COLLATE "pg_catalog"."default",
  "age" int2,
  "datat" date,
  "email" varchar(255) COLLATE "pg_catalog"."default"
)
;

-- ----------------------------
-- Records of test_001
-- ----------------------------
INSERT INTO "public"."test_001" VALUES (1, '戴宇宁', 224, '2012-12-23', '[email protected]');
INSERT INTO "public"."test_001" VALUES (2, '罗岚', 422, '2016-11-26', '[email protected]');
INSERT INTO "public"."test_001" VALUES (3, '蔡岚', 889, '2014-10-20', '[email protected]');
INSERT INTO "public"."test_001" VALUES (4, '崔云熙', 171, '2014-06-29', '[email protected]');
INSERT INTO "public"."test_001" VALUES (5, '胡璐', 402, '2022-02-23', '[email protected]');
INSERT INTO "public"."test_001" VALUES (6, '方岚', 805, '2009-12-27', '[email protected]');
INSERT INTO "public"."test_001" VALUES (7, '曹詩涵', 581, '2013-05-22', '[email protected]');
INSERT INTO "public"."test_001" VALUES (8, '董詩涵', 377, '2012-12-23', '[email protected]');
INSERT INTO "public"."test_001" VALUES (9, '董睿', 409, '2010-01-25', '[email protected]');
INSERT INTO "public"."test_001" VALUES (10, '董子异', 25, '2001-10-16', '[email protected]');

-- ----------------------------
-- Primary Key structure for table test_001
-- ----------------------------
ALTER TABLE "public"."test_001" ADD CONSTRAINT "test_001_pkey" PRIMARY KEY ("id");

CherryRum · Jun 15 '25 11:06

Are you sure the SQL above is from the MySQL-mode database? According to the DB_MODE=mysql parameter in the docker startup command you provided, the database should be in MySQL mode, but the DDL and INSERT scripts you provided look like PG mode, don't they?
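
For contrast, a dump taken through a genuinely MySQL-mode toolchain would typically use backtick quoting and MySQL type names, roughly like this (illustrative only, not generated from the database in question):

-- Illustrative sketch of MySQL-style DDL for the same table; the
-- PG-style quoting and int8/int2 names in the dump above are what
-- prompted the question.
CREATE TABLE `test_001` (
  `id`    bigint       NOT NULL,
  `name`  varchar(250) DEFAULT NULL,
  `age`   smallint     DEFAULT NULL,
  `datat` date         DEFAULT NULL,
  `email` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
);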

baisui1981 · Jun 15 '25 14:06

I just exported it over a Navicat connection; Kingbase's own client tool is too much hassle.

[screenshot: Navicat connection]

CherryRum · Jun 15 '25 14:06

So is your data pipeline syncing from PostgreSQL into a MySQL-mode Kingbase database? And the DDL you provided is from the PostgreSQL source?

baisui1981 · Jun 16 '25 04:06

No, it's the same database connection, just different databases.

The pipeline is Kingbase V8R6 in MySQL mode: test_db.public.test_001 -> test.public.test_001.

The reason for that is that I don't currently have Kingbase's client tool installed on this machine, so I connected in with Navicat's PG driver and exported the DDL that way.
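
A couple of quick checks over that same PG-driver connection can confirm which database and schema the dump came from (standard PostgreSQL calls, assumed to also work on Kingbase's PG-compatible layer):

-- Sanity-check sketch, run over the same Navicat/PG connection:
SELECT version();                             -- which Kingbase build is answering
SELECT current_database(), current_schema();  -- expected: test_db / public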

You can also reach me on QQ: 1809040656.

CherryRum · Jun 16 '25 04:06

1809040656: I don't have QQ, add me on WeChat instead: 16660356.

baisui1981 · Jun 16 '25 06:06