
Invalid Java object for schema type INT64: class java.lang.String for field: "null"

hehetown opened this issue

Describe the bug (please use English): After adding a column to an Oracle table, the Oracle CDC connector fails with the error Invalid Java object for schema type INT64: class java.lang.String for field: "null", and the connector stops.

Environment:

  • Flink version : 1.14.4
  • Flink CDC version: 2.3
  • Database and version: Oracle 19

To Reproduce (steps to reproduce the behavior):

  1. The test data: I just added a field (a new column) to a database table.
  2. The test code: not included in the report; see the sketch after the stack trace below.
  3. The error: after the column was added, Flink reported the following error:

com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
	at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
	at io.debezium.connector.oracle.xstream.LcrEventHandler.processLCR(LcrEventHandler.java:120)
	at oracle.streams.XStreamOut.XStreamOutReceiveLCRCallbackNative(Native Method)
	at oracle.streams.XStreamOut.receiveLCRCallback(XStreamOut.java:465)
	at io.debezium.connector.oracle.xstream.XstreamStreamingChangeEventSource.execute(XstreamStreamingChangeEventSource.java:105)
	at io.debezium.connector.oracle.xstream.XstreamStreamingChangeEventSource.execute(XstreamStreamingChangeEventSource.java:42)
	at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:160)
	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:122)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
	at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
	at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:374)
	at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:119)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
	at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:117)
	at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130)
	at io.debezium.connector.oracle.OracleDatabaseSchema.lambda$applySchemaChange$0(OracleDatabaseSchema.java:73)
	at java.lang.Iterable.forEach(Iterable.java:75)
	at io.debezium.connector.oracle.OracleDatabaseSchema.applySchemaChange(OracleDatabaseSchema.java:72)
	at io.debezium.pipeline.EventDispatcher$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcher.java:522)
	at io.debezium.connector.oracle.OracleSchemaChangeEventEmitter.emitSchemaChangeEvent(OracleSchemaChangeEventEmitter.java:113)
	at io.debezium.pipeline.EventDispatcher.dispatchSchemaChangeEvent(EventDispatcher.java:297)
	at io.debezium.connector.oracle.xstream.LcrEventHandler.dispatchSchemaChangeEvent(LcrEventHandler.java:186)
	at io.debezium.connector.oracle.xstream.LcrEventHandler.processLCR(LcrEventHandler.java:110)
	... 11 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.lang.String for field: "null"
	at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245)
	at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
	at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
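
The test code was not included in the report. A minimal sketch of the kind of Oracle CDC DataStream job that hits this, assuming the Flink CDC 2.3 API (hostname, credentials, and schema/table names below are placeholders, not taken from the report):

import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class OracleCdcTest {
    public static void main(String[] args) throws Exception {
        // All connection details are placeholders for a local Oracle 19 instance.
        SourceFunction<String> source = OracleSource.<String>builder()
                .hostname("localhost")
                .port(1521)
                .database("ORCLCDB")
                .schemaList("CDCTEST")
                .tableList("CDCTEST.T_N")
                .username("flinkuser")
                .password("flinkpw")
                .deserializer(new JsonDebeziumDeserializationSchema()) // change events as JSON strings
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(source).print();

        // While a job like this is running, executing ALTER TABLE ... ADD <column>
        // against the captured table is what produced the exception above.
        env.execute("oracle-cdc-test");
    }
}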

hehetown avatar Jul 24 '22 02:07 hehetown

How did you add the field? Showing the SQL you used would be helpful.

ruanhang1993 avatar Aug 18 '22 02:08 ruanhang1993

I encountered the same error. Have you found the cause?

Myles21 avatar Sep 14 '22 06:09 Myles21

I encountered the same error. Have you found the cause?

There are some known problems when adding columns with default string values. @Myles21

ruanhang1993 avatar Sep 19 '22 02:09 ruanhang1993

I encountered the same error. Have you found the cause?

There are some known problems when adding columns with default string values. @Myles21

I have no default values, but encountered the same error.

radioliu92 avatar Feb 14 '23 07:02 radioliu92

Same problem +1

cyrusmaster avatar May 12 '23 09:05 cyrusmaster

+1, same problem

Level1Accelerator avatar May 31 '23 09:05 Level1Accelerator

This issue is a Debezium bug in its handling of ALTER statements. When the ALTER statement is parsed, a column declared as int with a default value of 0 has that 0 captured as the string "0". The step that should convert the default value to the schema's data type is then skipped, so the string default is kept as-is.

Code location:

io.debezium.connector.mysql.antlr.listener.DefaultValueParserListener#convertDefaultValue

public void convertDefaultValue(boolean skipIfUnknownOptional) {
    // For CREATE TABLE, all column default values are converted only after the charset is known.
    if (convertDefault) {
        // On this ALTER path the condition below is not satisfied, so
        // convertDefaultValueToSchemaType is never called and the default
        // value stays a raw String.
        if (!converted && (optionalColumn.get() != null || !skipIfUnknownOptional)) {
            convertDefaultValueToSchemaType(columnEditor);
            converted = true;
        }
    }
}

This causes a mismatch in validateValue between the class of the default value and the class expected by the schema:

org.apache.kafka.connect.data.ConnectSchema#validateValue(java.lang.String, org.apache.kafka.connect.data.Schema, java.lang.Object)

boolean foundMatch = false;
if (expectedClasses.size() == 1) {
    foundMatch = expectedClasses.get(0).isInstance(value);
} else {
    for (Class<?> expectedClass : expectedClasses) {
        if (expectedClass.isInstance(value)) {
            foundMatch = true;
            break;
        }
    }
}

if (!foundMatch)
    throw new DataException("Invalid Java object for schema type " + schema.type()
            + ": " + value.getClass()
            + " for field: \"" + name + "\"");

Workaround: use the following syntax to alter the table:

ALTER TABLE cdctest.t_n add column test3 int DEFAULT 0 NULL COMMENT 'sdfds', add column test4 int DEFAULT 0 NULL COMMENT 'sdfds';

Instead of the following parenthesized form, which triggers the default-value parsing bug described above:

alter table feedback  add column  ( `aa` int(11) DEFAULT 0 COMMENT 'aa')

johncai0 avatar Jun 21 '23 03:06 johncai0

Closing this issue because it was created before version 2.3.0 (2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under project Flink with component tag Flink CDC. Thank you!

PatrickRen avatar Feb 28 '24 15:02 PatrickRen