Search before asking
- [x] I had searched in the issues and found no similar issues.
What happened
Dinky version: 1.2.3
Flink version: 1.20 (JDK 11 build)
Submission mode: standalone
Job executed:
EXECUTE CDCSOURCE cdc_mysql WITH (
'connector' = 'mysql-cdc',
'hostname' = '172.16.255.158',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'nbods.ab_beddict',
'sink.connector' = 'jdbc',
'sink.url' = 'jdbc:mysql://172.16.255.158:3306/ods_copy?characterEncoding=utf-8&useSSL=false',
'sink.username' = 'root',
'sink.password' = 'root',
'sink.sink.db' = 'ods_copy',
'sink.table.prefix' = 'test_',
'sink.table.lower' = 'true',
'sink.table-name' = '#{tableName}',
'sink.driver' = 'com.mysql.jdbc.Driver',
'sink.sink.buffer-flush.interval' = '2s',
'sink.sink.buffer-flush.max-rows' = '100',
'sink.sink.max-retries' = '5',
'sink.auto.create' = 'true'
)
Error log:
2025-06-06 01:38:56,435 WARN org.apache.flink.runtime.taskmanager.Task [] - SinkMaterializer[6] -> Sink: test_ab_beddict[6] (1/1)#1851 (1c69df105b850081f72b1bed6fbe9522_a5d048cc3217147772867b2cb3a508fa_0_1851) switched from INITIALIZING to FAILED with failure cause:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:416) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:869) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:836) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:732) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:202) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:789) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) [flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) [flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist-1.20.0.jar:1.20.0]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.io.InvalidClassException: org.apache.flink.connector.jdbc.dialect.AbstractDialect; local class incompatible: stream classdesc serialVersionUID = -5510372127161093008, local class serialVersionUID = -1079348137343042861
at java.io.ObjectStreamClass.initNonProxy(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readClassDesc(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readClassDesc(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:533) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:521) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:475) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:400) ~[flink-dist-1.20.0.jar:1.20.0]
... 12 more
2025-06-06 01:38:56,435 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for SinkMaterializer[6] -> Sink: test_ab_beddict[6] (1/1)#1851 (1c69df105b850081f72b1bed6fbe9522_a5d048cc3217147772867b2cb3a508fa_0_1851).
2025-06-06 01:38:56,435 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task SinkMaterializer[6] -> Sink: test_ab_beddict[6] (1/1)#1851 1c69df105b850081f72b1bed6fbe9522_a5d048cc3217147772867b2cb3a508fa_0_1851.
2025-06-06 01:38:56,437 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: MySQL CDC Source -> PartitionByPrimaryKey -> Shunt -> FlatMapRow -> anonymous_datastream_source$2[4] -> Calc[5] -> ConstraintEnforcer[6] (1/1)#1851 (1c69df105b850081f72b1bed6fbe9522_cbc357ccb763df2852fee8c4fc7d55f2_0_1851) switched from INITIALIZING to FAILED with failure cause:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:416) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:869) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:836) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:732) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:825) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:732) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:825) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:732) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:202) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:789) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) [flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) [flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist-1.20.0.jar:1.20.0]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.userFunction of type org.apache.flink.api.common.functions.Function in instance of org.apache.flink.streaming.api.operators.StreamFlatMap
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(Unknown Source) ~[?:?]
at java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(Unknown Source) ~[?:?]
at java.io.ObjectStreamClass.checkObjFieldValueTypes(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultCheckFieldValues(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:533) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:521) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:475) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:400) ~[flink-dist-1.20.0.jar:1.20.0]
... 16 more
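The first `Caused by` above reports that the `serialVersionUID` of `org.apache.flink.connector.jdbc.dialect.AbstractDialect` in the serialized stream differs from that of the locally loaded class. That usually suggests the submitting side and the TaskManager load different builds of the JDBC connector jar. A minimal sketch for comparing the two values, assuming it is run once with the Dinky classpath and once with the Flink `lib` classpath. `SerialUidCheck` is a hypothetical helper name; only `java.io.ObjectStreamClass` from the JDK is used:

```java
import java.io.ObjectStreamClass;

final class SerialUidCheck {

    /**
     * Returns the serialVersionUID that Java serialization uses for the class,
     * or null if the class cannot be found or is not Serializable.
     */
    static Long serialUidOf(String className) {
        try {
            // initialize=false: load the class without running its static initializers
            Class<?> cls = Class.forName(className, false, SerialUidCheck.class.getClassLoader());
            ObjectStreamClass desc = ObjectStreamClass.lookup(cls);
            return desc == null ? null : desc.getSerialVersionUID();
        } catch (ClassNotFoundException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0
                ? args[0]
                : "org.apache.flink.connector.jdbc.dialect.AbstractDialect";
        System.out.println(name + " -> " + serialUidOf(name));
    }
}
```

If the two runs print different numbers, the two classpaths contain different versions of the class, which would be consistent with the `InvalidClassException` in the log.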
What you expected to happen
I read through the Dinky 1.2.3 source code carefully and believe the cause is the lambda expression used for flatMap in the buildRow method of AbstractSqlSinkBuilder. I hope this can be fixed soon.
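The second `Caused by` in the log shows a `java.lang.invoke.SerializedLambda` where a `Function` was expected. The JDK-only sketch below illustrates why a lambda is involved: a serializable lambda is written to the stream as a `SerializedLambda` stand-in and has to be resolved back through the class that declared it, whereas a named class is written as itself. This is not Dinky's code. `LambdaFormSketch`, `SerFn` and `UpperCase` are hypothetical names, and whether replacing the lambda in `buildRow` is the right fix is an unverified assumption.

```java
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.function.Function;

final class LambdaFormSketch {

    /** Hypothetical serializable function type, standing in for Flink's FlatMapFunction. */
    interface SerFn<T, R> extends Function<T, R>, Serializable {}

    /** Named-class variant: serialized under its own class name. */
    static final class UpperCase implements SerFn<String, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String apply(String s) {
            return s.toUpperCase();
        }
    }

    /** Lambda variant of the same function. */
    static SerFn<String, String> asLambda() {
        return s -> s.toUpperCase();
    }

    static SerFn<String, String> asNamedClass() {
        return new UpperCase();
    }

    /** Returns the object that Java serialization would actually write for fn. */
    static Object streamForm(Object fn) {
        try {
            // Serializable lambdas get a synthetic private writeReplace() method
            Method writeReplace = fn.getClass().getDeclaredMethod("writeReplace");
            writeReplace.setAccessible(true);
            return writeReplace.invoke(fn);
        } catch (NoSuchMethodException e) {
            return fn; // no replacement: the object is written as itself
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(streamForm(asLambda()).getClass().getName());
        System.out.println(streamForm(asNamedClass()).getClass().getName());
    }
}
```

On a standard JDK the lambda variant should report `java.lang.invoke.SerializedLambda`, while the named class reports its own name. The named-class form therefore does not depend on lambda resolution on the deserializing side, although the class itself must still be on the TaskManager's classpath.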
How to reproduce
Dinky version: 1.2.3
Flink version: 1.20 (JDK 11 build)
Submission mode: standalone
Job executed:
EXECUTE CDCSOURCE cdc_mysql WITH (
'connector' = 'mysql-cdc',
'hostname' = '172.16.255.158',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'nbods.ab_beddict',
'sink.connector' = 'jdbc',
'sink.url' = 'jdbc:mysql://172.16.255.158:3306/ods_copy?characterEncoding=utf-8&useSSL=false',
'sink.username' = 'root',
'sink.password' = 'root',
'sink.sink.db' = 'ods_copy',
'sink.table.prefix' = 'test_',
'sink.table.lower' = 'true',
'sink.table-name' = '#{tableName}',
'sink.driver' = 'com.mysql.jdbc.Driver',
'sink.sink.buffer-flush.interval' = '2s',
'sink.sink.buffer-flush.max-rows' = '100',
'sink.sink.max-retries' = '5',
'sink.auto.create' = 'true'
)
Anything else
No response
Version
1.2.3
Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
Code of Conduct
Put the dinky-app jar into Flink's lib directory.
If you are running on YARN, put it on HDFS instead.
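To check which jars in a directory such as Flink's lib actually provide a given class (for example, whether more than one jar ships `AbstractDialect`, or whether the dinky-app jar is present at all), a JDK-only sketch along these lines may help. `JarClassFinder` is a hypothetical helper, the directory is supplied by the caller, and classes nested inside shaded or fat jars are not inspected:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.jar.JarFile;

final class JarClassFinder {

    /** Lists the file names of all *.jar files in libDir that contain the given class. */
    static List<String> jarsContaining(Path libDir, String className) {
        // Class entries inside a jar use '/' separators and a .class suffix
        String entry = className.replace('.', '/') + ".class";
        List<String> hits = new ArrayList<>();
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(libDir, "*.jar")) {
            for (Path jar : jars) {
                try (JarFile jarFile = new JarFile(jar.toFile())) {
                    if (jarFile.getEntry(entry) != null) {
                        hits.add(jar.getFileName().toString());
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Collections.sort(hits);
        return hits;
    }

    public static void main(String[] args) {
        // args[0]: directory to scan, args[1]: fully qualified class name
        for (String jar : jarsContaining(Paths.get(args[0]), args[1])) {
            System.out.println(jar);
        }
    }
}
```

More than one hit for the same class in one lib directory, or different connector versions between the Dinky and Flink directories, would be worth resolving before retrying the job.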
Hello @, this issue has not been active for more than 30 days. This issue will be closed in 7 days if there is no response. If you have any questions, you can comment and reply.