tis
tis copied to clipboard
基于TIS实现kingbasev8r6 cdc实时同步,无法成功
版本拉的4.2.0
kingbase 版本
KingbaseES V008R006C008B0014 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-28), 64-bit
docker-compose如下
# Docker Compose definition of the KingbaseES V8R6 instance used in this report.
# The nesting below is restored from a paste that had lost all indentation;
# every key and value is unchanged.
services:
  kingbase:
    container_name: kingbasev8r6
    image: huas/kingbase_v008r006c008b0014_single_x86:v1-gis
    privileged: true
    volumes:
      - ./data:/home/kingbase/userdata/data
      - ./etc:/home/kingbase/install/kingbase/etc
    command: /usr/sbin/init
    environment:
      - DB_USER=xxxxx
      # NOTE(review): the `docker run` example later in this thread passes
      # DB_PASSWORD rather than PASSWORD; confirm which variable this image reads.
      - PASSWORD=12345678
      # Database compatibility mode requested from the image.
      - DB_MODE=mysql
      - ENABLE_CI=no
      - TZ=Asia/Shanghai
      - LANG=en_US.UTF-8
      - LC_ALL=en_US.UTF-8
    ports:
      # Host port 4321 is mapped to container port 54321.
      - "4321:54321"
    restart: unless-stopped
数据库兼容类型
mysql
1.跑datax全量同步没有问题
2.建立实时同步管道,执行就报错,主键数据类型我尝试了 int8、int4 都不行,表结构如下
flink日志
2025-06-14 23:25:31
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:126)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:328)
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:642)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: 192.168.31.141:4321_test_db -> Process -> test_001_dto2Rowdata -> skipUpdateBeforeEventOrSpecEvent -> Sink: test_001' (operator cbc357ccb763df2852fee8c4fc7d55f2).
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:624)
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:248)
at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:395)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:483)
at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkRuntimeException: Generate Splits for table public.test_001 error
at org.apache.flink.cdc.connectors.postgres.source.PostgresChunkSplitter.generateSplits(PostgresChunkSplitter.java:125)
at org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.getNext(SnapshotSplitAssigner.java:284)
at org.apache.flink.cdc.connectors.base.source.assigner.HybridSplitAssigner.getNext(HybridSplitAssigner.java:156)
at org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.assignSplits(IncrementalSourceEnumerator.java:217)
at org.apache.flink.cdc.connectors.postgres.source.enumerator.PostgresSourceEnumerator.assignSplits(PostgresSourceEnumerator.java:81)
at org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.handleSplitRequest(IncrementalSourceEnumerator.java:115)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleRequestSplitEvent(SourceCoordinator.java:568)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$3(SourceCoordinator.java:295)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:469)
... 7 more
Caused by: java.lang.UnsupportedOperationException: Doesn't support Postgres type 'bigint' yet
at org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils.convertFromColumn(PostgresTypeUtils.java:160)
at org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils.fromDbzColumn(PostgresTypeUtils.java:66)
at org.apache.flink.cdc.connectors.postgres.source.PostgresChunkSplitter.fromDbzColumn(PostgresChunkSplitter.java:172)
at org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter.isEvenlySplitColumn(JdbcSourceChunkSplitter.java:119)
at org.apache.flink.cdc.connectors.postgres.source.PostgresChunkSplitter.splitTableIntoChunks(PostgresChunkSplitter.java:195)
at org.apache.flink.cdc.connectors.postgres.source.PostgresChunkSplitter.generateSplits(PostgresChunkSplitter.java:95)
int4 主键报错
2025-06-14 23:18:19
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:126)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:328)
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:642)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: 192.168.31.141:4321_test_db -> Process -> test_001_dto2Rowdata -> skipUpdateBeforeEventOrSpecEvent -> Sink: test_001' (operator cbc357ccb763df2852fee8c4fc7d55f2).
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:624)
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:248)
at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:395)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:483)
at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkRuntimeException: Generate Splits for table public.test_001 error
at org.apache.flink.cdc.connectors.postgres.source.PostgresChunkSplitter.generateSplits(PostgresChunkSplitter.java:125)
at org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.getNext(SnapshotSplitAssigner.java:284)
at org.apache.flink.cdc.connectors.base.source.assigner.HybridSplitAssigner.getNext(HybridSplitAssigner.java:156)
at org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.assignSplits(IncrementalSourceEnumerator.java:217)
at org.apache.flink.cdc.connectors.postgres.source.enumerator.PostgresSourceEnumerator.assignSplits(PostgresSourceEnumerator.java:81)
at org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.handleSplitRequest(IncrementalSourceEnumerator.java:115)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleRequestSplitEvent(SourceCoordinator.java:568)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$3(SourceCoordinator.java:295)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:469)
... 7 more
Caused by: java.lang.UnsupportedOperationException: Doesn't support Postgres type 'int' yet
at org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils.convertFromColumn(PostgresTypeUtils.java:160)
at org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils.fromDbzColumn(PostgresTypeUtils.java:66)
at org.apache.flink.cdc.connectors.postgres.source.PostgresChunkSplitter.fromDbzColumn(PostgresChunkSplitter.java:172)
at org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter.isEvenlySplitColumn(JdbcSourceChunkSplitter.java:119)
at org.apache.flink.cdc.connectors.postgres.source.PostgresChunkSplitter.splitTableIntoChunks(PostgresChunkSplitter.java:195)
at org.apache.flink.cdc.connectors.postgres.source.PostgresChunkSplitter.generateSplits(PostgresChunkSplitter.java:95)
... 15 more
能否提供:
- 建表的ddl,
- 该表的insert语句,有代表性的一条即可
另外,我测试是使用v9r1 版本,您是否测试过该版本?docker启动命令如下:
docker run -tid --privileged -p 4321:54321 -v /root/kingbase/data_mysql:/home/kingbase/userdata/ -e ENABLE_CI=yes -e NEED_START=yes -e DB_USER=kingbase -e DB_PASSWORD=123456 -e DB_MODE=mysql --name kingbase_mysql kingbase_v009r001c002b0014_single_x86:v1
晚上给您提供,主要是现在这边他们主推的还是v8r6之类的版本(v9的版本甲方这边采购感觉不稳定),导致我这边测试都测8.6 建表的ddl我晚上给您导出来,其实很简单,就是主键 int4 或者 int8
晚上给您提供,主要是现在这边他们主推的还是v8r6之类的版本(v9的版本甲方这边采购感觉不稳定),导致我这边测试都测8.6 建表的ddl我晚上给您导出来,其实很简单,就是主键 int4 或者 int8
ok,等你提供了sql 脚本之后我本地搭建一个v9的环境测试一下
好的麻烦您了,据我这些天的研究,好像kingbase不同小版本都有小小差异,9版本更细化了
/*
Navicat Premium Dump SQL
Source Server : nas-kingbase
Source Server Type : PostgreSQL
Source Server Version : 120001 (120001)
Source Host : mynas.oldletter.cn:4321
Source Catalog : test_db
Source Schema : public
Target Server Type : PostgreSQL
Target Server Version : 120001 (120001)
File Encoding : 65001
Date: 15/06/2025 19:09:03
*/
-- ----------------------------
-- Table structure for test_001
-- Rebuilds "public"."test_001" from scratch: an existing table of that
-- name is dropped first, then the table is created without a primary key.
-- ----------------------------
DROP TABLE IF EXISTS "public"."test_001";

CREATE TABLE "public"."test_001" (
    "id"    int8         NOT NULL,
    "name"  varchar(250) COLLATE "pg_catalog"."default",
    "age"   int2,
    "datat" date,
    "email" varchar(255) COLLATE "pg_catalog"."default"
);
-- ----------------------------
-- Records of test_001
-- Ten sample rows. Every INSERT names its target columns explicitly so the
-- load does not depend on the physical column order of the table.
-- ----------------------------
INSERT INTO "public"."test_001" ("id", "name", "age", "datat", "email") VALUES (1, '戴宇宁', 224, '2012-12-23', '[email protected]');
INSERT INTO "public"."test_001" ("id", "name", "age", "datat", "email") VALUES (2, '罗岚', 422, '2016-11-26', '[email protected]');
INSERT INTO "public"."test_001" ("id", "name", "age", "datat", "email") VALUES (3, '蔡岚', 889, '2014-10-20', '[email protected]');
INSERT INTO "public"."test_001" ("id", "name", "age", "datat", "email") VALUES (4, '崔云熙', 171, '2014-06-29', '[email protected]');
INSERT INTO "public"."test_001" ("id", "name", "age", "datat", "email") VALUES (5, '胡璐', 402, '2022-02-23', '[email protected]');
INSERT INTO "public"."test_001" ("id", "name", "age", "datat", "email") VALUES (6, '方岚', 805, '2009-12-27', '[email protected]');
INSERT INTO "public"."test_001" ("id", "name", "age", "datat", "email") VALUES (7, '曹詩涵', 581, '2013-05-22', '[email protected]');
INSERT INTO "public"."test_001" ("id", "name", "age", "datat", "email") VALUES (8, '董詩涵', 377, '2012-12-23', '[email protected]');
INSERT INTO "public"."test_001" ("id", "name", "age", "datat", "email") VALUES (9, '董睿', 409, '2010-01-25', '[email protected]');
INSERT INTO "public"."test_001" ("id", "name", "age", "datat", "email") VALUES (10, '董子异', 25, '2001-10-16', '[email protected]');
-- ----------------------------
-- Primary Key structure for table test_001
-- Declares "id" as the primary key under the constraint name "test_001_pkey".
-- ----------------------------
ALTER TABLE "public"."test_001"
    ADD CONSTRAINT "test_001_pkey"
    PRIMARY KEY ("id");
以上提供的SQL 你确定是 MySQL 版本的?因为依据你提供的docker启动命令中的参数DB_MODE=mysql ,应该是MySQL 模式,但是你提供的DDL 和Insert 脚本应该是PG模式的吧?
我只是用了 navicat链接的导出的,毕竟kingbase自己的那个链接工具太麻烦了
那你使用的数据通道是从 postgresql 同步到模式为 mysql 的 kingbase 数据库吗?你提供的 DDL 是源库 postgresql 的?
不,我是同一个数据库连接,不同库
数据通道是,kingbase v8r6 mysql模式的test_db.public.test_001 -> test.public.test_001
上面的原因是因为,我最近这个电脑没装kingbase的链接工具,就用navicat的pg连进去,导出的ddl,
也可以加qq联系我,1809040656
1809040656 我没有qq ,加我微信:16660356