flink-cdc
Invalid Java object for schema type INT64: class java.lang.String for field: "null"
Environment :
- Flink version : 1.14.4
- Flink CDC version: 2.3
- Database and version: oracle 19
To Reproduce Steps to reproduce the behavior:
- The test data : I'm just adding a field to the database
- The test code :
- The error : I just added a field to the database, and Flink reported the following error:
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
    at io.debezium.connector.oracle.xstream.LcrEventHandler.processLCR(LcrEventHandler.java:120)
    at oracle.streams.XStreamOut.XStreamOutReceiveLCRCallbackNative(Native Method)
    at oracle.streams.XStreamOut.receiveLCRCallback(XStreamOut.java:465)
    at io.debezium.connector.oracle.xstream.XstreamStreamingChangeEventSource.execute(XstreamStreamingChangeEventSource.java:105)
    at io.debezium.connector.oracle.xstream.XstreamStreamingChangeEventSource.execute(XstreamStreamingChangeEventSource.java:42)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:160)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:122)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
    at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:374)
    at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:119)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
    at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:117)
    at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130)
    at io.debezium.connector.oracle.OracleDatabaseSchema.lambda$applySchemaChange$0(OracleDatabaseSchema.java:73)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at io.debezium.connector.oracle.OracleDatabaseSchema.applySchemaChange(OracleDatabaseSchema.java:72)
    at io.debezium.pipeline.EventDispatcher$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcher.java:522)
    at io.debezium.connector.oracle.OracleSchemaChangeEventEmitter.emitSchemaChangeEvent(OracleSchemaChangeEventEmitter.java:113)
    at io.debezium.pipeline.EventDispatcher.dispatchSchemaChangeEvent(EventDispatcher.java:297)
    at io.debezium.connector.oracle.xstream.LcrEventHandler.dispatchSchemaChangeEvent(LcrEventHandler.java:186)
    at io.debezium.connector.oracle.xstream.LcrEventHandler.processLCR(LcrEventHandler.java:110)
    ... 11 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.lang.String for field: "null"
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245)
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
How did you add the field? Showing the SQL you used would be helpful.
I encountered the same error. Have you found the cause?
@Myles21 There are some problems when adding columns with default string values.
I have no default values, but I encountered the same error.
Same problem, +1
+1, same problem
This issue is a Debezium bug: a compatibility problem in how the ALTER statement is handled. When the ALTER statement is parsed, a column declared as int with a default value of 0 has that default recognized as the string "0". The step that should then convert the default value to the schema data type does not follow that logic, so the default appears to stay a string.
Code location:
io.debezium.connector.mysql.antlr.listener.DefaultValueParserListener#convertDefaultValue
public void convertDefaultValue(boolean skipIfUnknownOptional) {
    // For CREATE TABLE are all column default values converted only after charset is known.
    if (convertDefault) {
        if (!converted && (optionalColumn.get() != null || !skipIfUnknownOptional)) { // This place doesn't follow the logic in the if
            convertDefaultValueToSchemaType(columnEditor);
            converted = true;
        }
    }
}
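To make the conversion step concrete, here is a minimal, self-contained sketch of turning a parsed default literal into the Java class that the column's schema type expects. It is not Debezium's implementation: the class `DefaultValueSketch`, the method `toSchemaType`, and the use of plain strings as type names are all hypothetical and exist only for illustration.

```java
import java.util.Locale;

// Hypothetical illustration only; this is not Debezium code.
public class DefaultValueSketch {

    // Converts the raw default literal from the DDL (a String after parsing)
    // into the Java class that the named schema type expects.
    public static Object toSchemaType(String rawDefault, String schemaType) {
        if (rawDefault == null) {
            return null; // no default declared
        }
        String literal = rawDefault.trim();
        switch (schemaType.toUpperCase(Locale.ROOT)) {
            case "INT32":
                return Integer.valueOf(literal);
            case "INT64":
                return Long.valueOf(literal);
            case "FLOAT64":
                return Double.valueOf(literal);
            case "BOOLEAN":
                return Boolean.valueOf(literal);
            default:
                return literal; // STRING and unrecognised types stay as text
        }
    }

    public static void main(String[] args) {
        Object converted = toSchemaType("0", "INT64");
        // The literal "0" is now a java.lang.Long rather than a java.lang.String.
        System.out.println(converted.getClass().getName() + " " + converted);
    }
}
```

If a conversion like this is skipped, the default reaches schema validation still typed as a String.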
The result is a mismatch between the class of the value passed to validateValue and the class the schema expects:
org.apache.kafka.connect.data.ConnectSchema#validateValue(java.lang.String, org.apache.kafka.connect.data.Schema, java.lang.Object)
boolean foundMatch = false;
if (expectedClasses.size() == 1) {
    foundMatch = expectedClasses.get(0).isInstance(value);
} else {
    for (Class<?> expectedClass : expectedClasses) {
        if (expectedClass.isInstance(value)) {
            foundMatch = true;
            break;
        }
    }
}
if (!foundMatch)
    throw new DataException("Invalid Java object for schema type " + schema.type()
            + ": " + value.getClass()
            + " for field: \"" + name + "\"");
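The check quoted above can be imitated without any Kafka Connect dependency to see why a String default is rejected for an INT64 field. The sketch below is only a stand-in under stated assumptions: `TypeCheckSketch`, `EXPECTED_CLASSES`, and `validate` are hypothetical names, the type-to-class table covers just three types, and it throws `IllegalArgumentException` where the real code throws `DataException`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the instance check; not the Kafka Connect implementation.
public class TypeCheckSketch {

    // Assumed mapping from schema type name to the Java classes accepted for it.
    static final Map<String, List<Class<?>>> EXPECTED_CLASSES = new HashMap<>();
    static {
        EXPECTED_CLASSES.put("INT32", Arrays.<Class<?>>asList(Integer.class));
        EXPECTED_CLASSES.put("INT64", Arrays.<Class<?>>asList(Long.class));
        EXPECTED_CLASSES.put("STRING", Arrays.<Class<?>>asList(String.class));
    }

    // Rejects a value whose Java class is not among those expected for the schema type.
    public static void validate(String fieldName, String schemaType, Object value) {
        if (value == null) {
            return; // assumption: a missing default is acceptable in this sketch
        }
        List<Class<?>> expected = EXPECTED_CLASSES.get(schemaType);
        if (expected == null) {
            throw new IllegalArgumentException("Unknown schema type: " + schemaType);
        }
        for (Class<?> expectedClass : expected) {
            if (expectedClass.isInstance(value)) {
                return;
            }
        }
        throw new IllegalArgumentException("Invalid Java object for schema type " + schemaType
                + ": " + value.getClass()
                + " for field: \"" + fieldName + "\"");
    }

    public static void main(String[] args) {
        try {
            validate(null, "INT64", "0"); // unconverted default: still a String
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        validate(null, "INT64", Long.valueOf("0")); // converted default passes
    }
}
```

Because the field name passed in is null, the message produced by the first call has the same text as the DataException in the report, including the `"null"` field name.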
Workaround: use the following syntax to alter the table:
ALTER TABLE cdctest.t_n add column test3 int DEFAULT 0 NULL COMMENT 'sdfds',add column test4 int DEFAULT 0 NULL COMMENT 'sdfds';
instead of the following statement:
alter table feedback add column ( `aa` int(11) DEFAULT 0 COMMENT 'aa')
Closing this issue because it was created before version 2.3.0 (2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under project Flink with component tag Flink CDC. Thank you!