avro4k-kafka-serializer icon indicating copy to clipboard operation
avro4k-kafka-serializer copied to clipboard

use.latest.version override is not working

Open ekerstens opened this issue 2 years ago • 2 comments

This library seems to be ignoring the use.latest.version`. Using this library to serialize some data, I was getting an error like the following

Error retrieving Avro schema id from schema registry for subject '<subject>' and schema: <schema>

Schema not found; error code: 40403

After investigating the original schema, I discovered that some string fields were incorrectly defined as:

         "type":{  
            "type":"string",
            "avro.java.string":"String"
         },

There is an issue in avro which can malform registered String types into the above union type. I stumbled upon this issue after getting the following error trying to use this library to serialize some data to a topic using a schema which was malformed in this way. The error I got was as follows and only after investigating I discovered that my schema which got registered in my schema registry had the above problem.

I found a suggested workaround using use.latest.version but it was not working. I then tried switching to using Avro.default.toRecord to manually convert my data class into a GenericRecord and then configuring my producer to use KafkaAvroSerializer. This worked. Based on this behavior, I think that the KafkaAvro4kSerializer is not using the latest schema version despite setting use.latest.version.

ekerstens avatar Sep 29 '22 00:09 ekerstens

Full traceback

java.lang.IllegalStateException: Failed to execute ApplicationRunner
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:763)
	at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:750)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:309)
	at com.expediagroup.sd.casedomain.enricher.ApplicationKt.main(Application.kt:105)
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message with avro4k
	at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerializer.serializeImpl(AbstractKafkaAvro4kSerializer.kt:50)
	at com.github.thake.kafka.avro4k.serializer.KafkaAvro4kSerializer.serialize(KafkaAvro4kSerializer.kt:29)
	at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:921)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:889)
	at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:984)
	at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:649)
	at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:409)
	at com.expediagroup.sd.casedomain.enricher.Application.runner$lambda-0(Application.kt:65)
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:760)
	... 3 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema id from schema registry for subject '<subject>' and schema: <schema>	at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe.doCallToSchemaRegistry(AbstractKafkaAvro4kSerDe.kt:71)
	at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe.getSchemaIdWithRetry(AbstractKafkaAvro4kSerDe.kt:82)
	at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerializer.getSchemaId(AbstractKafkaAvro4kSerializer.kt:87)
	at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerializer.serializeImpl(AbstractKafkaAvro4kSerializer.kt:38)
	... 12 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:458)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:434)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getIdFromRegistry(CachedSchemaRegistryClient.java:316)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getId(CachedSchemaRegistryClient.java:539)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getId(CachedSchemaRegistryClient.java:519)
	at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe$getSchemaIdWithRetry$2.invoke(AbstractKafkaAvro4kSerDe.kt:83)
	at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe$getSchemaIdWithRetry$2.invoke(AbstractKafkaAvro4kSerDe.kt:82)
	at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe$doCallToSchemaRegistry$2$1.invokeSuspend(AbstractKafkaAvro4kSerDe.kt:67)
	at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe$doCallToSchemaRegistry$2$1.invoke(AbstractKafkaAvro4kSerDe.kt)
	at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe$doCallToSchemaRegistry$2$1.invoke(AbstractKafkaAvro4kSerDe.kt)
	at com.github.michaelbull.retry.RetryKt$retry$5.invokeSuspend(Retry.kt:52)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:234)
	at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:166)
	at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397)
	at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:431)
	at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$default(CancellableContinuationImpl.kt:420)
	at kotlinx.coroutines.CancellableContinuationImpl.resumeUndispatched(CancellableContinuationImpl.kt:518)
	at kotlinx.coroutines.EventLoopImplBase$DelayedResumeTask.run(EventLoop.common.kt:500)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:284)
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:85)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
	at com.github.thake.kafka.avro4k.serializer.AbstractKafkaAvro4kSerDe.doCallToSchemaRegistry(AbstractKafkaAvro4kSerDe.kt:65)

ekerstens avatar Sep 29 '22 00:09 ekerstens

@ekerstens, thanks for raising this issue. I didn't have time to look into it yet, but I'll try to do so this week.

thake avatar Oct 10 '22 09:10 thake