dinky icon indicating copy to clipboard operation
dinky copied to clipboard

[Bug] [dinky-cdc-core] The use of lambda expressions in AbstractSqlSinkBuilder causes deserialization issues in Flink

Open Selegant opened this issue 7 months ago • 3 comments

Search before asking

  • [x] I had searched in the issues and found no similar issues.

What happened

Dinky 版本 1.2.3 Flink 版本 1.20 jdk11的版本 提交方式是 standalone 执行任务 EXECUTE CDCSOURCE cdc_mysql WITH ( 'connector' = 'mysql-cdc', 'hostname' = '172.16.255.158', 'port' = '3306', 'username' = 'root', 'password' = 'root', 'checkpoint' = '3000', 'scan.startup.mode' = 'initial', 'parallelism' = '1', 'table-name' = 'nbods.ab_beddict', 'sink.connector' = 'jdbc', 'sink.url' = 'jdbc:mysql://172.16.255.158:3306/ods_copy?characterEncoding=utf-8&useSSL=false', 'sink.username' = 'root', 'sink.password' = 'root', 'sink.sink.db' = 'ods_copy', 'sink.table.prefix' = 'test_', 'sink.table.lower' = 'true', 'sink.table-name' = '#{tableName}', 'sink.driver' = 'com.mysql.jdbc.Driver', 'sink.sink.buffer-flush.interval' = '2s', 'sink.sink.buffer-flush.max-rows' = '100', 'sink.sink.max-retries' = '5', 'sink.auto.create' = 'true' ) 报错 025-06-06 01:38:56,435 WARN org.apache.flink.runtime.taskmanager.Task [] - SinkMaterializer[6] -> Sink: test_ab_beddict[6] (1/1)#1851 (1c69df105b850081f72b1bed6fbe9522_a5d048cc3217147772867b2cb3a508fa_0_1851) switched from INITIALIZING to FAILED with failure cause: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:416) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:869) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:836) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:732) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:202) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.(RegularOperatorChain.java:60) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:789) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) [flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) [flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist-1.20.0.jar:1.20.0] at java.lang.Thread.run(Unknown Source) [?:?] Caused by: java.io.InvalidClassException: org.apache.flink.connector.jdbc.dialect.AbstractDialect; local class incompatible: stream classdesc serialVersionUID = -5510372127161093008, local class serialVersionUID = -1079348137343042861 at java.io.ObjectStreamClass.initNonProxy(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readClassDesc(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readClassDesc(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:533) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:521) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:475) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:400) ~[flink-dist-1.20.0.jar:1.20.0] ... 12 more 2025-06-06 01:38:56,435 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for SinkMaterializer[6] -> Sink: test_ab_beddict[6] (1/1)#1851 (1c69df105b850081f72b1bed6fbe9522_a5d048cc3217147772867b2cb3a508fa_0_1851). 2025-06-06 01:38:56,435 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task SinkMaterializer[6] -> Sink: test_ab_beddict[6] (1/1)#1851 1c69df105b850081f72b1bed6fbe9522_a5d048cc3217147772867b2cb3a508fa_0_1851. 2025-06-06 01:38:56,437 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: MySQL CDC Source -> PartitionByPrimaryKey -> Shunt -> FlatMapRow -> anonymous_datastream_source$2[4] -> Calc[5] -> ConstraintEnforcer[6] (1/1)#1851 (1c69df105b850081f72b1bed6fbe9522_cbc357ccb763df2852fee8c4fc7d55f2_0_1851) switched from INITIALIZING to FAILED with failure cause: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:416) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:869) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:836) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:732) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:825) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:732) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:825) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:732) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:202) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.(RegularOperatorChain.java:60) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:789) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) [flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) [flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist-1.20.0.jar:1.20.0] at java.lang.Thread.run(Unknown Source) [?:?] Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.userFunction of type org.apache.flink.api.common.functions.Function in instance of org.apache.flink.streaming.api.operators.StreamFlatMap at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(Unknown Source) ~[?:?] at java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(Unknown Source) ~[?:?] at java.io.ObjectStreamClass.checkObjFieldValueTypes(Unknown Source) ~[?:?] at java.io.ObjectInputStream.defaultCheckFieldValues(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:533) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:521) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:475) ~[flink-dist-1.20.0.jar:1.20.0] at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:400) ~[flink-dist-1.20.0.jar:1.20.0] ... 16 more

What you expected to happen

我仔细看过 dinkly1.2.3 版本的代码 发现应该是AbstractSqlSinkBuilder 里面buildRow 这个方法里面 flatMap 使用了 lambda 表达式写法造成的 希望能尽快修复

How to reproduce

Dinky 版本 1.2.3 Flink 版本 1.20 jdk11的版本 提交方式是 standalone 执行任务 EXECUTE CDCSOURCE cdc_mysql WITH ( 'connector' = 'mysql-cdc', 'hostname' = '172.16.255.158', 'port' = '3306', 'username' = 'root', 'password' = 'root', 'checkpoint' = '3000', 'scan.startup.mode' = 'initial', 'parallelism' = '1', 'table-name' = 'nbods.ab_beddict', 'sink.connector' = 'jdbc', 'sink.url' = 'jdbc:mysql://172.16.255.158:3306/ods_copy?characterEncoding=utf-8&useSSL=false', 'sink.username' = 'root', 'sink.password' = 'root', 'sink.sink.db' = 'ods_copy', 'sink.table.prefix' = 'test_', 'sink.table.lower' = 'true', 'sink.table-name' = '#{tableName}', 'sink.driver' = 'com.mysql.jdbc.Driver', 'sink.sink.buffer-flush.interval' = '2s', 'sink.sink.buffer-flush.max-rows' = '100', 'sink.sink.max-retries' = '5', 'sink.auto.create' = 'true' )

Anything else

No response

Version

1.2.3

Are you willing to submit PR?

  • [x] Yes I am willing to submit a PR!

Code of Conduct

Selegant avatar Jun 11 '25 08:06 Selegant

thx. I will fix it.

aiwenmo avatar Jul 09 '25 10:07 aiwenmo

把dinky-app这个包放到flink的lib
如果是yarn就放到hdfs

javaht avatar Jul 14 '25 13:07 javaht

Hello @, this issue has not been active for more than 30 days. This issue will be closed in 7 days if there is no response. If you have any questions, you can comment and reply.

你好 @, 这个 issue 30 天内没有活跃,7 天后将关闭,如需回复,可以评论回复。

github-actions[bot] avatar Sep 01 '25 00:09 github-actions[bot]