avro4k
Schema registry compatibility check (why not ?)
Hey, it's me again.
When using Avro with a schema registry, we serialize the data with an Avro schema. But what about checking compatibility against the schema registry?
The official Avro serializer, KafkaAvroSerializer, checks whether the schema in use is compatible with the current remote schema, and fails if there is a compatibility issue, depending on the compatibility mode registered for that schema in the schema registry.
avro4k, on the other hand, just serializes according to the local schema, regardless of what the schema registry holds.
To keep the same workflow, we first have to convert the data class instance to a generic record using Avro.default.toRecord(kotlinSerializer, schema, value), and then use AbstractKafkaAvroSerializer#serializeImpl to convert the generic record to binary.
How I implemented it:
inline fun <reified T> KafkaAvroKotlinxSerializationSerde(isKey: Boolean, config: Map<String, *>): Serde<T> =
    Serdes.serdeFrom(
        KafkaAvroKotlinxSerializationSerializer(isKey, config),
        KafkaAvroKotlinxSerializationDeserializer(isKey, config)
    )

inline fun <reified T> KafkaAvroKotlinxSerializationSerializer(isKey: Boolean, config: Map<String, *>): Serializer<T> =
    object : AbstractKafkaAvroSerializer(), Serializer<T> {
        private val kotlinSerializer = serializer<T>()
        private val schema = Avro.default.schema(kotlinSerializer)
        private val avroSchema = AvroSchema(schema)

        init {
            configure(KafkaAvroSerializerConfig(config))
        }

        override fun serialize(topic: String, value: T?): ByteArray? {
            if (value == null) {
                return null
            }
            val record = Avro.default.toRecord(kotlinSerializer, schema, value)
            // Then do the exact same as KafkaAvroSerializer
            return serializeImpl(
                getSubjectName(topic, isKey, value, avroSchema),
                record,
                avroSchema
            )
        }
    }

inline fun <reified T> KafkaAvroKotlinxSerializationDeserializer(isKey: Boolean, config: Map<String, *>): Deserializer<T> =
    object : AbstractKafkaAvroDeserializer(), Deserializer<T> {
        private val kotlinSerializer = serializer<T>()
        private val schema = Avro.default.schema(kotlinSerializer)

        init {
            this.isKey = isKey
            configure(KafkaAvroDeserializerConfig(config))
        }

        override fun deserialize(topic: String, record: ByteArray?): T? {
            if (record == null) {
                return null
            }
            // Do the exact same as KafkaAvroDeserializer
            val value = deserialize(topic, isKey, record, schema) as GenericRecord?
            return value?.let { Avro.default.fromRecord(kotlinSerializer, it) }
        }
    }
What we can do
Maybe this implementation is not perfect and could be done another way, for example by putting a factory method inside Avro. But we would be happier if using this library removed boilerplate like this. By the way, it will not require any additional library, since it's using the org.apache.kafka.common.serialization package.
First "bad" idea
Normally, the KafkaAvroSerializer should serialize the instance as a SpecificRecord, since we use a specific schema. But to implement that, you have to get a field value by its index, and that is the tricky part.
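To make the "tricky part" concrete, here is a rough sketch of what a hand-written SpecificRecord adapter could look like. The User class, the UserRecord adapter and the schema are hypothetical and only there for illustration; this is not something avro4k provides, and it assumes the Apache Avro library is on the classpath.

```kotlin
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.specific.SpecificRecord

// Hypothetical payload type, used only for illustration.
data class User(var id: String, var age: Int)

// Hand-written adapter: Avro addresses fields by their position in the schema,
// so every class needs an index-to-property mapping kept in sync with the schema.
class UserRecord(private val user: User) : SpecificRecord {
    override fun getSchema(): Schema = SCHEMA

    override fun get(i: Int): Any? = when (i) {
        0 -> user.id
        1 -> user.age
        else -> throw IndexOutOfBoundsException("No field at index $i")
    }

    override fun put(i: Int, v: Any?) {
        when (i) {
            0 -> user.id = v.toString() // Avro may hand over a Utf8 instance
            1 -> user.age = v as Int
            else -> throw IndexOutOfBoundsException("No field at index $i")
        }
    }

    companion object {
        // Field order here defines the indexes used in get/put above.
        val SCHEMA: Schema = SchemaBuilder.record("User").fields()
            .requiredString("id")
            .requiredInt("age")
            .endRecord()
    }
}
```

Writing this mapping by hand for every class is exactly the boilerplate a generic solution would have to generate or cache.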
I can make a PR, but I'd prefer to check with you first whether this fits the scope of the library.
(For the moment it's OK, but for performance it would be much better to avoid the GenericRecord step, either by making only the schema registry call (con: for me, too much code to copy, paste and maintain) or by having an easy way to implement SpecificRecord (maybe a static registry with a cache).)
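For context on the "schema registry call only" option: besides resolving a schema ID, such code would have to reproduce the framing that the Confluent serializers put around the Avro payload (a zero magic byte, then the schema ID as a 4-byte big-endian integer, then the Avro binary data). A minimal sketch, where frameWithSchemaId and readSchemaId are hypothetical helper names and the schema ID is assumed to have been obtained from the registry already:

```kotlin
import java.nio.ByteBuffer

// Confluent wire format: magic byte 0x0, 4-byte big-endian schema ID, Avro binary payload.
const val MAGIC_BYTE: Byte = 0x0

// Hypothetical helper: prefixes an already-encoded Avro payload with the framing header.
fun frameWithSchemaId(schemaId: Int, avroPayload: ByteArray): ByteArray =
    ByteBuffer.allocate(1 + Int.SIZE_BYTES + avroPayload.size)
        .put(MAGIC_BYTE)
        .putInt(schemaId) // ByteBuffer is big-endian by default
        .put(avroPayload)
        .array()

// Hypothetical helper: reads the schema ID back from a framed message.
fun readSchemaId(message: ByteArray): Int {
    require(message.size >= 1 + Int.SIZE_BYTES && message[0] == MAGIC_BYTE) {
        "Not a Confluent-framed message"
    }
    return ByteBuffer.wrap(message, 1, Int.SIZE_BYTES).int
}
```

The framing itself is small; the part that is costly to copy and maintain is the registry client logic (lookup, registration, caching and subject naming) that serializeImpl already handles.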
Did you have a look at https://github.com/thake/avro4k-kafka-serializer? It provides a connection between Confluent Schema-Registry and Avro4k. Have a look at the discussion in #1 for more information.
Thanks for the answer. I'm still surprised that it's not inside this repository, since it's totally coherent with this library. It means adding a dependency on yet another small library without many stars, which can be worrying for the long term.
Maybe it was declined as out of scope?
"not inside this repository since it's totally coherent with this library"
IMO, not really. This library has no dependencies on Kafka or HTTP clients. Adding the Confluent library would add both... And exclude other implementations of the Registry API like Apicurio.
@OneCricketeer thanks for your comment. I agree with you on this. I'll close the issue as not planned.