
Found null value for non-optional schema when default value is set

Open M3lkior opened this issue 3 years ago • 1 comment

Hi, we are facing an issue with Kafka Connect (6.1.1) and the internal io.confluent.connect.avro.AvroData.validateSchemaValue method used by the AvroConverter class.

We have the following Avro definition, registered in the Schema Registry:

 "namespace": "mynamespace",
    "type": "record",
    "name": "MyRecord",
    "version": 1,
    "fields": [
{"name": "field1", "type": "string"},
        {
            "name": "field2",
            "doc": "field with a default value",
            "default": {
              "Value": 0,
              "Currency": "EUR"
            },
            "type": {
                "name": "subfield2",
                "type": "record",
                "fields": [
                    {
                        "name": "Value",
                        "type": "float"
                    },
                    {
                        "name": "Currency",
                        "type": {"type": "string", "avro.java.string": "String"}
                    }
                ]
            }
        }
    ]
}

Given the following message:

{
    "field1": "test"
}
  • the Schema Registry validation passes and the message is published to the topic
  • the content of the message contains the default values (for info, the message was produced with the Conduktor tool and read back with the same tool)
  • everything works fine except with Kafka Connect
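
For reference, the failure can be reproduced without a running connector. Below is a minimal sketch (our assumption of the reproduction path, based on the stack traces further down, which show the error being raised while AvroData converts the schema's default value to a Connect value):

// Sketch: converting the Avro schema above to a Connect schema goes through
// AvroData.defaultValueFromAvro, which is where the stack traces below point.
// Requires io.confluent:kafka-connect-avro-converter and org.apache.avro:avro
// on the classpath. The avro.java.string property is omitted for brevity.
import io.confluent.connect.avro.AvroData;

public class ReproduceDefaultValueError {
    private static final String SCHEMA_JSON =
        "{\"namespace\": \"mynamespace\", \"type\": \"record\", \"name\": \"MyRecord\", \"fields\": ["
      + " {\"name\": \"field1\", \"type\": \"string\"},"
      + " {\"name\": \"field2\","
      + "  \"default\": {\"Value\": 0, \"Currency\": \"EUR\"},"
      + "  \"type\": {\"name\": \"subfield2\", \"type\": \"record\", \"fields\": ["
      + "   {\"name\": \"Value\", \"type\": \"float\"},"
      + "   {\"name\": \"Currency\", \"type\": \"string\"}]}}]}";

    public static void main(String[] args) {
        org.apache.avro.Schema avroSchema =
            new org.apache.avro.Schema.Parser().parse(SCHEMA_JSON);
        AvroData avroData = new AvroData(100); // schema cache size
        // Expected: org.apache.kafka.connect.errors.DataException:
        // "Found null value for non-optional schema"
        System.out.println(avroData.toConnectSchema(avroSchema));
    }
}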

We are using a simple mongo-sink-connector to read messages from a topic and write the data into a Mongo collection.

Here is the config:

{
    "topics": "test-topic",
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",

    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",

    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "registry-url",
    "value.converter.schemas.enable": "false",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.basic.auth.user.info": "basicinfo-credentials",

    "connection.uri": "mongodburi",
    "database": "database",
    "collection": "collection",
    "max.num.retries": "3",
    "retries.defer.timeout": "5000",

    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy",
    "value.projection.type": "",
    "value.projection.list": "",

    "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy",

    "delete.on.null.values": "false",

    "max.batch.size": "0",
    "rate.limiting.timeout": "0",
    "rate.limiting.every.n": "0",

    "change.data.capture.handler": ""
}
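
For completeness, a hypothetical way to submit this config to a Connect worker over its REST API; the worker URL is an assumption, and the connector name matches the task id sink-mongo-costing-requests-0 visible in the logs below:

// Sketch: POST the connector config to a Connect worker's REST API.
// The worker address http://localhost:8083 is an assumption; only a
// subset of the config above is inlined to keep the example short.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RegisterSinkConnector {
    public static void main(String[] args) throws Exception {
        String body = "{\"name\": \"sink-mongo-costing-requests\", \"config\": {"
            + "\"connector.class\": \"com.mongodb.kafka.connect.MongoSinkConnector\","
            + "\"topics\": \"test-topic\","
            + "\"key.converter\": \"org.apache.kafka.connect.storage.StringConverter\","
            + "\"value.converter\": \"io.confluent.connect.avro.AvroConverter\","
            + "\"value.converter.schema.registry.url\": \"registry-url\","
            + "\"connection.uri\": \"mongodburi\","
            + "\"database\": \"database\","
            + "\"collection\": \"collection\"}}";
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8083/connectors"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}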

When the Kafka Connect sink receives the record, the following exception is thrown:

[2021-05-19 08:13:20,966] ERROR WorkerSinkTask{id=sink-mongo-costing-requests-0} Error converting message value in topic 'test-topic' partition 1 at offset 130 and timestamp 1621411947679: Found null value for non-optional schema (org.apache.kafka.connect.runtime.WorkerSinkTask:547)
org.apache.kafka.connect.errors.DataException: Found null value for non-optional schema
        at io.confluent.connect.avro.AvroData.validateSchemaValue(AvroData.java:1177)
        at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1231)
        at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1226)
        at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1474)
        at io.confluent.connect.avro.AvroData.defaultValueFromAvroWithoutLogical(AvroData.java:1902)
        at io.confluent.connect.avro.AvroData.defaultValueFromAvro(AvroData.java:1885)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1818)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1562)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1687)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1538)
        at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1221)
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:115)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
[2021-05-19 08:13:20,969] ERROR WorkerSinkTask{id=sink-mongo-costing-requests-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Found null value for non-optional schema
        at io.confluent.connect.avro.AvroData.validateSchemaValue(AvroData.java:1177)
        at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1231)
        at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1226)
        at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1474)
        at io.confluent.connect.avro.AvroData.defaultValueFromAvroWithoutLogical(AvroData.java:1902)
        at io.confluent.connect.avro.AvroData.defaultValueFromAvro(AvroData.java:1885)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1818)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1562)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1687)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1538)
        at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1221)
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:115)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
        ... 13 more

  • This error also appears if we explicitly set the field2 value, like:

{
    "field1": "test",
    "field2": {"Value": 0, "Currency": "EUR"}
}

To avoid this error, we have to remove the default block from the Avro specification, which forces our users to define field2 explicitly (because the Schema Registry validation no longer passes otherwise).

M3lkior avatar May 19 '21 08:05 M3lkior

It appears that setting the default value to 0.0 works as a workaround.
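
That is consistent with our reading of the stack trace (an assumption, not verified against the 6.1.1 sources): the integer literal 0 in the default is apparently not coerced to the float type when AvroData converts the default value, so the conversion yields null and validateSchemaValue rejects it. The working default block then looks like:

"default": {
    "Value": 0.0,
    "Currency": "EUR"
},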

M3lkior avatar May 25 '21 08:05 M3lkior