flink-cdc icon indicating copy to clipboard operation
flink-cdc copied to clipboard

When there are spaces in the default value of the numeric type in a new table DDL under the same Mysql Host; MySQL can be executed successfully,But Flink cdc program throws some exceptions and Flink program Failed

Open youyangkou opened this issue 2 years ago • 5 comments

Describe the bug(Please use English) When there are spaces in the default value of the numeric type in a new table DDL under the same Mysql Host; MySQL can be executed successfully,But Flink cdc program throws some exceptions and Flink program Failed。

Environment :

  • Flink version : 1.13.6
  • Flink CDC version: 2.0.2 and 2.2.0
  • Database and version: Mysql 5.7

To Reproduce Steps to reproduce the behavior:

  1. Thes test data : the new mysql table ddl:
create table rich_people_1
(
    id    int auto_increment primary key,
    name  varchar(255) null,
    age   bigint  DEFAULT ' 0 ',
    hobby varchar(255) null
);
  1. The test code : the flink cdc program code:
CREATE TEMPORARY TABLE `cdc_source` (
`id` INT COMMENT '字增主键',
`name` VARCHAR COMMENT '门店编码',
`update_time` TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`) NOT ENFORCED
) with (
    'connector' = 'mysql-cdc',
    'hostname' = '',
    'port' = '3306',
    'username' = '',
    'password' = '',
    'database-name' = '',
    'table-name' = 'cdc_sync',
    'server-time-zone' = 'Asia/Shanghai',
    'scan.startup.mode' = 'latest-offset' ,
    'scan.incremental.snapshot.enabled' = 'false',
    'debezium.snapshot.locking.mode' = 'none'
);

CREATE TEMPORARY TABLE `print_table` (
  `id` INT COMMENT '字增主键',
  `name` VARCHAR COMMENT '门店编码',
  `update_time` TIMESTAMP COMMENT '更新时间'
) with (
  'connector' = 'print'
);

insert into print_table select * from cdc_source;
  1. The error :
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
	at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) ~[?:?]
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:369) ~[?:?]
	at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1118) ~[?:?]
	at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:966) ~[?:?]
	at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606) ~[?:?]
	at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) ~[?:?]
	at java.lang.Thread.run(Thread.java:877) ~[?:1.8.0_302]
Caused by: io.debezium.DebeziumException: Error processing binlog event
	... 6 more
Caused by: java.lang.NumberFormatException: For input string: " 0 "
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) ~[?:1.8.0_302]
	at java.lang.Long.parseLong(Long.java:578) ~[?:1.8.0_302]
	at java.lang.Long.valueOf(Long.java:803) ~[?:1.8.0_302]
	at io.debezium.jdbc.JdbcValueConverters.lambda$convertBigInt$42(JdbcValueConverters.java:919) ~[?:?]
	at io.debezium.jdbc.JdbcValueConverters.convertValue(JdbcValueConverters.java:1288) ~[?:?]
	at io.debezium.jdbc.JdbcValueConverters.convertBigInt(JdbcValueConverters.java:907) ~[?:?]
	at io.debezium.jdbc.JdbcValueConverters.lambda$converter$6(JdbcValueConverters.java:297) ~[?:?]
	at io.debezium.connector.mysql.MySqlDefaultValueConverter.setColumnDefaultValue(MySqlDefaultValueConverter.java:405) ~[?:?]
	at io.debezium.connector.mysql.antlr.listener.CreateTableParserListener.convertDefaultValueToSchemaType(CreateTableParserListener.java:147) ~[?:?]
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_302]
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_302]
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) ~[?:1.8.0_302]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_302]
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_302]
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_302]
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_302]
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) ~[?:1.8.0_302]
	at io.debezium.connector.mysql.antlr.listener.CreateTableParserListener.lambda$exitColumnCreateTable$1(CreateTableParserListener.java:75) ~[?:?]
	at io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser.runIfNotNull(MySqlAntlrDdlParser.java:357) ~[?:?]
	at io.debezium.connector.mysql.antlr.listener.CreateTableParserListener.exitColumnCreateTable(CreateTableParserListener.java:55) ~[?:?]
	at io.debezium.ddl.parser.mysql.generated.MySqlParser$ColumnCreateTableContext.exitRule(MySqlParser.java:4733) ~[?:?]
	at io.debezium.antlr.ProxyParseTreeListenerUtil.delegateExitRule(ProxyParseTreeListenerUtil.java:64) ~[?:?]
	at io.debezium.connector.mysql.antlr.listener.MySqlAntlrDdlParserListener.exitEveryRule(MySqlAntlrDdlParserListener.java:106) ~[?:?]
	at com.ververica.cdc.connectors.shaded.org.antlr.v4.runtime.tree.ParseTreeWalker.exitRule(ParseTreeWalker.java:48) ~[?:?]
	at com.ververica.cdc.connectors.shaded.org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:30) ~[?:?]
	at com.ververica.cdc.connectors.shaded.org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:28) ~[?:?]
	at com.ververica.cdc.connectors.shaded.org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:28) ~[?:?]
	at com.ververica.cdc.connectors.shaded.org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:28) ~[?:?]
	at com.ververica.cdc.connectors.shaded.org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:28) ~[?:?]
	at io.debezium.antlr.AntlrDdlParser.parse(AntlrDdlParser.java:87) ~[?:?]
	at io.debezium.connector.mysql.MySqlDatabaseSchema.parseDdl(MySqlDatabaseSchema.java:213) ~[?:?]
	at io.debezium.connector.mysql.MySqlDatabaseSchema.parseStreamingDdl(MySqlDatabaseSchema.java:200) ~[?:?]
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleQueryEvent(MySqlStreamingChangeEventSource.java:574) ~[?:?]
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:352) ~[?:?]
	... 5 more

Additional Description image image

youyangkou avatar Jun 14 '22 07:06 youyangkou

@youyangkou May be you need to modify age bigint DEFAULT ' 0 ' to age bigint DEFAULT '0'.

gong avatar Jun 22 '22 07:06 gong

@gong I konw what you mean,but this the current situation is Mysql can execute success,but flink cdc job failed。And the table is built by back-end development not me,i cant control。

youyangkou avatar Jul 12 '22 01:07 youyangkou

This is a bug of debezimu, which is abnormal when parsing the default value of bigint type. Just fix it in the debezium project.

hezhenghongmail avatar Jul 26 '22 10:07 hezhenghongmail

image

debezium/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java

hezhenghongmail avatar Jul 26 '22 10:07 hezhenghongmail

This is a bug of debezimu, which is abnormal when parsing the default value of bigint type. Just fix it in the debezium project.

thanks,i will go to debezium code to fix it.

youyangkou avatar Jul 27 '22 02:07 youyangkou

Closing this issue because it was created before version 2.3.0 (2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under project Flink with component tag Flink CDC. Thank you!

PatrickRen avatar Feb 28 '24 15:02 PatrickRen