Add AggregateSerializer, AggregateDeserializer

Open rkoeninger opened this issue 4 years ago • 2 comments

Description

I recently had to write a class like the AggregateSerializer below. It seems like something that would naturally belong in the library, given that producers can produce to multiple topics and consumers can subscribe to multiple topics (each with its own format).

See #838 and #1009.

How to implement

In /src/confluent_kafka/serialization/__init__.py, two classes could be added:

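from typing import Dict

class AggregateSerializer(Serializer):
    def __init__(self, serializers: Dict[str, Serializer]):
        # Maps topic name -> serializer for that topic's format
        self.serializers = serializers

    def __call__(self, obj, ctx: SerializationContext):
        # Dispatch on the topic carried by the serialization context
        return self.serializers[ctx.topic](obj, ctx)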

# And basically the same for AggregateDeserializer

AggregateDeserializer would be basically the same, plus docstrings in the style of the other classes in the module.
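For illustration, the deserializer counterpart might look roughly like the sketch below. To keep it runnable without the library installed, `SerializationContext` here is a minimal stand-in for the class of the same name in `confluent_kafka.serialization`, and the deserializers are plain callables rather than `Deserializer` subclasses. The `decode_utf8` and `decode_int` helpers, the topic names, and the `ValueError` on an unregistered topic are all assumptions made for the example, not part of the proposal above.

```python
from typing import Callable, Dict, Optional


class SerializationContext:
    """Minimal stand-in for confluent_kafka.serialization.SerializationContext."""

    def __init__(self, topic: str, field: str = "value"):
        self.topic = topic
        self.field = field


# Assumption: any callable taking (data, ctx) is treated as a deserializer here.
DeserializerFn = Callable[[Optional[bytes], SerializationContext], object]


class AggregateDeserializer:
    """Dispatch to a per-topic deserializer chosen by ctx.topic."""

    def __init__(self, deserializers: Dict[str, DeserializerFn]):
        self.deserializers = deserializers

    def __call__(self, data, ctx: SerializationContext):
        try:
            deserializer = self.deserializers[ctx.topic]
        except KeyError:
            # Hypothetical behavior: fail loudly when a topic has no entry.
            raise ValueError(
                f"no deserializer registered for topic {ctx.topic!r}"
            ) from None
        return deserializer(data, ctx)


# Hypothetical per-topic deserializers, each handling a different wire format.
def decode_utf8(data, ctx):
    return None if data is None else data.decode("utf-8")


def decode_int(data, ctx):
    return None if data is None else int.from_bytes(data, "big")


aggregate = AggregateDeserializer({"names": decode_utf8, "counts": decode_int})
name = aggregate(b"alice", SerializationContext("names"))
count = aggregate((7).to_bytes(4, "big"), SerializationContext("counts"))
```

Raising on an unregistered topic is just one option; falling back to a default deserializer would be another reasonable choice.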

I can do the legwork to integrate this if it increases the chances of it getting in.

rkoeninger avatar Nov 16 '21 02:11 rkoeninger

Thanks for mentioning this again. I synced up with the team, and I'm sorry to say that we don't have immediate plans to support doing so with different schemas.

jliunyu avatar Mar 23 '22 22:03 jliunyu

i'd like to see this capability in an API that leapfrogs the current one (a higher level "processor" API). this is in the visionary roadmap, but not coming on a near term horizon.

mhowlett avatar Oct 25 '22 16:10 mhowlett