Difficulties serializing union types and enums to avro
Checklist
- [x] I have included information about relevant versions
  - `faust-streaming==0.6.9` and `faust-streaming==0.6.10`; I haven't tried earlier versions.
- [x] I have verified that the issue persists when using the `master` branch of Faust.
  - Verified against https://github.com/faust-streaming/faust/commit/9385c8166256bc85c5c4b985fab6b749d070ba68
Steps to reproduce
- Define a model containing a union type whose member types each contain an enum field with the same name.
- Serialize it to Avro.
- Observe an error like:
  ```
  fastavro._schema_common.SchemaParseException: redefined named type: foo
  ```
I think I've reduced things down to the following minimal test case:
```python
import typing

from dataclasses_avroschema.schema_generator import AvroModel
from dataclasses_avroschema import types
import faust
from faust.serializers import codecs
from schema_registry.client.client import SchemaRegistryClient
from schema_registry.serializers.faust_serializer import FaustSerializer


class MyModelA(faust.Record, AvroModel):
    foo: types.Enum = types.Enum(["bar"])


class MyModelB(faust.Record, AvroModel):
    # Duplicate field name here with MyModelA
    foo: types.Enum = types.Enum(["bar"])


class MyModelC(faust.Record, AvroModel):
    my_model_a_or_b: typing.Union[MyModelA, MyModelB]


def main():
    # Registry URL not important for issue.
    schema_registry = SchemaRegistryClient(url='http://localhost:8081')
    model_c_serializer = FaustSerializer(
        schema_registry,
        "my_model_c_topic-value",
        MyModelC.avro_schema()
    )
    my_model_c = MyModelC(
        my_model_a_or_b=MyModelA(
            foo='bar',
        ),
    )
    serialized = my_model_c.dumps(serializer=model_c_serializer)
    print(serialized)


if __name__ == '__main__':
    main()
```
Expected behavior
Serializes the model to Avro and prints the bytes:
```
b'\x00\x00\x00\x00\x01\x00\x00'
```
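For reference, the expected bytes can be decoded by hand. This is a minimal sketch, assuming the Confluent wire format used by the schema registry client (a zero magic byte followed by a 4-byte big-endian schema id) and assuming the registry assigns schema id 1, as the expected output implies. After that header, the Avro binary body is just two zig-zag varints: the union branch index (presumably 0 for `MyModelA`, the first member of the `Union`) and the enum symbol index (0 for `"bar"`). `zigzag_varint` and `frame` are hypothetical helpers, not functions from either library.

```python
import struct


def zigzag_varint(n: int) -> bytes:
    """Encode an Avro int/long: zig-zag mapping, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # maps 0, -1, 1, -2, ... to 0, 1, 2, 3, ...
    out = bytearray()
    while z >= 0x80:
        out.append((z & 0x7F) | 0x80)  # low 7 bits with the continuation bit set
        z >>= 7
    out.append(z)
    return bytes(out)


def frame(schema_id: int, avro_body: bytes) -> bytes:
    """Prefix an Avro body with magic byte 0 and a 4-byte big-endian schema id."""
    return struct.pack(">bI", 0, schema_id) + avro_body


# MyModelC's only field is the union; its value is MyModelA, whose only field is the enum.
union_branch = zigzag_varint(0)  # union branch 0 -> MyModelA (assumed order)
enum_index = zigzag_varint(0)    # enum symbol 0 -> "bar"
expected = frame(1, union_branch + enum_index)  # schema id 1 is an assumption
print(expected)  # b'\x00\x00\x00\x00\x01\x00\x00'
```

So the two trailing zero bytes are the whole payload; everything before them is the registry header.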
Actual behavior
Serialization fails because of the duplicate field name `foo` in `MyModelA` and `MyModelB`.
It works if I rename the fields in the models to remove the duplication, but that shouldn't be necessary: the models are independent pieces of code that only happen to be coupled together by the union type and the Avro serializer.
The problem appears to be that the resulting Avro schema declares both enum types with the same Avro type name, inferred from the field name. Indeed, it works if I use namespaces:
- put `MyModelA` and `MyModelB` in different namespaces, like:
  ```python
  class MyModelA(faust.Record, AvroModel):
      foo: types.Enum = types.Enum(["bar"])

      class Meta:
          namespace = "com.example.MyModelA"
  ```
- or declare a namespace on the enum type directly:
  ```python
  foo: types.Enum = types.Enum(["bar"], namespace='MyModelA')
  ```
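To see why either namespace fix helps, here is a sketch that resolves Avro full names the way the Avro specification describes them: a named type (record, enum, fixed) without its own namespace inherits the namespace of the enclosing named type, and full names must be unique within a schema. The schemas are hand-written approximations; it is an assumption that `MyModelC.avro_schema()` produces essentially this shape (the error message does confirm the enum is named `foo`). All helper names (`full_name`, `named_type_locations`, `duplicate_names`, `model_schema`, `model_c`) are hypothetical and are not part of fastavro or dataclasses-avroschema, and the walker ignores aliases.

```python
from collections import defaultdict
from typing import Any, Dict, List, Optional

# Avro types that carry a name and must be unique by full name.
NAMED_TYPES = {"record", "enum", "fixed"}


def full_name(name: str, namespace: Optional[str]) -> str:
    """Resolve an Avro full name; a dotted name is already a full name."""
    if "." in name or not namespace:
        return name
    return f"{namespace}.{name}"


def named_type_locations(schema: Any, namespace: Optional[str] = None,
                         owner: Optional[str] = None,
                         found: Optional[Dict[str, List[str]]] = None) -> Dict[str, List[str]]:
    """Map each named type's full name to the places (record.field) defining it."""
    if found is None:
        found = defaultdict(list)
    if isinstance(schema, list):  # a union: walk every branch
        for branch in schema:
            named_type_locations(branch, namespace, owner, found)
    elif isinstance(schema, dict):
        kind = schema.get("type")
        if isinstance(kind, (dict, list)):  # e.g. {"type": {"type": "enum", ...}}
            named_type_locations(kind, namespace, owner, found)
        elif kind in NAMED_TYPES:
            name = full_name(schema["name"], schema.get("namespace", namespace))
            found[name].append(owner or name)
            # Nested types default to the namespace of this enclosing named type.
            inner_ns = name.rpartition(".")[0] or None
            for field in schema.get("fields", []):
                named_type_locations(field["type"], inner_ns,
                                     f"{name}.{field['name']}", found)
        elif kind == "array":
            named_type_locations(schema["items"], namespace, owner, found)
        elif kind == "map":
            named_type_locations(schema["values"], namespace, owner, found)
    # Plain strings (primitives or references to earlier names) define nothing.
    return found


def duplicate_names(schema: Any) -> Dict[str, List[str]]:
    """Full names defined more than once: the condition behind 'redefined named type'."""
    return {n: locs for n, locs in named_type_locations(schema).items() if len(locs) > 1}


def model_schema(record: str, record_ns: Optional[str] = None,
                 enum_ns: Optional[str] = None) -> Dict[str, Any]:
    """Approximation of MyModelA/MyModelB: an enum named after its field `foo`."""
    enum: Dict[str, Any] = {"type": "enum", "name": "foo", "symbols": ["bar"]}
    if enum_ns:
        enum["namespace"] = enum_ns
    schema: Dict[str, Any] = {"type": "record", "name": record,
                              "fields": [{"name": "foo", "type": enum}]}
    if record_ns:
        schema["namespace"] = record_ns
    return schema


def model_c(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
    """Approximation of MyModelC: one field holding a union of the two models."""
    return {"type": "record", "name": "MyModelC",
            "fields": [{"name": "my_model_a_or_b", "type": [a, b]}]}


clash = model_c(model_schema("MyModelA"), model_schema("MyModelB"))
print(duplicate_names(clash))  # {'foo': ['MyModelA.foo', 'MyModelB.foo']}

by_record = model_c(model_schema("MyModelA", record_ns="com.example.MyModelA"),
                    model_schema("MyModelB", record_ns="com.example.MyModelB"))
print(duplicate_names(by_record))  # {}

by_enum = model_c(model_schema("MyModelA", enum_ns="MyModelA"),
                  model_schema("MyModelB", enum_ns="MyModelB"))
print(duplicate_names(by_enum))  # {}
```

With no namespaces both enums resolve to the bare full name `foo`; either workaround makes them `com.example.MyModelA.foo` / `com.example.MyModelB.foo` or `MyModelA.foo` / `MyModelB.foo`. A check like this, run before registering the schema, could also report which fields collide instead of only the bare name.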
I wonder whether there's a way to define the enum type name directly. Either way, it took me a few hours of digging into the inner workings of the Avro serialization to understand this, which makes me think Faust might benefit from a more informative error message in this case.
Full traceback
```
Traceback (most recent call last):
  File "faust_problems.py", line 44, in <module>
    main()
  File "faust_problems.py", line 40, in main
    serialized = my_model_c.dumps(serializer=model_c_serializer)
  File "***/lib/python3.8/site-packages/faust/models/base.py", line 484, in dumps
    return dumps(serializer or self._options.serializer, self.to_representation())
  File "***/python3.8/site-packages/faust/serializers/codecs.py", line 359, in dumps
    return get_codec(codec).dumps(obj) if codec else obj
  File "***/lib/python3.8/site-packages/faust/serializers/codecs.py", line 224, in dumps
    obj = cast(Codec, node)._dumps(obj)
  File "***/lib/python3.8/site-packages/schema_registry/serializers/faust_serializer.py", line 50, in _dumps
    return self.encode_record_with_schema(self.schema_subject, self.schema, payload)  # type: ignore
  File "***/lib/python3.8/site-packages/schema_registry/serializers/message_serializer.py", line 73, in encode_record_with_schema
    schema_id = self.schemaregistry_client.register(subject, avro_schema)
  File "***/lib/python3.8/site-packages/schema_registry/client/client.py", line 252, in register
    avro_schema = AvroSchema(avro_schema)
  File "***/lib/python3.8/site-packages/schema_registry/client/schema.py", line 13, in __init__
    self.schema = fastavro.parse_schema(schema, _force=True)
  File "fastavro/_schema.pyx", line 91, in fastavro._schema.parse_schema
  File "fastavro/_schema.pyx", line 245, in fastavro._schema._parse_schema
  File "fastavro/_schema.pyx", line 290, in fastavro._schema.parse_field
  File "fastavro/_schema.pyx", line 115, in fastavro._schema._parse_schema
  File "fastavro/_schema.pyx", line 245, in fastavro._schema._parse_schema
  File "fastavro/_schema.pyx", line 290, in fastavro._schema.parse_field
  File "fastavro/_schema.pyx", line 212, in fastavro._schema._parse_schema
fastavro._schema_common.SchemaParseException: redefined named type: foo
```
Versions
- Python version 3.8.5
- Faust version 0.6.10
- Operating system Ubuntu 18.04
- Kafka version N/A
- RocksDB version (if applicable) N/A
@marcosschroh FYI
I should also note that we don't support Unions in Faust Records yet. We're currently working on that in https://github.com/faust-streaming/faust/issues/429.