flink-cdc
flink-cdc fails when synchronizing a MySQL table with an enum column: Invalid value: null used for required field: "STATUS", schema type: STRING
I use flink-cdc to synchronize a MySQL table to TiDB. The two tables have exactly the same structure.
One column in the MySQL table is defined as:
`STATUS` enum('1','2','3') NOT NULL DEFAULT '1' COMMENT 'Status (1: normal, 2: deregistered, 3: abnormal)'
In Flink SQL I map it as:
`STATUS` STRING
The following exception occurs:
org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "STATUS", schema type: STRING
Thanks!
Same problem here, looking for a solution.
It seems the database schema parsed by MySqlParallelSource is missing the ENUM field's defaultValue. I'm trying to fix this problem.
@baobeidaodao It seems that the field "STATUS" for some records is null and then passed to debezium to do conversion. But the column is 'NOT NULL', so it will fail to pass validation and then throw such exception in debezium.
But I can't figure out why the field "STATUS" can be null when it is declared as NOT NULL in the database. Maybe it is related to the order of the statements (DDL, DML).
Also, I can't reproduce the issue in my local environment.
@luoyuxia I face the same problem. In my case, this error occurs when the insert is executed after the job is restarted.
- start the CDC job: succeeds
- insert data: succeeds
- kill the job
- wait for the job to restart
- insert data: error
java.lang.IllegalArgumentException: Unexpected value for JDBC type 1 and column side ENUM(1) CHARSET utf8 NOT NULL: class=class java.lang.Integer
Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "side", schema type: STRING
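The first exception reports an Integer arriving for an ENUM column, which suggests the row event carries the enum as a numeric index that must be mapped back to a label using the column's list of allowed values. A minimal sketch of that mapping, assuming MySQL's convention of 1-based indexes with 0 reserved for the empty-string error value. The class and method names are hypothetical, and this is not Debezium's actual converter:

```java
import java.util.List;

final class EnumIndexDemo {
    // Maps a 1-based ENUM index, as seen in a row event, to its label.
    // Throws when the index cannot be resolved, for example because the
    // list of allowed values was lost and is now empty.
    static String toLabel(int index, List<String> enumValues) {
        if (index == 0) {
            return ""; // MySQL uses index 0 for the empty-string error value
        }
        if (index < 0 || index > enumValues.size()) {
            throw new IllegalArgumentException(
                    "Unexpected ENUM index " + index + " for values " + enumValues);
        }
        return enumValues.get(index - 1);
    }

    public static void main(String[] args) {
        // With the allowed values present, the index resolves to a label.
        System.out.println(toLabel(2, List.of("1", "2", "3")));
        // With an empty list, the same kind of lookup fails.
        try {
            toLabel(1, List.of());
        } catch (IllegalArgumentException e) {
            System.out.println("conversion failed: " + e.getMessage());
        }
    }
}
```

If the allowed values are missing after a restart, every non-zero index fails this lookup, which would be consistent with the error appearing only on inserts made after recovery.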
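I ran into this problem too, using Flink CDC 2.2. When capturing an entire MySQL database, the error is reported after the job is started from a savepoint. When I test with a single table, the problem does not occur.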
It seems that even though the commit is marked as merged into master, it doesn't show up in any of the public branches - the solution doesn't seem to have been submitted correctly.
git branch --contains 8e42fff5dddd32ac1d708106b9bcc59ae5bb5b5f
error: no such commit 8e42fff5dddd32ac1d708106b9bcc59ae5bb5b5f
P.S.: We adopted the latest version of flink-cdc, but still have problems handling enum type columns.
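I found out why the error occurs: when TableSchemaBuilder converts the value, the conversion throws an exception, so the field is never put into result, and Debezium then fails when it processes the record. That is why a null is reported even though the column is NOT NULL in the database.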
if (converter != null) {
    try {
        value = converter.convert(value);
        result.put(fields[i], value);
    }
    catch (DataException | IllegalArgumentException e) {
        Column col = columns.get(i);
        LOGGER.error("Failed to properly convert data value for '{}.{}' of type {} for row {}:",
                tableId, col.name(), col.typeName(), row, e);
    }
    catch (final Exception e) {
        Column col = columns.get(i);
        LOGGER.error("Failed to properly convert data value for '{}.{}' of type {} for row {}:",
                tableId, col.name(), col.typeName(), row, e);
    }
}
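To illustrate why a swallowed conversion error surfaces later as a null in a required field, here is a self-contained sketch that mirrors the try/catch above with plain Java collections. The class, the `validate` method, and the exception type are hypothetical stand-ins, not the Debezium or Kafka Connect API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class SwallowedConversionDemo {
    // Converts each raw value. A failing converter is only logged, so the
    // field is never put into the result.
    static Map<String, Object> convertRow(List<String> fields, Object[] row,
                                          Function<Object, Object> converter) {
        Map<String, Object> result = new HashMap<>();
        for (int i = 0; i < fields.size(); i++) {
            try {
                result.put(fields.get(i), converter.apply(row[i]));
            } catch (IllegalArgumentException e) {
                System.err.println("Failed to convert " + fields.get(i) + ": " + e.getMessage());
            }
        }
        return result;
    }

    // Stand-in for the later validation of required (NOT NULL) fields.
    static void validate(List<String> requiredFields, Map<String, Object> result) {
        for (String field : requiredFields) {
            if (result.get(field) == null) {
                throw new IllegalStateException(
                        "Invalid value: null used for required field: \"" + field + "\"");
            }
        }
    }

    public static void main(String[] args) {
        List<String> fields = List.of("STATUS");
        Function<Object, Object> failing = v -> {
            throw new IllegalArgumentException("Unexpected value " + v);
        };
        // The conversion error is logged and dropped here...
        Map<String, Object> result = convertRow(fields, new Object[] {1}, failing);
        // ...and only shows up later as a missing required field.
        try {
            validate(fields, result);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the sketch is the gap between the two steps: the original conversion failure appears only in the log, while the exception that stops the job names the symptom rather than the cause.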
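In my case it happens as follows. Suppose the database has table a(id int, businessType enum('A','B') not null), and a single row of data is enough.
- Take the full snapshot of table a
- Alter the businessType column with DDL, adding one enum value or removing one
- Take a savepoint, or wait for a checkpoint
- Stop the job
- Restart it from the savepoint or from the last checkpoint
- Update or insert any row in table a
At this point the error is reported, because the enum values of the field stored in the state are empty. When I cat the state data, I can see that the enum values are indeed empty.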
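Answering my own question 🙈: for now I have worked around this bug in an indirect, unofficial way. While the job is running the enum values are never empty, but after a checkpoint they appear to be gone. I could not track down where exactly they are lost, so I approached it from the restore side and filled the enum values back in.
I modified com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer, which is the serializer for the split snapshot state.
The change is in the readTableSchemas method, where the enum values are filled back in. It also checks whether a binlog split is being restored, because this bug most likely shows up in the binlog phase, and the check reduces resource consumption.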
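The comment does not show the patch or say where the missing enum values are taken from. As a rough illustration of the idea only, the sketch below assumes the column's original definition text (for example enum('A','B')) is still available at restore time and parses the values from it. The class and both methods are hypothetical and are not part of MySqlSplitSerializer:

```java
import java.util.ArrayList;
import java.util.List;

final class EnumValuesBackfill {
    // Extracts the allowed values from a definition such as
    // enum('A','B') NOT NULL. Only the first parenthesized list is read,
    // and a doubled quote ('') inside a value is treated as a literal quote.
    static List<String> parseEnumValues(String definition) {
        List<String> values = new ArrayList<>();
        String s = definition.trim();
        if (!s.regionMatches(true, 0, "enum(", 0, 5)) {
            return values; // not an ENUM column, nothing to fill in
        }
        int i = 5;
        while (i < s.length() && s.charAt(i) != ')') {
            if (s.charAt(i) != '\'') {
                i++; // skip commas and whitespace between values
                continue;
            }
            StringBuilder value = new StringBuilder();
            i++; // step past the opening quote
            while (i < s.length()) {
                char c = s.charAt(i);
                if (c == '\'') {
                    if (i + 1 < s.length() && s.charAt(i + 1) == '\'') {
                        value.append('\'');
                        i += 2;
                        continue;
                    }
                    break; // closing quote
                }
                value.append(c);
                i++;
            }
            i++; // step past the closing quote
            values.add(value.toString());
        }
        return values;
    }

    // Keeps restored values when present, otherwise falls back to parsing.
    static List<String> backfill(List<String> restored, String definition) {
        return (restored == null || restored.isEmpty())
                ? parseEnumValues(definition)
                : restored;
    }

    public static void main(String[] args) {
        System.out.println(backfill(List.of(), "enum('A','B') NOT NULL"));
    }
}
```

Restricting the backfill to columns whose restored list is empty keeps the extra work limited to the affected columns, in the same spirit as the binlog-split check described above.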
We have the same problem here. If an enum field is added to a table while the CDC task is running and the task is then recovered from a checkpoint, an error occurs as soon as the enum field appears in the binlog events for that table. As a result, the binlog data of this table is lost.
The reason is that when Debezium handles ALTER TABLE xxxx ADD COLUMN, it serializes the table change information through JsonTableChangeSerializer, and the enum's enumValues are not handled there. Following FlinkJsonTableChangeSerializer in Flink CDC, adding enumValues handling to toDocument and fromDocument solves the problem.
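A small sketch of what carrying enumValues through such a round trip looks like, using a plain Map as the document. The ColumnDef class and the key names are hypothetical; only the method names toDocument and fromDocument are borrowed from the comment, and this is not the actual serializer code:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ColumnDocumentDemo {
    // Hypothetical stand-in for a column definition held in connector state.
    static final class ColumnDef {
        final String name;
        final String typeName;
        final List<String> enumValues;

        ColumnDef(String name, String typeName, List<String> enumValues) {
            this.name = name;
            this.typeName = typeName;
            this.enumValues = List.copyOf(enumValues);
        }
    }

    // Writes the column to a document, including its enum values.
    static Map<String, Object> toDocument(ColumnDef column) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("name", column.name);
        doc.put("typeName", column.typeName);
        doc.put("enumValues", new ArrayList<>(column.enumValues));
        return doc;
    }

    // Reads the column back. A document without "enumValues" yields an
    // empty list, which is the state described in this thread after recovery.
    @SuppressWarnings("unchecked")
    static ColumnDef fromDocument(Map<String, Object> doc) {
        List<String> enumValues =
                (List<String>) doc.getOrDefault("enumValues", List.of());
        return new ColumnDef(
                (String) doc.get("name"), (String) doc.get("typeName"), enumValues);
    }

    public static void main(String[] args) {
        ColumnDef original = new ColumnDef("businessType", "ENUM", List.of("A", "B"));
        ColumnDef restored = fromDocument(toDocument(original));
        System.out.println(restored.enumValues);
    }
}
```

Dropping the "enumValues" entry in toDocument reproduces the reported symptom in this model: the restored column still knows it is an ENUM but has no values to map indexes to.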
Fix in Debezium:
Thank you very much. I hope a new version of Flink CDC can resolve this problem.
cc:@ruanhang1993 @luoyuxia
Debezium fixes this issue starting with 1.8.0.Final.
You can upgrade debezium or fix the bug yourself.
Closing this issue because it was created before version 2.3.0 (2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under project Flink with component tag Flink CDC. Thank you!