s3-connector-for-apache-kafka

There is an issue handling avro schema with default value defined as String

Open • rhatlapa opened this issue 1 year ago • 2 comments

Having an avro schema with a field defined like this:

    {
        "name": "osType",
        "type": [
            "string"
        ],
        "default": "UNKNOWN"
    },

Results in a failure to process it, with a stack trace like the following:

org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
	at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
	at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1564)
	at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1445)
	at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1323)
	at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1047)
	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:92)
	at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:540)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:496)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:496)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type STRUCT: class java.lang.String for field: "null"
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:241)
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
	at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
	... 23 more
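
For reference, here is a minimal, self-contained sketch of the conversion step that fails. Only the osType field definition comes from the schema above; the class name, the record name Host, the cache size, and the standalone main method are illustrative:

    import io.confluent.connect.avro.AvroData;
    import org.apache.avro.Schema;

    public class UnionDefaultRepro {
        public static void main(String[] args) {
            // Minimal record schema embedding the problematic field from above
            // (names other than osType are made up for this example).
            String schemaJson =
                "{\"type\": \"record\", \"name\": \"Host\", \"fields\": ["
                + "{\"name\": \"osType\", \"type\": [\"string\"], \"default\": \"UNKNOWN\"}"
                + "]}";
            Schema avroSchema = new Schema.Parser().parse(schemaJson);

            // AvroData (from the Confluent Avro converter) performs the Avro -> Connect
            // schema translation that appears in the stack trace above.
            AvroData avroData = new AvroData(100);

            // With connect-api 1.1.0 / kafka-avro-serializer 4.1.4 this call is where the
            // SchemaBuilderException ("Invalid default value") surfaces: the stack trace
            // suggests the single-branch union is mapped to a Connect STRUCT schema, so the
            // plain String default fails validation against it.
            org.apache.kafka.connect.data.Schema connectSchema = avroData.toConnectSchema(avroSchema);
            System.out.println(connectSchema);
        }
    }

With newer converter versions the same call should succeed, which is consistent with the dependency update suggested below.
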

The issue seems to be caused by outdated dependency versions (org.apache.kafka:connect-api:1.1.0 and io.confluent:kafka-avro-serializer:4.1.4), so I suggest updating these dependencies to resolve it.

rhatlapa · Apr 06 '23 07:04