python-schema-registry-client
Faust Generic Serializer
Feature Request ✨
While I may be treating the package incorrectly, it appears that, by intention, there exists one serializer per schema. I was interested in a generic serializer that takes any registered `Record`/`AvroModel` subclass and attempts to serialize/deserialize it.
Additionally, the ability to register a specific schema to a Confluent Kafka topic value/key was desirable.
I quickly whipped up an example:
- It operates by identifying the registered model using the `Record` namespace.
- Models must be registered before serialization is attempted. This will register the schema to the Confluent registry.
- It will attempt to deserialize any Avro message that contains a Confluent schema id.
- `register_to_topic` will attempt to register the schema as a Confluent topic value or key schema.
Considerations:
- Does not inherit from `Serializer` or `MessageSerializer` (due to the perceived one-schema constraint).
- Not sure how reliable using the `Record` namespace is for model identification.
- Potential to pre-register all of these model classes using some common base class? (Would also allow all subclasses to be pre-set to use this codec.)
Questions 🤔:
- It appears `_loads` and `_dumps` must be synchronous due to Faust compatibility; would there be any way to use an `AsyncSchemaRegistryClient` and support async Avro writers within these functions? (One workaround sketch follows below.)
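A sketch of one possible workaround (assuming `AsyncSchemaRegistryClient` mirrors the sync client's `get_by_id`; `warm_schema_cache` is a made-up helper) would be to keep `_loads`/`_dumps` synchronous but pre-fetch writer schemas at startup, e.g. from a Faust `@app.task`:

```python
# Sketch: warm the codec's schema cache with the async client at startup so the
# synchronous _loads/_dumps hot path never blocks on an HTTP round-trip.
# Assumes AsyncSchemaRegistryClient mirrors the sync client's get_by_id.
from typing import Iterable

from fastavro import parse_schema
from schema_registry.client import AsyncSchemaRegistryClient

async def warm_schema_cache(
    codec: "AvroModelCodec",  # the codec class defined below
    async_client: AsyncSchemaRegistryClient,
    schema_ids: Iterable[int],
) -> None:
    for schema_id in schema_ids:
        schema = await async_client.get_by_id(schema_id)
        codec.id_to_schema[schema_id] = parse_schema(schema.schema)
```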
Notes 📝:
- First ever public GitHub feature request!!
- Would love feedback and suggestions as I'm already using this myself.
```python
import struct
from io import BytesIO
from typing import Any, Dict, Optional, Type

from dataclasses_avroschema import AvroModel
# assuming python-decouple, per the config() call at the bottom
from decouple import config
from fastavro import parse_schema, schemaless_reader, schemaless_writer
from faust.serializers import codecs
from faust.serializers.codecs import Codec
from schema_registry.client import SchemaRegistryClient
from schema_registry.client.errors import ClientError

MAGIC_BYTE = 0  # marker byte of the Confluent wire format


class AvroModelCodec(Codec):
    def __init__(self, client: SchemaRegistryClient) -> None:
        super().__init__()
        self.schemaregistry_client = client
        # Per-instance caches (class-level dicts would be shared across instances)
        self.ns_to_id: Dict[str, int] = {}
        self.id_to_schema: Dict[int, Any] = {}

    def register_to_topic(
        self,
        topic: str,
        *,
        key_model: Optional[Type[AvroModel]] = None,
        value_model: Optional[Type[AvroModel]] = None,
    ) -> int:
        """Register an AvroModel to the Codec, Schema Registry, and Confluent Kafka topic."""
        if key_model and issubclass(key_model, AvroModel):
            self.schemaregistry_client.register(f"{topic}-key", key_model.avro_schema())
            schema_id = self.register_model(key_model)
        elif value_model and issubclass(value_model, AvroModel):
            self.schemaregistry_client.register(f"{topic}-value", value_model.avro_schema())
            schema_id = self.register_model(value_model)
        else:
            raise ValueError("No valid input model")
        return schema_id

    def register_model(self, model: Type[AvroModel]) -> int:
        """Register an AvroModel to the Codec and Schema Registry."""
        schema_dict = model.avro_schema_to_python()
        schema_id = self.schemaregistry_client.register(schema_dict["name"], model.avro_schema())
        self.ns_to_id[model._options.namespace] = schema_id
        self.id_to_schema[schema_id] = parse_schema(schema_dict)
        return schema_id

    def _dumps(self, model_dict: Dict[str, Any]) -> bytes:
        """Serialize an AvroModel dict."""
        # Identify the registered model by its Faust namespace
        schema_id = self.ns_to_id.get(model_dict["__faust"]["ns"])
        if not schema_id:
            raise ValueError("Unregistered model")
        with BytesIO() as payload:
            # Confluent wire format: magic byte + 4-byte big-endian schema id, then the Avro body
            payload.write(struct.pack(">bI", MAGIC_BYTE, schema_id))
            schemaless_writer(payload, self.id_to_schema[schema_id], model_dict)
            return payload.getvalue()

    def _loads(self, message: bytes) -> Optional[Dict[str, Any]]:
        """Deserialize a message via its Confluent schema id."""
        if message is None:
            return None
        if len(message) <= 5:
            raise ValueError("message is too small to decode")
        with BytesIO(message) as payload:
            magic, schema_id = struct.unpack(">bI", payload.read(5))
            if magic != MAGIC_BYTE:
                raise ValueError("message does not start with magic byte")
            writer_schema = self.id_to_schema.get(schema_id)
            if not writer_schema:
                try:
                    # Cache miss: fetch the writer schema from the registry
                    schema = self.schemaregistry_client.get_by_id(schema_id)
                    writer_schema = parse_schema(schema.schema)
                    self.id_to_schema[schema_id] = writer_schema
                except ClientError as e:
                    raise ValueError(f"unable to fetch schema with id {schema_id}: {e}")
            return schemaless_reader(payload, writer_schema)


avro_model_codec = AvroModelCodec(
    client=SchemaRegistryClient(url=config("KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL"))
)
codecs.register("avro-model", avro_model_codec)
```
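For completeness, a minimal usage sketch (the app, topic, and model names are made up; it assumes a class can subclass both `faust.Record` and `AvroModel`, as the request describes):

```python
import faust
from dataclasses_avroschema import AvroModel

app = faust.App("demo-app", broker="kafka://localhost:9092")

# A model that is both a Faust Record and an AvroModel; the serializer class
# option points at the "avro-model" codec registered above.
class User(faust.Record, AvroModel, serializer="avro-model", namespace="demo.User"):
    name: str
    age: int

# Register before first use; this also pushes the schema to the registry
avro_model_codec.register_to_topic("users", value_model=User)
```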
Hi @Irate-Walrus

I am sorry for the delay. Your feature request makes sense.

Why did I follow the design `one serializer per schema`? Because `Faust` was designed with `1 topic --> 1 schema`.

*It appears `_loads` and `_dumps` must be synchronous due to Faust compatibility*: Yes. I opened a PR a long time ago to make everything `async`. Unfortunately `Faust` is not maintained any more. I am part of `faust-streaming` but I do not have enough time to contribute.
I think that this feature will be helpful, but what I would suggest is:

- `Avro/Json` schemas should be registered beforehand in the schema server. This is a good practice.
- Once an event is sent, the `schema_id` and the `serialization_type` must be included in the `kafka headers`. This is also a good practice.
- Then, when you receive an event, you have to check the `kafka headers` and you will know which schema you have to use in order to deserialize.

If we follow the ⬆️ steps, you won't need any explicit relationship between `Record` and `Schema` (the flow is sketched below).
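To make the steps concrete, a minimal sketch of that flow (the header keys and `resolve_schema` are illustrative assumptions, not part of this library; Kafka headers are `(str, bytes)` pairs):

```python
from io import BytesIO
from typing import Any, Callable, Dict, List, Tuple

from fastavro import parse_schema, schemaless_reader, schemaless_writer

Headers = List[Tuple[str, bytes]]

def serialize(record: Dict[str, Any], schema: Dict[str, Any], schema_id: int) -> Tuple[bytes, Headers]:
    """Write a bare Avro body; the schema metadata travels in the headers."""
    buf = BytesIO()
    schemaless_writer(buf, parse_schema(schema), record)
    headers: Headers = [
        ("schema_id", str(schema_id).encode()),
        ("serialization_type", b"avro"),
    ]
    return buf.getvalue(), headers

def deserialize(payload: bytes, headers: Headers, resolve_schema: Callable[[int], Any]) -> Dict[str, Any]:
    """Pick the writer schema based on the headers attached to the event."""
    meta = dict(headers)
    schema_id = int(meta["schema_id"])
    writer_schema = resolve_schema(schema_id)  # e.g. a cached registry lookup
    return schemaless_reader(BytesIO(payload), writer_schema)
```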
Hi @marcosschroh,

Thanks for spending the time to get back.

It is a shame that `Faust` is no longer maintained; I was using `faust-streaming` but eventually moved on when I wanted rpc-like features and `async`.
> 1. `Avro/Json` schemas should be registered beforehand in the schema server. This is a good practice.

I totally agree; I was thinking of something more like a code-first approach here: run a CLI tool, generate the schemas, and push them up to the registry (a sketch follows below).
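For example, such a tool could be little more than this sketch (the subject-naming convention is an assumption; adapt it to e.g. `<topic>-value`):

```python
# Sketch: walk the AvroModel subclasses and push their generated schemas to the
# registry. Note __subclasses__() only sees direct subclasses that are imported.
from dataclasses_avroschema import AvroModel
from schema_registry.client import SchemaRegistryClient

def push_schemas(client: SchemaRegistryClient) -> None:
    for model in AvroModel.__subclasses__():
        subject = model.avro_schema_to_python()["name"]  # assumed subject convention
        schema_id = client.register(subject, model.avro_schema())
        print(f"registered {subject} -> schema id {schema_id}")

if __name__ == "__main__":
    push_schemas(SchemaRegistryClient(url="http://localhost:8081"))
```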
> 2. The `schema_id` and the `serialization_type` must be included in the `kafka headers`. This is also a good practice.

First I've heard of this, although it makes a lot of sense. I assume `Confluent` uses a magic byte as `kafka headers` are relatively new. I haven't used their products recently, so I am unsure if they now use headers.
> you won't need any explicit relationship between `Record` and `Schema`

I was taking the perspective of something like `FastAPI`, where the `kafka message` is automatically deserialised and then parsed into the correct `AvroModel` class. Not sure this is possible without registering the `AvroModel` somehow (a sketch of what I mean is below).
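Something along these lines is what I had in mind (a sketch; `register_model`, `parse_event`, and the registry dict are made-up names):

```python
from typing import Any, Callable, Dict, Type

from dataclasses_avroschema import AvroModel

MODEL_REGISTRY: Dict[str, Type[AvroModel]] = {}

def register_model(namespace: str) -> Callable[[Type[AvroModel]], Type[AvroModel]]:
    """Class decorator mapping a namespace to an AvroModel subclass."""
    def wrapper(cls: Type[AvroModel]) -> Type[AvroModel]:
        MODEL_REGISTRY[namespace] = cls
        return cls
    return wrapper

def parse_event(namespace: str, payload: Dict[str, Any]) -> AvroModel:
    """Instantiate the registered model from an already-deserialized dict."""
    model_cls = MODEL_REGISTRY[namespace]
    return model_cls(**payload)
```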
Happy for this issue to be closed if you feel like this is not within the goals/scope of this project. Much appreciated.
In a `pythonic world`, using only the `AvroModel` will be enough because all the teams in your organization will use the same models, but this is not always the case, as teams can use different programming languages to talk to `kafka`. In this sense, you need a way to share metadata for events, and you do it using `kafka headers`.

As you correctly mentioned, `Confluent` has its own protocol with the `magic byte`; the reason behind it is that `kafka headers` are relatively "new", and because they needed a way to tell their consumers which `schema` was used to `serialize` the event, they included it in the `payload`.
I think that using pre-registered `schemas` and using the `kafka headers` is the way to go. Even if you use `AvroModel`, I would recommend sending the `schema-id` in the `headers`. I have in the backlog a ticket to add the `Meta.schema_id` in `dataclasses-avroschemas` that will help in these cases (used mainly as documentation, I guess). Also, I think we need a `Generic` serializer that will be smart enough to serialize/deserialize Confluent and non-Confluent events (a sniffing sketch is below).

What do you think?
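For instance, such a generic deserializer could sniff the framing roughly like this (a sketch; the function name and the `schema_id` header key are assumptions):

```python
import struct
from typing import List, Optional, Tuple

MAGIC_BYTE = 0  # Confluent wire-format marker byte

def extract_schema_id(payload: bytes, headers: Optional[List[Tuple[str, bytes]]]) -> Tuple[int, bytes]:
    """Return (schema_id, avro_body), whether the metadata lives in headers or in the payload."""
    if headers:
        meta = dict(headers)
        if "schema_id" in meta:
            # Header-based event: the payload is the bare Avro body
            return int(meta["schema_id"]), payload
    if len(payload) < 5 or payload[0] != MAGIC_BYTE:
        raise ValueError("unknown framing: no headers and no Confluent magic byte")
    # Confluent-framed event: magic byte + 4-byte schema id, then the Avro body
    _, schema_id = struct.unpack(">bI", payload[:5])
    return schema_id, payload[5:]
```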
> In a `pythonic world`, using only the `AvroModel` will be enough because all the teams in your organization will use the same models, but this is not always the case, as teams can use different programming languages to talk to `kafka`. In this sense, you need a way to share metadata for events, and you do it using `kafka headers`.

I concur; obviously you will still need to let the serializer know what class it will be serializing to/from, but this can be independent of the actual pre-registered schema. It does raise the question of checking whether the schema actually matches the class representation of it, although you could probably offload that to something like `pydantic` (a rough check is sketched below).
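That check could be as simple as comparing dicts (a sketch; how you fetch the registered schema from the registry is left to the caller):

```python
import json
from typing import Any, Dict, Type

from dataclasses_avroschema import AvroModel

def schema_matches(registered_schema: Dict[str, Any], model: Type[AvroModel]) -> bool:
    """True if the registered schema equals the schema generated from the class."""
    generated = json.loads(model.avro_schema())  # avro_schema() returns the JSON string
    return generated == registered_schema
```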
> I think that using pre-registered `schemas` and using the `kafka headers` is the way to go. Even if you use `AvroModel`, I would recommend sending the `schema-id` in the `headers`. I have in the backlog a ticket to add the `Meta.schema_id` in `dataclasses-avroschemas` that will help in these cases (used mainly as documentation, I guess). Also, I think we need a `Generic` serializer that will be smart enough to serialize/deserialize Confluent and non-Confluent events.

For `dataclasses-avroschemas` I approached it the same way as you and added additional information to their `Meta` class as a custom solution, but as you said, documentation of that would help. A `Generic` serializer is a good idea, whether it detects support for `kafka headers` or is as simple as a user configuration. (The custom `Meta` approach I mentioned is sketched below.)
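For reference, the custom `Meta` approach looks roughly like this (a sketch; `schema_id` as a `Meta` attribute is the custom extension under discussion, not an official `dataclasses-avroschema` feature at the time of this thread):

```python
import dataclasses

from dataclasses_avroschema import AvroModel

@dataclasses.dataclass
class UserAdded(AvroModel):
    name: str
    age: int

    class Meta:
        namespace = "demo.UserAdded"  # illustrative namespace
        schema_id = 42  # custom metadata consumed by our own tooling
```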