
Hudi incremental execution fails

Open baisui1981 opened this issue 3 years ago • 2 comments

Running a Hudi incremental job fails with an error.

CREATE TABLE `stu` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '',
  `name` varchar(20) NOT NULL COMMENT '',
  `school` varchar(20) NOT NULL COMMENT '',
  `nickname` varchar(20) NOT NULL COMMENT '',
  `age` int(11) NOT NULL COMMENT '',
  `class_num` int(11) NOT NULL COMMENT 'class size',
  `score` decimal(4,2) NOT NULL COMMENT 'score',
  `phone` bigint(20) NOT NULL COMMENT 'phone number',
  `email` varchar(64) DEFAULT NULL COMMENT '',
  `ip` varchar(32) DEFAULT NULL COMMENT '',
  `address` text COMMENT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1100002 DEFAULT CHARSET=utf8;

-- One test row
INSERT INTO `stu` (`id`,`name`,`school`,`nickname`,`age`,`class_num`,`score`,`phone`,`email`,`ip`,`address`)
VALUES (1100001,'doTun','beida','jasper',81,26,45.54,14597415152,'[email protected]','192.192.192.192','极乐世界f座 630103');

Running the incremental job reports an error (screenshot: 2022-06-06, 1:29:03 PM).

baisui1981, Jun 06 '22 05:06

I tried it today and can reproduce the error:

2022-07-15 17:40:17
java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Integer
	at org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149)
	at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$6(RowData.java:245)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
	at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:103)
	at com.qlangtech.tis.realtime.SourceProcessFunction.processElement(SourceProcessFunction.java:47)
	at com.qlangtech.tis.realtime.SourceProcessFunction.processElement(SourceProcessFunction.java:32)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
	at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.emitRecordsUnderCheckpointLock(DebeziumChangeFetcher.java:239)
	at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.handleBatch(DebeziumChangeFetcher.java:225)
	at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.runFetchLoop(DebeziumChangeFetcher.java:151)
	at com.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:439)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

baisui1981, Jul 15 '22 09:07

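Fix: the root cause is that the id column is declared with the unsigned modifier. Because INT UNSIGNED can hold values up to 4294967295, which exceeds the range of Java's Integer, Debezium wraps the received value as a Long, while the metadata parser resolves the column type as int; this mismatch produces the exception. The fix is to check, after the metadata has been parsed, whether the column's data_type contains the unsigned modifier and, if it does, change the data type to bigint: MySQLDataSourceFactory.java#L94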
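A minimal Java sketch of the mismatch and of the fix just described. It assumes the parsed column type is available as a java.sql.Types code together with the raw column type string (for example "int(10) unsigned"); adjustForUnsigned is a hypothetical helper for illustration, not the actual code at MySQLDataSourceFactory.java#L94. The failing cast mirrors what GenericRowData.getInt does when the field holds a Long.

```java
import java.sql.Types;
import java.util.Locale;

public class UnsignedIntFix {

    // Hypothetical helper (not TIS's real implementation): after the column
    // metadata has been parsed into a java.sql.Types code, promote
    // INT UNSIGNED to BIGINT so the declared type matches what Debezium emits.
    static int adjustForUnsigned(String columnType, int parsedJdbcType) {
        boolean unsigned = columnType.toLowerCase(Locale.ROOT).contains("unsigned");
        if (unsigned && parsedJdbcType == Types.INTEGER) {
            // INT UNSIGNED goes up to 4294967295 > Integer.MAX_VALUE,
            // so the CDC source delivers the value as a Long.
            return Types.BIGINT;
        }
        return parsedJdbcType;
    }

    public static void main(String[] args) {
        // What the CDC source delivers for `id` int(10) unsigned:
        Object idFromDebezium = Long.valueOf(1100001L);

        // Reading the value as an INT column fails with ClassCastException.
        try {
            int asInt = (Integer) idFromDebezium;
            System.out.println("unexpected: " + asInt);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }

        System.out.println(adjustForUnsigned("int(10) unsigned", Types.INTEGER) == Types.BIGINT); // true
        System.out.println(adjustForUnsigned("int(11)", Types.INTEGER) == Types.INTEGER); // true

        // Once the column is declared BIGINT, the Long is read without error.
        long asLong = (Long) idFromDebezium;
        System.out.println(asLong); // 1100001
    }
}
```

Only plain INT is promoted here, matching the fix in this thread; other unsigned integer types may need their own handling, which this sketch does not attempt.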

baisui1981, Jul 16 '22 09:07

complete

baisui1981, Nov 01 '22 08:11