flink-cdc
flink-cdc copied to clipboard
When there are spaces in the default value of the numeric type in a new table DDL under the same Mysql Host; MySQL can be executed successfully,But Flink cdc program throws some exceptions and Flink program Failed
Describe the bug(Please use English) When there are spaces in the default value of the numeric type in a new table DDL under the same Mysql Host; MySQL can be executed successfully,But Flink cdc program throws some exceptions and Flink program Failed。
Environment :
- Flink version : 1.13.6
- Flink CDC version: 2.0.2 and 2.2.0
- Database and version: Mysql 5.7
To Reproduce Steps to reproduce the behavior:
- Thes test data : the new mysql table ddl:
create table rich_people_1
(
id int auto_increment primary key,
name varchar(255) null,
age bigint DEFAULT ' 0 ',
hobby varchar(255) null
);
- The test code : the flink cdc program code:
CREATE TEMPORARY TABLE `cdc_source` (
`id` INT COMMENT '字增主键',
`name` VARCHAR COMMENT '门店编码',
`update_time` TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector' = 'mysql-cdc',
'hostname' = '',
'port' = '3306',
'username' = '',
'password' = '',
'database-name' = '',
'table-name' = 'cdc_sync',
'server-time-zone' = 'Asia/Shanghai',
'scan.startup.mode' = 'latest-offset' ,
'scan.incremental.snapshot.enabled' = 'false',
'debezium.snapshot.locking.mode' = 'none'
);
CREATE TEMPORARY TABLE `print_table` (
`id` INT COMMENT '字增主键',
`name` VARCHAR COMMENT '门店编码',
`update_time` TIMESTAMP COMMENT '更新时间'
) with (
'connector' = 'print'
);
insert into print_table select * from cdc_source;
- The error :
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) ~[?:?]
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:369) ~[?:?]
at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1118) ~[?:?]
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:966) ~[?:?]
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606) ~[?:?]
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) ~[?:?]
at java.lang.Thread.run(Thread.java:877) ~[?:1.8.0_302]
Caused by: io.debezium.DebeziumException: Error processing binlog event
... 6 more
Caused by: java.lang.NumberFormatException: For input string: " 0 "
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) ~[?:1.8.0_302]
at java.lang.Long.parseLong(Long.java:578) ~[?:1.8.0_302]
at java.lang.Long.valueOf(Long.java:803) ~[?:1.8.0_302]
at io.debezium.jdbc.JdbcValueConverters.lambda$convertBigInt$42(JdbcValueConverters.java:919) ~[?:?]
at io.debezium.jdbc.JdbcValueConverters.convertValue(JdbcValueConverters.java:1288) ~[?:?]
at io.debezium.jdbc.JdbcValueConverters.convertBigInt(JdbcValueConverters.java:907) ~[?:?]
at io.debezium.jdbc.JdbcValueConverters.lambda$converter$6(JdbcValueConverters.java:297) ~[?:?]
at io.debezium.connector.mysql.MySqlDefaultValueConverter.setColumnDefaultValue(MySqlDefaultValueConverter.java:405) ~[?:?]
at io.debezium.connector.mysql.antlr.listener.CreateTableParserListener.convertDefaultValueToSchemaType(CreateTableParserListener.java:147) ~[?:?]
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_302]
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_302]
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) ~[?:1.8.0_302]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_302]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_302]
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_302]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_302]
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) ~[?:1.8.0_302]
at io.debezium.connector.mysql.antlr.listener.CreateTableParserListener.lambda$exitColumnCreateTable$1(CreateTableParserListener.java:75) ~[?:?]
at io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser.runIfNotNull(MySqlAntlrDdlParser.java:357) ~[?:?]
at io.debezium.connector.mysql.antlr.listener.CreateTableParserListener.exitColumnCreateTable(CreateTableParserListener.java:55) ~[?:?]
at io.debezium.ddl.parser.mysql.generated.MySqlParser$ColumnCreateTableContext.exitRule(MySqlParser.java:4733) ~[?:?]
at io.debezium.antlr.ProxyParseTreeListenerUtil.delegateExitRule(ProxyParseTreeListenerUtil.java:64) ~[?:?]
at io.debezium.connector.mysql.antlr.listener.MySqlAntlrDdlParserListener.exitEveryRule(MySqlAntlrDdlParserListener.java:106) ~[?:?]
at com.ververica.cdc.connectors.shaded.org.antlr.v4.runtime.tree.ParseTreeWalker.exitRule(ParseTreeWalker.java:48) ~[?:?]
at com.ververica.cdc.connectors.shaded.org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:30) ~[?:?]
at com.ververica.cdc.connectors.shaded.org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:28) ~[?:?]
at com.ververica.cdc.connectors.shaded.org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:28) ~[?:?]
at com.ververica.cdc.connectors.shaded.org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:28) ~[?:?]
at com.ververica.cdc.connectors.shaded.org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:28) ~[?:?]
at io.debezium.antlr.AntlrDdlParser.parse(AntlrDdlParser.java:87) ~[?:?]
at io.debezium.connector.mysql.MySqlDatabaseSchema.parseDdl(MySqlDatabaseSchema.java:213) ~[?:?]
at io.debezium.connector.mysql.MySqlDatabaseSchema.parseStreamingDdl(MySqlDatabaseSchema.java:200) ~[?:?]
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleQueryEvent(MySqlStreamingChangeEventSource.java:574) ~[?:?]
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:352) ~[?:?]
... 5 more
Additional Description
@youyangkou May be you need to modify age bigint DEFAULT ' 0 '
to age bigint DEFAULT '0'
.
@gong I konw what you mean,but this the current situation is Mysql can execute success,but flink cdc job failed。And the table is built by back-end development not me,i cant control。
This is a bug of debezimu, which is abnormal when parsing the default value of bigint type. Just fix it in the debezium project.
debezium/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java
This is a bug of debezimu, which is abnormal when parsing the default value of bigint type. Just fix it in the debezium project.
thanks,i will go to debezium code to fix it.
Closing this issue because it was created before version 2.3.0 (2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under project Flink
with component tag Flink CDC
. Thank you!