[Bug] [CDCSOURCE] MysqlDebeziumConverter does not work properly
Search before asking
- [x] I had searched in the issues and found no similar issues.
What happened
I use CDCSOURCE with mysql-cdc, without a Debezium converter, to read DATETIME columns from a MySQL server whose server-time-zone is 'Asia/Shanghai'. The values arrive shifted by 8 hours relative to the original data in MySQL. I searched the issues and found that PR #3151 added MysqlDebeziumConverter to convert DATETIME values to the correct time zone. However, when I used it in version 1.2.1, it threw a DateTimeParseException:
[dinky] 2025-03-06 14:44:04.446 ERROR 104 --- [rcer[3] (1/1)#0] org.dinky.cdc.AbstractSinkBuilder: SchemaTable: db_cdc_test.t_datetime_test - Row: {"after":{"id":1,"datetime_value":"2025-03-06 14:44:01"},"source":{"version":"1.9.8.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1741243444000,"snapshot":"false","db":"db_cdc_test","table":"t_datetime_test","server_id":1,"file":"mysql-bin.000051","pos":505302235,"row":0,"thread":12094833},"op":"c","ts_ms":1741243444388} - Exception java.time.format.DateTimeParseException: Text '2025-03-06 14:44:01' could not be parsed at index 10
[dinky] 2025-03-06 14:44:04.446 ERROR 104 --- [rcer[3] (1/1)#0] org.dinky.cdc.AbstractSinkBuilder: Could not forward element to next operator
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:61) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:103) ~[flink-dist-1.19.1.jar:1.19.1]
at org.dinky.cdc.sql.AbstractSqlSinkBuilder$1.processElement(AbstractSqlSinkBuilder.java:173) ~[dinky-cdc-core-1.2.1.jar:?]
at org.dinky.cdc.sql.AbstractSqlSinkBuilder$1.processElement(AbstractSqlSinkBuilder.java:166) ~[dinky-cdc-core-1.2.1.jar:?]
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:147) ~[flink-cdc-pipeline-connector-mysql-3.2.1.jar:3.2.1]
at org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema.deserialize(JsonDebeziumDeserializationSchema.java:73) ~[flink-cdc-pipeline-connector-mysql-3.2.1.jar:3.2.1]
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:120) ~[flink-cdc-pipeline-connector-mysql-3.2.1.jar:3.2.1]
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:101) ~[flink-cdc-pipeline-connector-mysql-3.2.1.jar:3.2.1]
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:73) ~[flink-cdc-pipeline-connector-mysql-3.2.1.jar:3.2.1]
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:46) ~[flink-cdc-pipeline-connector-mysql-3.2.1.jar:3.2.1]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:160) ~[flink-cdc-pipeline-connector-elasticsearch-3.2.1.jar:3.2.1]
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist-1.19.1.jar:1.19.1]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_432]
Caused by: java.time.format.DateTimeParseException: Text '2025-03-06 14:44:01' could not be parsed at index 10
at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949) ~[?:1.8.0_432]
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1851) ~[?:1.8.0_432]
at java.time.Instant.parse(Instant.java:395) ~[?:1.8.0_432]
at org.dinky.cdc.convert.DataTypeConverter.convertToTimestamp(DataTypeConverter.java:373) ~[dinky-cdc-core-1.2.1.jar:?]
at org.dinky.cdc.convert.DataTypeConverter.convertToRow(DataTypeConverter.java:137) ~[dinky-cdc-core-1.2.1.jar:?]
at org.dinky.cdc.sql.AbstractSqlSinkBuilder.rowCollect(AbstractSqlSinkBuilder.java:110) ~[dinky-cdc-core-1.2.1.jar:?]
at org.dinky.cdc.sql.AbstractSqlSinkBuilder.lambda$sinkRowFunction$2af4781a$1(AbstractSqlSinkBuilder.java:73) ~[dinky-cdc-core-1.2.1.jar:?]
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) ~[flink-dist-1.19.1.jar:1.19.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.19.1.jar:1.19.1]
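I read the code and suspect that org.dinky.cdc.debezium.converter.MysqlDebeziumConverter is not compatible with org.dinky.cdc.convert.DataTypeConverter. With the converter enabled, the DATETIME value arrives as the string "2025-03-06 14:44:01", but DataTypeConverter.convertToTimestamp passes it to java.time.Instant.parse, which expects an ISO-8601 instant such as "2025-03-06T06:44:01Z". Parsing therefore fails at index 10, the space where the 'T' separator is expected.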
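To make the mismatch concrete, here is a minimal Java sketch, not Dinky's code: it reproduces the `Instant.parse` failure on the string from the log, then shows one possible tolerant parse that falls back to the `yyyy-MM-dd HH:mm:ss` pattern. The class `DatetimeParseSketch` and the method `toInstant` are hypothetical, and interpreting a zone-less string as wall-clock time in a configured zone (here `Asia/Shanghai`) is an assumption about the intended semantics.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

class DatetimeParseSketch {
    // Pattern of the value seen in the log: "2025-03-06 14:44:01"
    private static final DateTimeFormatter MYSQL_DATETIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Hypothetical helper: accept an ISO-8601 instant ("...T...Z") as-is,
    // otherwise read the text as a zone-less wall-clock time in `zone`.
    static Instant toInstant(String text, ZoneId zone) {
        try {
            return Instant.parse(text);
        } catch (DateTimeParseException e) {
            return LocalDateTime.parse(text, MYSQL_DATETIME).atZone(zone).toInstant();
        }
    }

    public static void main(String[] args) {
        try {
            Instant.parse("2025-03-06 14:44:01");
        } catch (DateTimeParseException e) {
            // Same failure as in the log: ISO_INSTANT expects 'T' at index 10
            System.out.println("Instant.parse failed at index " + e.getErrorIndex());
        }
        // 14:44:01 in Asia/Shanghai (UTC+8) is 06:44:01 UTC
        System.out.println(toInstant("2025-03-06 14:44:01", ZoneId.of("Asia/Shanghai")));
    }
}
```

Whether the fix belongs in DataTypeConverter (accepting this format) or in MysqlDebeziumConverter (emitting an ISO-8601 instant) is a design choice for the PR; the sketch only shows that either side can be made consistent with the other.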
What you expected to happen
MysqlDebeziumConverter works properly without errors, and DATETIME values have no time shift.
How to reproduce
Create a test table in MySQL:
CREATE TABLE `t_datetime_test` (
`id` INT UNSIGNED NOT NULL AUTO_INCREMENT,
`datetime_value` DATETIME DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=INNODB
Run a CDC job in Dinky:
EXECUTE CDCSOURCE mysql-cdc-test WITH (
'connector' = 'mysql-cdc',
'hostname' = 'mysql',
'port' = '3306',
'username' = 'xxx',
'password' = 'xxx',
'database-name' = 'db_cdc_test',
'checkpoint' = '60000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'debezium.skipped.operations'='d',
'table-name' = 'db_cdc_test\.t_datetime_test',
'sink.connector' = 'print',
'source.server-time-zone' = 'Asia/Shanghai',
'sink.timezone' = 'Asia/Shanghai',
'debezium.converters' = 'datetime',
'debezium.datetime.type' = 'org.dinky.cdc.debezium.converter.MysqlDebeziumConverter',
'debezium.datetime.database.type' = 'mysql',
'debezium.datetime.format.timestamp.zone' = 'Asia/Shanghai'
);
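For reference, Debezium's custom-converter mechanism reads a `converters` property listing converter names, then `<name>.type` for the implementing class and other `<name>.*` keys as that converter's own settings. The sketch below assumes that Dinky, like Flink CDC's `debezium.*` options, passes `debezium.`-prefixed options through to Debezium with the prefix stripped. `DebeziumPropsSketch` and `toDebeziumProperties` are hypothetical names; the sketch only shows the resulting keys, not how MysqlDebeziumConverter consumes them.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

class DebeziumPropsSketch {
    static final String PREFIX = "debezium.";

    // Hypothetical helper: copy every "debezium.*" CDCSOURCE option into
    // Properties with the prefix removed (assumed pass-through behavior).
    static Properties toDebeziumProperties(Map<String, String> options) {
        Properties props = new Properties();
        for (Map.Entry<String, String> e : options.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                props.setProperty(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "mysql-cdc"); // not a debezium.* option, so skipped
        options.put("debezium.converters", "datetime");
        options.put("debezium.datetime.type", "org.dinky.cdc.debezium.converter.MysqlDebeziumConverter");
        options.put("debezium.datetime.database.type", "mysql");
        options.put("debezium.datetime.format.timestamp.zone", "Asia/Shanghai");

        Properties props = toDebeziumProperties(options);
        // "converters" names the converter; "<name>.type" names its class
        String name = props.getProperty("converters");
        System.out.println(props.getProperty(name + ".type"));
    }
}
```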
Anything else
No response
Version
1.2.1
Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
Code of Conduct
- [x] I agree to follow this project's Code of Conduct
Hello @9AutumnRain, this issue is about CDC/CDCSOURCE, so I assign it to @aiwenmo. If you have any questions, you can comment and reply.
I'm having the same issue with the following configuration, but in version 1.1.0 the same configuration produces correct time field values:
EXECUTE CDCSOURCE dinky_paimon_auto_create_table WITH (
'connector' = 'mysql-cdc',
'hostname' = '${host}',
'port' = '${port}',
'username' = '${username}',
'password'='******',
'checkpoint' = '10000',
'parallelism' = '1',
'scan.startup.mode' = 'initial',
-- 'database-name' = 'hfins',
'table-name' = 'm.log',
'sink.connector' = 'paimon',
'sink.path' = 's3://test/paimon/mysql.db/#{tableName}',
'sink.table.prefix' = 'ods'
);
https://github.com/DataLinkDC/dinky/pull/3151 is not included in Dinky 1.2.
Try using sink.timezone.
I have already tried using sink.timezone:
EXECUTE CDCSOURCE mysql-cdc-test WITH (
'connector' = 'mysql-cdc',
'hostname' = 'mysql-rds-test.database.syy.com',
'port' = '3306',
'username' = 'cdc_user',
'password' = 'P@ss1234',
'database-name' = 'db_cdc_test',
'checkpoint' = '60000',
'scan.startup.mode' = 'latest-offset',
'parallelism' = '1',
'debezium.skipped.operations'='d',
'table-name' = 'db_cdc_test\.t_datetime_test',
'sink.connector' = 'print',
'source.server-time-zone' = 'Asia/Shanghai',
'sink.timezone' = 'Asia/Shanghai'
);
with the table:
CREATE TABLE `t_datetime_test` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`datetime_value` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB
;
After I executed this SQL:
UPDATE db_cdc_test.`t_datetime_test`
SET datetime_value = '2025-3-10 09:27:00'
WHERE id = 1
the result printed to stdout was:
-D[1, 2025-03-10T17:26]
+I[1, 2025-03-10T17:27]
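The printed value is exactly 8 hours ahead of what was written (09:27 became 17:27), which matches the UTC+8 offset of Asia/Shanghai. Debezium documents MySQL `DATETIME` values as epoch-based timestamps that carry no time-zone information. One plausible explanation, an assumption rather than a confirmed diagnosis, is that a downstream step decodes them in Asia/Shanghai instead of UTC. The hypothetical `ShiftSketch` below reproduces that arithmetic.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

class ShiftSketch {
    // Encode a DATETIME the way Debezium describes it: milliseconds since
    // the epoch with no zone, i.e. the wall-clock value read as if it were UTC.
    static long encodeDatetime(LocalDateTime wallClock) {
        return wallClock.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Decoding those millis in a non-UTC zone adds that zone's offset.
    static LocalDateTime decode(long epochMillis, ZoneId zone) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone);
    }

    public static void main(String[] args) {
        long millis = encodeDatetime(LocalDateTime.of(2025, 3, 10, 9, 27));
        System.out.println(decode(millis, ZoneId.of("Asia/Shanghai"))); // 2025-03-10T17:27
        System.out.println(decode(millis, ZoneOffset.UTC));             // 2025-03-10T09:27
    }
}
```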
Hello @, this issue has not been active for more than 30 days. This issue will be closed in 7 days if there is no response. If you have any questions, you can comment and reply.