Found null value for non-optional schema when default value is set
Hi, we are facing an issue with Kafka Connect (6.1.1) and the internal io.confluent.connect.avro.AvroData.validateSchemaValue method used by the AvroConverter class.
We have the following Avro definition, registered to the Schema Registry:
"namespace": "mynamespace",
"type": "record",
"name": "MyRecord",
"version": 1,
"fields": [
{"name": "field1", "type": "string"},
{
"name": "field2",
"doc": "field with a default value",
"default": {
"Value": 0,
"Currency": "EUR"
},
"type": {
"name": "subfield2",
"type": "record",
"fields": [
{
"name": "Value",
"type": "float"
},
{
"name": "Currency",
"type": {"type": "string", "avro.java.string": "String"}
}
]
}
}
]
}
Given the following message:
{
  "field1": "test"
}
- the Schema Registry validation passes and the message is published to the topic
- the content of the message contains the default values (for info, the message was produced with the Conduktor tool and read back with the same tool; a plain Java producer sketch below shows the same behaviour)
- everything works fine except with Kafka Connect
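For illustration, here is a minimal Java producer sketch showing the same default-filling behaviour (we actually used Conduktor, not this code; the broker and registry URLs are placeholders, and the schema above is assumed to be saved as myrecord.avsc):

import java.io.File;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProduceWithDefaults {
    public static void main(String[] args) throws Exception {
        // myrecord.avsc holds the Avro definition shown above
        Schema schema = new Schema.Parser().parse(new File("myrecord.avsc"));

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081"); // placeholder

        // Only field1 is set; GenericRecordBuilder fills field2 from its schema default,
        // so the serialized record carries {"Value": 0.0, "Currency": "EUR"}
        GenericRecord record = new GenericRecordBuilder(schema)
                .set("field1", "test")
                .build();

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", record)).get();
        }
    }
}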
We are using a simple MongoDB sink connector to read messages from the topic and write the data into a Mongo collection. Here is the config:
{
  "topics": "test-topic",
  "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
  "tasks.max": "1",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "key.converter.schemas.enable": "false",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "registry-url",
  "value.converter.schemas.enable": "false",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.basic.auth.user.info": "basicinfo-credentials",
  "connection.uri": "mongodburi",
  "database": "database",
  "collection": "collection",
  "max.num.retries": "3",
  "retries.defer.timeout": "5000",
  "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy",
  "value.projection.type": "",
  "value.projection.list": "",
  "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy",
  "delete.on.null.values": "false",
  "max.batch.size": "0",
  "rate.limiting.timeout": "0",
  "rate.limiting.every.n": "0",
  "change.data.capture.handler": ""
}
When the Kafka Connect sink receives the record, here is the exception thrown:
[2021-05-19 08:13:20,966] ERROR WorkerSinkTask{id=sink-mongo-costing-requests-0} Error converting message value in topic 'test-topic' partition 1 at offset 130 and timestamp 1621411947679: Found null value for non-optional schema (org.apache.kafka.connect.runtime.WorkerSinkTask:547)
org.apache.kafka.connect.errors.DataException: Found null value for non-optional schema
at io.confluent.connect.avro.AvroData.validateSchemaValue(AvroData.java:1177)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1231)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1226)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1474)
at io.confluent.connect.avro.AvroData.defaultValueFromAvroWithoutLogical(AvroData.java:1902)
at io.confluent.connect.avro.AvroData.defaultValueFromAvro(AvroData.java:1885)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1818)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1562)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1687)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1538)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1221)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:115)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2021-05-19 08:13:20,969] ERROR WorkerSinkTask{id=sink-mongo-costing-requests-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Found null value for non-optional schema
at io.confluent.connect.avro.AvroData.validateSchemaValue(AvroData.java:1177)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1231)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1226)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1474)
at io.confluent.connect.avro.AvroData.defaultValueFromAvroWithoutLogical(AvroData.java:1902)
at io.confluent.connect.avro.AvroData.defaultValueFromAvro(AvroData.java:1885)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1818)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1562)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1687)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1538)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1221)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:115)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
... 13 more
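For what it's worth, the same DataException can be reproduced without any broker or connector by handing the registered schema directly to the converter's AvroData helper (a minimal sketch; it assumes the schema above is saved as myrecord.avsc, and the cache size of 100 is arbitrary):

import java.io.File;
import io.confluent.connect.avro.AvroData;

public class DefaultValueRepro {
    public static void main(String[] args) throws Exception {
        org.apache.avro.Schema avroSchema =
                new org.apache.avro.Schema.Parser().parse(new File("myrecord.avsc"));
        AvroData avroData = new AvroData(100); // cache size is arbitrary
        // Throws org.apache.kafka.connect.errors.DataException:
        // "Found null value for non-optional schema" while converting
        // the record default for field2 (same path as the stack trace above)
        avroData.toConnectSchema(avroSchema);
    }
}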
- this error also appears if we explicitly set the field2 value, like:
{
  "field1": "test",
  "field2": { "Value": 0, "Currency": "EUR" }
}
To avoid this error, we have to remove the default block from the Avro specification, which forces our users to define field2 explicitly (because without the default, messages missing field2 no longer pass Schema Registry validation).
It appears that setting the default Value to 0.0 (a float literal instead of the integer 0) works as a workaround, as shown below.
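With that change, the default block reads as follows (this is our observation rather than a documented fix; presumably the converter needs the JSON default literal to match the declared float type exactly, and the integer 0 does not):

"default": {
  "Value": 0.0,
  "Currency": "EUR"
}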