Provide a Schema Registry compatible deserialization flow

Open seglo opened this issue 5 years ago • 5 comments

Short description

When using the kafka-avro-serializer deserializers in your Kafka Consumer, any SerializationException is raised inside the Kafka client, and there's no convenient way for an Alpakka Kafka user to decide how to handle the problem.

It would be nice to have a feature like the one in Kafka Streams, which allows the user to provide a SerializationException event handler that decides whether the stream should stop or whether we should skip that record and move on to the next.

Details

This problem could be solved in a few different ways, but an idiomatic Akka Streams solution would be to not use the Avro deserializer in the Kafka Consumer. Instead, use a simple ByteArrayDeserializer and do the Avro deserialization and the Schema Registry schema version check in a flow downstream from the consumer source. The user can then use standard Akka Streams error handling techniques to decide whether to continue, stop, or skip the affected offsets. This flow could be bundled with Alpakka Kafka for users to include.
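To make the continue/stop/skip decision concrete, here is a minimal sketch in plain Scala with no Akka or Kafka dependencies. Everything in it is an assumption made for illustration: RawRecord stands in for a record read with a byte-array deserializer, decode is a toy replacement for Avro decoding plus the schema check, and the Decision names are not part of any library. In Akka Streams the same choice would more likely be expressed with a supervision strategy or a recover stage.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object DownstreamDecodeSketch {
  // Hypothetical stand-in for a record read with a byte-array deserializer.
  final case class RawRecord(offset: Long, value: Array[Byte])

  // Illustrative policy names; not part of Alpakka Kafka.
  sealed trait Decision
  case object Skip extends Decision
  case object Stop extends Decision

  // Toy decoder standing in for Avro decoding plus the schema check:
  // it parses the payload as a UTF-8 integer and throws on bad input.
  def decode(bytes: Array[Byte]): Int = new String(bytes, UTF_8).trim.toInt

  // Decodes records in order. On a failure the handler decides whether to
  // drop that record and continue, or to stop and return what was decoded so far.
  def process(records: List[RawRecord],
              onError: (RawRecord, Throwable) => Decision): List[Int] = {
    @tailrec
    def loop(rest: List[RawRecord], acc: List[Int]): List[Int] = rest match {
      case Nil => acc.reverse
      case record :: tail =>
        Try(decode(record.value)) match {
          case Success(v) => loop(tail, v :: acc)
          case Failure(e) =>
            onError(record, e) match {
              case Skip => loop(tail, acc)
              case Stop => acc.reverse
            }
        }
    }
    loop(records, Nil)
  }
}
```

Because the raw bytes are still available when decoding fails, the handler can inspect the record (for example its offset) before deciding, which is not possible when the failure happens inside the Kafka client.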

Something similar could be considered for the producer use case.

seglo, Nov 06 '19 16:11

One solution may be to re-use the Confluent SchemaRegistryClient and KafkaAvroDeserializer. We could create a flow, e.g. akka.kafka.scaladsl.Serialization.confluentAvroDeserializer of type Flow[ConsumerRecord[K, Array[Byte]], GenericRecord, _], that uses the KafkaAvroDeserializer.deserialize method to deserialize the Array[Byte] into an Avro GenericRecord (or some other Avro type), and then uses SchemaRegistryClient somehow to validate the schema and version.

seglo, Nov 07 '19 18:11

We use something similar: a flow of type Flow[CommittableMessage[Array[Byte], Array[Byte]], (CommittableMessage[Array[Byte], Array[Byte]], Try[(K, V)]), NotUsed] that takes and uses a provided org.apache.kafka.common.serialization.Deserializer[T].

Another option, instead of outputting a tuple, is to output a CommittableMessage[K, V], i.e. Flow[CommittableMessage[Array[Byte], Array[Byte]], Try[CommittableMessage[K, V]], NotUsed]. Let me know if you have any thoughts. I'd be happy to publish a sample PR.
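The two output shapes can be compared with a small dependency-free sketch. The names here are assumptions for illustration only: Deser mirrors the deserialize(topic, data) method of the Kafka Deserializer interface, and Message is a simplified stand-in for CommittableMessage. In a real flow, each function would be the body of a map stage.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.Try

object TryShapesSketch {
  // Mirrors deserialize(topic, data) from the Kafka Deserializer interface;
  // a stand-in so the sketch has no dependencies.
  trait Deser[T] { def deserialize(topic: String, data: Array[Byte]): T }

  // Hypothetical, simplified stand-in for CommittableMessage.
  final case class Message[K, V](topic: String, offset: Long, key: K, value: V)
  type Raw = Message[Array[Byte], Array[Byte]]

  // Option 1: keep the raw message and pair it with the decoding result.
  def asTuple[K, V](keys: Deser[K], values: Deser[V])(msg: Raw): (Raw, Try[(K, V)]) =
    (msg, Try((keys.deserialize(msg.topic, msg.key), values.deserialize(msg.topic, msg.value))))

  // Option 2: emit a typed message, or the failure if either part cannot be decoded.
  def asTry[K, V](keys: Deser[K], values: Deser[V])(msg: Raw): Try[Message[K, V]] =
    Try {
      val k = keys.deserialize(msg.topic, msg.key)
      val v = values.deserialize(msg.topic, msg.value)
      Message(msg.topic, msg.offset, k, v)
    }

  // Toy deserializers used only to exercise the two functions.
  val stringDeser: Deser[String] = (_, data) => new String(data, UTF_8)
  val intDeser: Deser[Int] = (_, data) => new String(data, UTF_8).toInt
}
```

One difference worth noting: with the tuple shape the original message is always available downstream, while with Try[Message[K, V]] a failure only carries the exception unless the exception itself holds a reference to the message.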

chipola, Dec 23 '19 20:12

Hi @chipola. Your second option sounds nice. One challenge would be to make this API Java-compatible too. I think instead of wrapping the exception to send downstream we could guide the user through documentation to use a recover operator to handle it.

It would be great if you could contribute a PR to prototype this feature! We can work with you to get it into Alpakka Kafka.

seglo, Jan 06 '20 15:01

recover will be great, assuming we wrap the failure in a custom exception type that provides access to the original ConsumerMessage so users can log, dead-letter, etc. as they see fit. Try has proven convenient because it allows piping the results through a flexiFlow to dead-letter poison messages. Even though we use the Java API, we weren't too bothered by having Try in a few stages.
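A minimal sketch of such an exception type, in plain Scala without Akka or Kafka on the classpath. RawMessage, DeserializationFailed, decodeOrWrap and decodeAll are all hypothetical names chosen for this example. In a stream, the same exception type could be matched in a recover stage or in a supervision decider.

```scala
import scala.util.control.NonFatal

object WrappedFailureSketch {
  // Hypothetical stand-in for the original consumer message.
  final case class RawMessage(topic: String, offset: Long, value: Array[Byte])

  // Hypothetical exception type: carries the message that could not be decoded.
  final class DeserializationFailed(val original: RawMessage, cause: Throwable)
      extends RuntimeException(
        s"Cannot deserialize ${original.topic} at offset ${original.offset}", cause)

  // Runs the decoder and rethrows any non-fatal failure wrapped with the original message.
  def decodeOrWrap[V](decode: Array[Byte] => V)(msg: RawMessage): V =
    try decode(msg.value)
    catch { case NonFatal(e) => throw new DeserializationFailed(msg, e) }

  // Example handler: separate decoded values from poison messages
  // instead of failing on the first bad record.
  def decodeAll[V](decode: Array[Byte] => V)(msgs: List[RawMessage]): (List[V], List[RawMessage]) = {
    val results: List[Either[RawMessage, V]] = msgs.map { m =>
      try Right(decodeOrWrap(decode)(m))
      catch { case f: DeserializationFailed => Left(f.original) }
    }
    (results.collect { case Right(v) => v }, results.collect { case Left(m) => m })
  }
}
```

The second list returned by decodeAll is what a dead-letter step would consume, since each entry is the untouched original message.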

I'll give it a go.

chipola, Jan 07 '20 18:01

You're right, that makes sense. There are other precedents of using Try in Alpakka, so perhaps it wouldn't be too bad for Java users. I'm looking forward to seeing what you come up with.

seglo, Jan 08 '20 17:01