
[Bug] [CDCSOURCE] MysqlDebeziumConverter does not work properly

Open 9AutumnRain opened this issue 10 months ago • 6 comments

Search before asking

  • [x] I have searched the issues and found no similar issues.

What happened

When I use CDCSOURCE with mysql-cdc, without a Debezium converter, to read DATETIME columns from a MySQL instance whose server time zone is 'Asia/Shanghai', the values are shifted by 8 hours from the original data in MySQL. I searched the issues and found that PR #3151 added MysqlDebeziumConverter to convert datetime data to the correct time zone. But when I used it in version 1.2.1, it threw a DateTimeParseException:

[dinky] 2025-03-06 14:44:04.446  ERROR   104 --- [rcer[3] (1/1)#0] org.dinky.cdc.AbstractSinkBuilder: SchemaTable: db_cdc_test.t_datetime_test - Row: {"after":{"id":1,"datetime_value":"2025-03-06 14:44:01"},"source":{"version":"1.9.8.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1741243444000,"snapshot":"false","db":"db_cdc_test","table":"t_datetime_test","server_id":1,"file":"mysql-bin.000051","pos":505302235,"row":0,"thread":12094833},"op":"c","ts_ms":1741243444388} - Exception java.time.format.DateTimeParseException: Text '2025-03-06 14:44:01' could not be parsed at index 10
[dinky] 2025-03-06 14:44:04.446  ERROR   104 --- [rcer[3] (1/1)#0] org.dinky.cdc.AbstractSinkBuilder: Could not forward element to next operator
2025-03-06T06:44:04.447760041Z org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
2025-03-06T06:44:04.447765901Z 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92) ~[flink-dist-1.19.1.jar:1.19.1]
2025-03-06T06:44:04.447769192Z 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:61) ~[flink-dist-1.19.1.jar:1.19.1]
	at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:103) ~[flink-dist-1.19.1.jar:1.19.1]
2025-03-06T06:44:04.447776172Z 	at org.dinky.cdc.sql.AbstractSqlSinkBuilder$1.processElement(AbstractSqlSinkBuilder.java:173) ~[dinky-cdc-core-1.2.1.jar:?]
2025-03-06T06:44:04.447779760Z 	at org.dinky.cdc.sql.AbstractSqlSinkBuilder$1.processElement(AbstractSqlSinkBuilder.java:166) ~[dinky-cdc-core-1.2.1.jar:?]
2025-03-06T06:44:04.447782839Z 	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) ~[flink-dist-1.19.1.jar:1.19.1]
2025-03-06T06:44:04.447786085Z 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.19.1.jar:1.19.1]
2025-03-06T06:44:04.447788879Z 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.19.1.jar:1.19.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.19.1.jar:1.19.1]
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-dist-1.19.1.jar:1.19.1]
2025-03-06T06:44:04.447810319Z 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.19.1.jar:1.19.1]
2025-03-06T06:44:04.447813281Z 	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.19.1.jar:1.19.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.19.1.jar:1.19.1]
2025-03-06T06:44:04.447819459Z 	at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310) ~[flink-dist-1.19.1.jar:1.19.1]
	at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist-1.19.1.jar:1.19.1]
	at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:147) ~[flink-cdc-pipeline-connector-mysql-3.2.1.jar:3.2.1]
	at org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema.deserialize(JsonDebeziumDeserializationSchema.java:73) ~[flink-cdc-pipeline-connector-mysql-3.2.1.jar:3.2.1]
	at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:120) ~[flink-cdc-pipeline-connector-mysql-3.2.1.jar:3.2.1]
2025-03-06T06:44:04.447858182Z 	at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:101) ~[flink-cdc-pipeline-connector-mysql-3.2.1.jar:3.2.1]
	at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:73) ~[flink-cdc-pipeline-connector-mysql-3.2.1.jar:3.2.1]
	at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:46) ~[flink-cdc-pipeline-connector-mysql-3.2.1.jar:3.2.1]
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:160) ~[flink-cdc-pipeline-connector-elasticsearch-3.2.1.jar:3.2.1]
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422) ~[flink-dist-1.19.1.jar:1.19.1]
2025-03-06T06:44:04.447876250Z 	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.19.1.jar:1.19.1]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.19.1.jar:1.19.1]
2025-03-06T06:44:04.447894248Z 	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579) ~[flink-dist-1.19.1.jar:1.19.1]
2025-03-06T06:44:04.447911834Z 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.19.1.jar:1.19.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909) ~[flink-dist-1.19.1.jar:1.19.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858) ~[flink-dist-1.19.1.jar:1.19.1]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist-1.19.1.jar:1.19.1]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist-1.19.1.jar:1.19.1]
2025-03-06T06:44:04.447936123Z 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) ~[flink-dist-1.19.1.jar:1.19.1]
2025-03-06T06:44:04.447939333Z 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist-1.19.1.jar:1.19.1]
2025-03-06T06:44:04.447943694Z 	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_432]
Caused by: java.time.format.DateTimeParseException: Text '2025-03-06 14:44:01' could not be parsed at index 10
2025-03-06T06:44:04.447957466Z 	at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949) ~[?:1.8.0_432]
2025-03-06T06:44:04.447962718Z 	at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1851) ~[?:1.8.0_432]
2025-03-06T06:44:04.447968939Z 	at java.time.Instant.parse(Instant.java:395) ~[?:1.8.0_432]
	at org.dinky.cdc.convert.DataTypeConverter.convertToTimestamp(DataTypeConverter.java:373) ~[dinky-cdc-core-1.2.1.jar:?]
2025-03-06T06:44:04.447976958Z 	at org.dinky.cdc.convert.DataTypeConverter.convertToRow(DataTypeConverter.java:137) ~[dinky-cdc-core-1.2.1.jar:?]
	at org.dinky.cdc.sql.AbstractSqlSinkBuilder.rowCollect(AbstractSqlSinkBuilder.java:110) ~[dinky-cdc-core-1.2.1.jar:?]
2025-03-06T06:44:04.447986914Z 	at org.dinky.cdc.sql.AbstractSqlSinkBuilder.lambda$sinkRowFunction$2af4781a$1(AbstractSqlSinkBuilder.java:73) ~[dinky-cdc-core-1.2.1.jar:?]
2025-03-06T06:44:04.447991925Z 	at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) ~[flink-dist-1.19.1.jar:1.19.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.19.1.jar:1.19.1]

I read the code and found that org.dinky.cdc.debezium.converter.MysqlDebeziumConverter may not be compatible with org.dinky.cdc.convert.DataTypeConverter.
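
The stack trace above shows Instant.parse being called from DataTypeConverter.convertToTimestamp on the string '2025-03-06 14:44:01'. A minimal sketch of why that fails, independent of Dinky: Instant.parse expects an ISO-8601 instant such as '2025-03-06T06:44:01Z', so it stops at index 10, where the space sits instead of 'T'. The class name DatetimeParseSketch and the helper parseFormatted are hypothetical, and the 'yyyy-MM-dd HH:mm:ss' pattern is an assumption taken from the value in the logged row; this is not the actual Dinky code or a proposed patch.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

class DatetimeParseSketch {
    // Assumed pattern, matching the converter output seen in the logged row.
    private static final DateTimeFormatter SPACE_SEPARATED =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Returns the index at which Instant.parse rejects the text, or -1 if it parses. */
    static int instantParseErrorIndex(String text) {
        try {
            Instant.parse(text);
            return -1;
        } catch (DateTimeParseException e) {
            return e.getErrorIndex();
        }
    }

    /** Hypothetical fallback: read the string as a local date-time in the given zone. */
    static Instant parseFormatted(String text, ZoneId zone) {
        return LocalDateTime.parse(text, SPACE_SEPARATED).atZone(zone).toInstant();
    }

    public static void main(String[] args) {
        String value = "2025-03-06 14:44:01";
        // Same failure position as reported in the log.
        System.out.println(instantParseErrorIndex(value));
        // Interpreting the string as Asia/Shanghai wall-clock time.
        System.out.println(parseFormatted(value, ZoneId.of("Asia/Shanghai")));
    }
}
```

If the converter emits formatted strings, the downstream conversion would need to accept that format (or the converter would need to emit something Instant.parse understands); which side should change is not settled in this thread.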

What you expected to happen

MysqlDebeziumConverter works properly without errors, and datetime values have no time shift.

How to reproduce

Create a test table in MySQL:

CREATE TABLE `t_datetime_test` (
  `id` INT UNSIGNED NOT NULL AUTO_INCREMENT,
  `datetime_value` DATETIME DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=INNODB 

Run a CDC job in Dinky:


EXECUTE CDCSOURCE mysql-cdc-test WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'mysql',
 'port' = '3306',
 'username' = 'xxx',
 'password' = 'xxx',
 'database-name' = 'db_cdc_test',
 'checkpoint' = '60000',
 'scan.startup.mode' = 'initial',
 'parallelism' = '1',
 'debezium.skipped.operations'='d',
 'table-name' = 'db_cdc_test\.t_datetime_test',
 'sink.connector' = 'print',
 'source.server-time-zone' = 'Asia/Shanghai',
 'sink.timezone' = 'Asia/Shanghai',
 'debezium.converters' = 'datetime',
 'debezium.datetime.type' = 'org.dinky.cdc.debezium.converter.MysqlDebeziumConverter',
 'debezium.datetime.database.type' = 'mysql',
 'debezium.datetime.format.timestamp.zone' = 'Asia/Shanghai'
);


Anything else

No response

Version

1.2.1

Are you willing to submit PR?

  • [x] Yes I am willing to submit a PR!

Code of Conduct

9AutumnRain avatar Mar 06 '25 06:03 9AutumnRain

Hello @9AutumnRain, this issue is about CDC/CDCSOURCE, so I have assigned it to @aiwenmo. If you have any questions, you can comment and reply.

github-actions[bot] avatar Mar 06 '25 06:03 github-actions[bot]

EXECUTE CDCSOURCE dinky_paimon_auto_create_table WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '${host}',
 'port' = '${port}',
 'username' = '${username}',
 'password'='******',
 'checkpoint' = '10000',
 'parallelism' = '1',
 'scan.startup.mode' = 'initial',
 -- 'database-name' = 'hfins',
 'table-name' = 'm.log',
 'sink.connector' = 'paimon',
 'sink.path' = 's3://test/paimon/mysql.db/#{tableName}',
 'sink.table.prefix' = 'ods'
);

I'm having the same issue, but in version 1.1.0 the time field values are correct with the same configuration.

kevinlin299 avatar Mar 06 '25 08:03 kevinlin299

https://github.com/DataLinkDC/dinky/pull/3151 . It is not included in Dinky 1.2.

aiwenmo avatar Mar 08 '25 08:03 aiwenmo

Try using sink.timezone.

aiwenmo avatar Mar 08 '25 09:03 aiwenmo

Try using sink.timezone.

I have already tried using sink.timezone:


EXECUTE CDCSOURCE mysql-cdc-test WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'mysql-rds-test.database.syy.com',
 'port' = '3306',
 'username' = 'cdc_user',
 'password' = 'P@ss1234',
 'database-name' = 'db_cdc_test',
 'checkpoint' = '60000',
 'scan.startup.mode' = 'latest-offset',
 'parallelism' = '1',
 'debezium.skipped.operations'='d',
 'table-name' = 'db_cdc_test\.t_datetime_test',
 'sink.connector' = 'print',
 'source.server-time-zone' = 'Asia/Shanghai',
 'sink.timezone' = 'Asia/Shanghai'
);

with table:

CREATE TABLE `t_datetime_test` (
  `id` int unsigned NOT NULL AUTO_INCREMENT,
  `datetime_value` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB
;

After I executed this SQL:

UPDATE db_cdc_test.`t_datetime_test`
SET datetime_value = '2025-3-10 09:27:00'
WHERE id = 1

the result printed to stdout is:

-D[1, 2025-03-10T17:26]
+I[1, 2025-03-10T17:27]
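
The +8 hour difference between the written value (09:27) and the printed value (17:27) can be reproduced in isolation. The sketch below rests on an assumption not confirmed in this thread: that the DATETIME wall-clock value reaches the sink as epoch milliseconds computed as if it were UTC, and is then rendered in the 'Asia/Shanghai' zone. The class DatetimeShiftSketch and both helper methods are hypothetical and are not Dinky or Debezium APIs.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

class DatetimeShiftSketch {
    /** Encodes a wall-clock DATETIME as epoch millis as if it were UTC (assumed source behaviour). */
    static long encodeAsUtcMillis(LocalDateTime wallClock) {
        return wallClock.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    /** Decodes epoch millis in the given zone; decoding in a non-UTC zone introduces the shift. */
    static LocalDateTime decodeInZone(long epochMillis, ZoneId zone) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone);
    }

    public static void main(String[] args) {
        LocalDateTime written = LocalDateTime.of(2025, 3, 10, 9, 27, 0);
        long millis = encodeAsUtcMillis(written);
        // Decoding with the sink zone shifts the value by +8 hours.
        System.out.println(decodeInZone(millis, ZoneId.of("Asia/Shanghai"))); // 2025-03-10T17:27
        // Decoding with UTC recovers the value that was written.
        System.out.println(decodeInZone(millis, ZoneOffset.UTC)); // 2025-03-10T09:27
    }
}
```

Under that assumption, setting sink.timezone to 'Asia/Shanghai' would add the offset rather than remove it, which matches the output above.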

9AutumnRain avatar Mar 10 '25 01:03 9AutumnRain

Hello @, this issue has not been active for more than 30 days. This issue will be closed in 7 days if there is no response. If you have any questions, you can comment and reply.

github-actions[bot] avatar May 01 '25 00:05 github-actions[bot]