
Schema registry compatibility check (why not?)


Hey, it's me again.

When using Avro with a schema registry, we serialize the data using an Avro schema; what about checking compatibility against the schema registry?

The official Avro serializer, KafkaAvroSerializer, checks whether the schema in use is compatible with the current remote schema, and fails on a compatibility issue, according to the compatibility mode registered for the subject in the schema registry.
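To illustrate, a minimal sketch of that behaviour (the registry URL, topic name and schema below are placeholders):

import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData

val confluentSerializer = KafkaAvroSerializer().apply {
    // false = configure as a value serializer
    configure(mapOf("schema.registry.url" to "http://localhost:8081"), false)
}

val userSchema = Schema.Parser().parse(
    """{"type":"record","name":"User","fields":[{"name":"name","type":"string"}]}"""
)
val genericRecord = GenericData.Record(userSchema).apply { put("name", "alice") }

// With auto.register.schemas=true (the default), the registry rejects the registration
// when the schema breaks the subject's compatibility mode, and serialize() throws.
val payload = confluentSerializer.serialize("users", genericRecord)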

avro4k, in contrast, just serializes against the local schema, regardless of the schema registry.

To keep the same workflow, we first have to convert the data class instance to a GenericRecord using Avro.default.toRecord(kotlinSerializer, schema, value), and then use AbstractKafkaAvroSerializer#serializeImpl to turn that generic record into the binary payload.

How I implemented it:

// Imports assume the Confluent Avro serializer artifact (io.confluent:kafka-avro-serializer)
// and avro4k 1.x
import com.github.avrokotlin.avro4k.Avro
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig
import kotlinx.serialization.serializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.serialization.Serializer

inline fun <reified T> KafkaAvroKotlinxSerializationSerde(isKey: Boolean, config: Map<String, *>): Serde<T> =
    Serdes.serdeFrom(KafkaAvroKotlinxSerializationSerializer(isKey, config), KafkaAvroKotlinxSerializationDeserializer(isKey, config))

inline fun <reified T> KafkaAvroKotlinxSerializationSerializer(isKey: Boolean, config: Map<String, *>): Serializer<T> =
    object : AbstractKafkaAvroSerializer(), Serializer<T> {
        private val kotlinSerializer = serializer<T>()
        private val schema = Avro.default.schema(kotlinSerializer)
        private val avroSchema = AvroSchema(schema)

        init {
            configure(KafkaAvroSerializerConfig(config))
        }

        override fun serialize(topic: String, value: T?): ByteArray? {
            if (value == null) {
                return null
            }
            // Convert the data class to a GenericRecord first...
            val record = Avro.default.toRecord(kotlinSerializer, schema, value)
            // ...then do the exact same as KafkaAvroSerializer (registry lookup,
            // compatibility check, binary encoding)
            return serializeImpl(
                getSubjectName(topic, isKey, value, avroSchema),
                record,
                avroSchema
            )
        }
    }

inline fun <reified T> KafkaAvroKotlinxSerializationDeserializer(isKey: Boolean, config: Map<String, *>): Deserializer<T> =
    object : AbstractKafkaAvroDeserializer(), Deserializer<T> {
        private val kotlinSerializer = serializer<T>()
        private val schema = Avro.default.schema(kotlinSerializer)

        init {
            this.isKey = isKey
            configure(KafkaAvroDeserializerConfig(config))
        }

        override fun deserialize(topic: String, record: ByteArray?): T? {
            if (record == null) {
                return null
            }
            // Do the exact same as KafkaAvroDeserializer, using our local schema as the reader schema
            val value = deserialize(topic, isKey, record, schema) as GenericRecord?
            return value?.let { Avro.default.fromRecord(kotlinSerializer, it) }
        }
    }
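Usage then boils down to this (MyEvent, the topic and the registry URL are hypothetical):

import kotlinx.serialization.Serializable

@Serializable
data class MyEvent(val id: String)

val config = mapOf("schema.registry.url" to "http://localhost:8081")
val serde: Serde<MyEvent> = KafkaAvroKotlinxSerializationSerde(isKey = false, config = config)
val bytes = serde.serializer().serialize("my-topic", MyEvent("42"))
val roundTripped = serde.deserializer().deserialize("my-topic", bytes)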

What we can do

Maybe this implementation is not perfect and could be done another way, like putting a factory method inside Avro. But we would be happier if using this library removed boilerplate like this. By the way, it would not require any additional library, since it only uses the org.apache.kafka.common.serialization package.
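For instance, such a factory method could look like this (Avro.Companion.kafkaSerde is purely hypothetical, not an existing avro4k API; it just delegates to the snippet above):

// Hypothetical extension; avro4k does not provide this today
inline fun <reified T> Avro.Companion.kafkaSerde(isKey: Boolean, config: Map<String, *>): Serde<T> =
    KafkaAvroKotlinxSerializationSerde(isKey, config)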

First "bad" idea

Normally, KafkaAvroSerializer would serialize the instance as a SpecificRecord, since we use a specific schema. But to implement SpecificRecord you have to return a field's value by its index, and that is the tricky part.
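To make the tricky part concrete, here is a naive, read-only sketch of what such a wrapper would have to implement (reflection-based, requires kotlin-reflect on the classpath, and far too slow without cached accessors):

import org.apache.avro.Schema
import org.apache.avro.specific.SpecificRecord

class ReflectiveSpecificRecord<T : Any>(private val value: T, private val writerSchema: Schema) : SpecificRecord {
    override fun getSchema(): Schema = writerSchema

    // The index-based accessor is the hard part: map the schema's field
    // positions onto the data class's properties
    override fun get(i: Int): Any? {
        val fieldName = writerSchema.fields[i].name()
        return value::class.members.first { it.name == fieldName }.call(value)
    }

    override fun put(i: Int, v: Any?) {
        throw UnsupportedOperationException("read-only sketch")
    }
}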

I can make a PR, but I'd rather validate with you first whether this fits the library's scope.

Chuckame avatar Aug 08 '22 12:08 Chuckame

For the moment it's OK, but regarding performance it would be much better to avoid the GenericRecord step, either by making the schema registry call directly (con: too much code to copy/paste/maintain, for my taste) or by having a way to easily implement SpecificRecord (maybe a static register with a cache).
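For the first option, the compatibility call itself is small; a sketch using Confluent's client directly (URL, cache size and subject name are placeholders, reusing the hypothetical MyEvent from above, and assuming a client version that has testCompatibility(String, ParsedSchema)):

import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

val client = CachedSchemaRegistryClient("http://localhost:8081", 100)
val subject = "my-topic-value" // default TopicNameStrategy subject for a value schema
val candidate = AvroSchema(Avro.default.schema(serializer<MyEvent>()))
val compatible = client.testCompatibility(subject, candidate)
check(compatible) { "Schema for $subject is incompatible with the registered schema" }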

Chuckame avatar Aug 08 '22 14:08 Chuckame

Did you have a look at https://github.com/thake/avro4k-kafka-serializer? It provides a bridge between the Confluent Schema Registry and Avro4k. Have a look at the discussion in #1 for more information.
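A producer then only needs configuration, roughly like this (the serializer class name below is a best-effort recollection of that project's API; verify the exact package against the repo's README):

val producerConfig = mapOf(
    "bootstrap.servers" to "localhost:9092",
    "schema.registry.url" to "http://localhost:8081",
    "key.serializer" to "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer" to "com.github.thake.kafka.avro4k.serializer.KafkaAvro4kSerializer",
)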

thake avatar Aug 12 '22 11:08 thake

Thanks for the answer. I'm still surprised that this isn't part of this repository, since it's totally coherent with this library. Depending on small libraries with few stars is worrying for the long-term vision.

Maybe it was declined as out of your scope?

Chuckame avatar Sep 05 '22 17:09 Chuckame

not inside this repository since it's totally coherent with this library

IMO, not really. This library has no dependencies on Kafka or HTTP clients; adding the Confluent library would add both, and it would exclude other implementations of the registry API, like Apicurio.

OneCricketeer avatar Apr 17 '23 23:04 OneCricketeer

@OneCricketeer thanks for your comment. I'm with you on this. I'll close the issue as not planned.

thake avatar Apr 18 '23 09:04 thake