tis
tis copied to clipboard
hudi 增量执行出错
hudi执行增量执行错误
-- Reproduction table for the incremental-sync failure described in this report.
-- NOTE: `id` is intentionally declared INT UNSIGNED. According to the analysis
-- in this report, the unsigned modifier is what makes the CDC side deliver the
-- value as a Long while the parsed metadata still says int, which produces the
-- "java.lang.Long cannot be cast to java.lang.Integer" error. Do not drop the
-- modifier, or the schema will no longer reproduce the problem.
CREATE TABLE `stu` (
    `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '',
    `name` varchar(20) NOT NULL COMMENT '',
    `school` varchar(20) NOT NULL COMMENT '',
    `nickname` varchar(20) NOT NULL COMMENT '',
    `age` int(11) NOT NULL COMMENT '',
    `class_num` int(11) NOT NULL COMMENT '班级人数',
    `score` decimal(4,2) NOT NULL COMMENT '成绩',
    `phone` bigint(20) NOT NULL COMMENT '电话号码',
    `email` varchar(64) DEFAULT NULL COMMENT '',
    `ip` varchar(32) DEFAULT NULL COMMENT '',
    `address` text COMMENT '',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1100002 DEFAULT CHARSET=utf8;
-- The terminating semicolon above is required so that the statements that
-- follow are parsed as separate statements when this is run as a script.
-- One row of test data for the `stu` table.
INSERT INTO `stu` (
    `id`,
    `name`,
    `school`,
    `nickname`,
    `age`,
    `class_num`,
    `score`,
    `phone`,
    `email`,
    `ip`,
    `address`
)
VALUES (
    1100001,
    'doTun',
    'beida',
    'jasper',
    81,
    26,
    45.54,
    14597415152,
    '[email protected]',
    '192.192.192.192',
    '极乐世界f座 630103'
);
执行增量报错

今天试验了一下,可以重现这个错误
2022-07-15 17:40:17
java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Integer
at org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149)
at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$6(RowData.java:245)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:103)
at com.qlangtech.tis.realtime.SourceProcessFunction.processElement(SourceProcessFunction.java:47)
at com.qlangtech.tis.realtime.SourceProcessFunction.processElement(SourceProcessFunction.java:32)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.emitRecordsUnderCheckpointLock(DebeziumChangeFetcher.java:239)
at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.handleBatch(DebeziumChangeFetcher.java:225)
at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.runFetchLoop(DebeziumChangeFetcher.java:151)
at com.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:439)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
修改:
问题定位到是因为 id 使用了 unsigned 修饰符,导致 debezium 在接收到数据时使用 long 类型进行封装,而 metadata 解析结果却是 int,从而产生该异常。解决办法是在 meta 解析之后判断 data_type 中是否有 unsigned 修饰符,如果有则将数据类型改成 bigint 类型即可:MySQLDataSourceFactory.java#L94
complete