avro4k-kafka-serializer
avro4k-kafka-serializer copied to clipboard
use.latest.version override is not working
This library seems to be ignoring the use.latest.version`. Using this library to serialize some data, I was getting an error like the following
Error retrieving Avro schema id from schema registry for subject '<subject>' and schema: <schema>
Schema not found; error code: 40403
After investigating the original schema, I discovered that some string fields were incorrectly defined as:
"type":{
"type":"string",
"avro.java.string":"String"
},
There is an issue in avro which can malform registered String types into the above union type. I stumbled upon this issue after getting the following error trying to use this library to serialize some data to a topic using a schema which was malformed in this way. The error I got was as follows and only after investigating I discovered that my schema which got registered in my schema registry had the above problem.
I found a suggested workaround using use.latest.version
but it was not working. I then tried switching to using Avro.default.toRecord
to manually convert my data class into a GenericRecord
and then configuring my producer to use KafkaAvroSerializer
. This worked. Based on this behavior, I think that the KafkaAvro4kSerializer
is not using the latest schema version despite setting use.latest.version
.
Full traceback
java.lang.IllegalStateException: Failed to execute ApplicationRunner
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:763)
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:750)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:309)
at com.expediagroup.sd.casedomain.enricher.ApplicationKt.main(Application.kt:105)
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message with avro4k
at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerializer.serializeImpl(AbstractKafkaAvro4kSerializer.kt:50)
at com.github.thake.kafka.avro4k.serializer.KafkaAvro4kSerializer.serialize(KafkaAvro4kSerializer.kt:29)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:921)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:889)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:984)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:649)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:409)
at com.expediagroup.sd.casedomain.enricher.Application.runner$lambda-0(Application.kt:65)
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:760)
... 3 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema id from schema registry for subject '<subject>' and schema: <schema> at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe.doCallToSchemaRegistry(AbstractKafkaAvro4kSerDe.kt:71)
at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe.getSchemaIdWithRetry(AbstractKafkaAvro4kSerDe.kt:82)
at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerializer.getSchemaId(AbstractKafkaAvro4kSerializer.kt:87)
at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerializer.serializeImpl(AbstractKafkaAvro4kSerializer.kt:38)
... 12 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:458)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:434)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getIdFromRegistry(CachedSchemaRegistryClient.java:316)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getId(CachedSchemaRegistryClient.java:539)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getId(CachedSchemaRegistryClient.java:519)
at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe$getSchemaIdWithRetry$2.invoke(AbstractKafkaAvro4kSerDe.kt:83)
at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe$getSchemaIdWithRetry$2.invoke(AbstractKafkaAvro4kSerDe.kt:82)
at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe$doCallToSchemaRegistry$2$1.invokeSuspend(AbstractKafkaAvro4kSerDe.kt:67)
at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe$doCallToSchemaRegistry$2$1.invoke(AbstractKafkaAvro4kSerDe.kt)
at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe$doCallToSchemaRegistry$2$1.invoke(AbstractKafkaAvro4kSerDe.kt)
at com.github.michaelbull.retry.RetryKt$retry$5.invokeSuspend(Retry.kt:52)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:234)
at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:166)
at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397)
at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:431)
at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$default(CancellableContinuationImpl.kt:420)
at kotlinx.coroutines.CancellableContinuationImpl.resumeUndispatched(CancellableContinuationImpl.kt:518)
at kotlinx.coroutines.EventLoopImplBase$DelayedResumeTask.run(EventLoop.common.kt:500)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:284)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:85)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe.doCallToSchemaRegistry(AbstractKafkaAvro4kSerDe.kt:65)
@ekerstens, thanks for raising this issue. I didn't have time to look into it yet, but I'll try to do so this week.