
Difficulties serializing union types and enums to avro

Open BryceCicada opened this issue 4 years ago • 2 comments

Checklist

  • [x] I have included information about relevant versions
    • faust-streaming==0.6.9 and faust-streaming==0.6.10; I haven't tried earlier versions.
  • [x] I have verified that the issue persists when using the master branch of Faust.
    • Verified against https://github.com/faust-streaming/faust/commit/9385c8166256bc85c5c4b985fab6b749d070ba68

Steps to reproduce

  • Define a model containing a union type whose member types each contain an enum field with the same name.
  • Serialize it to avro.
  • Observe an error such as:
    fastavro._schema_common.SchemaParseException: redefined named type: foo
    

I think I've reduced things down to the following minimal test case:

import typing

from dataclasses_avroschema.schema_generator import AvroModel
from dataclasses_avroschema import types
import faust
from faust.serializers import codecs
from schema_registry.client.client import SchemaRegistryClient
from schema_registry.serializers.faust_serializer import FaustSerializer


class MyModelA(faust.Record, AvroModel):
  foo: types.Enum = types.Enum(["bar"])


class MyModelB(faust.Record, AvroModel):
  # Duplicate field name here with MyModelA
  foo: types.Enum = types.Enum(["bar"])


class MyModelC(faust.Record, AvroModel):
  my_model_a_or_b: typing.Union[MyModelA, MyModelB]


def main():
  # Registry URL not important for issue.
  schema_registry = SchemaRegistryClient(url='http://localhost:8081')

  model_c_serializer = FaustSerializer(
    schema_registry, 
    "my_model_c_topic-value", 
    MyModelC.avro_schema()
  )

  my_model_c = MyModelC(
    my_model_a_or_b=MyModelA(
      foo='bar', 
    ),
  )

  serialized = my_model_c.dumps(serializer=model_c_serializer)
  print(serialized)

if __name__ == '__main__':
    main()

Expected behavior

The model serializes to avro and the bytes are printed:

b'\x00\x00\x00\x00\x01\x00\x00'
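These bytes can be read against the Confluent Schema Registry wire format: a magic byte 0, a 4-byte big-endian schema ID, then the Avro binary body. For this record the body is the union branch index (a zigzag-encoded long) followed by the enum symbol index (a zigzag-encoded int). A minimal sketch that splits the frame apart, assuming the registry assigned schema ID 1; `split_confluent_frame` and `zigzag_decode` are hypothetical helpers written for illustration, not faust or schema-registry client APIs:

```python
import struct
from typing import Tuple


def zigzag_decode(data: bytes, pos: int) -> Tuple[int, int]:
    """Decode one Avro zigzag varint starting at pos; return (value, next_pos)."""
    shift = 0
    acc = 0
    while True:
        byte = data[pos]
        pos += 1
        acc |= (byte & 0x7F) << shift  # low 7 bits carry the payload
        if not byte & 0x80:  # high bit clear: last byte of the varint
            break
        shift += 7
    return (acc >> 1) ^ -(acc & 1), pos  # undo the zigzag mapping


def split_confluent_frame(frame: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, avro_body) from a Confluent-framed message."""
    magic, schema_id = struct.unpack(">bI", frame[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, frame[5:]


frame = b'\x00\x00\x00\x00\x01\x00\x00'
schema_id, body = split_confluent_frame(frame)
branch, pos = zigzag_decode(body, 0)    # which member of Union[MyModelA, MyModelB]
symbol, pos = zigzag_decode(body, pos)  # index into the enum symbols ["bar"]
print(schema_id, branch, symbol)        # 1 0 0
```

Branch index 0 selects MyModelA, and symbol index 0 selects "bar".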

Actual behavior

Serialization fails because MyModelA and MyModelB both declare a field named foo.

It works if I rename the fields in the models to remove the duplication, but it feels like that shouldn't be necessary: the models are independent pieces of code that just happen to be coupled by the union type and the avro serializer.

The problem appears to be that the generated avro schema declares both enum types under the same avro type name, foo, which is inferred from the field name. Indeed, it works if I use namespaces:

  • put MyModelA and MyModelB in different namespaces, like:
    class MyModelA(faust.Record, AvroModel):
      foo: types.Enum = types.Enum(["bar"])
    
      class Meta:
          namespace = "com.example.MyModelA"
    
    or,
  • declare a namespace on the enum type directly:
      foo: types.Enum = types.Enum(["bar"], namespace='MyModelA')
    

I wonder whether there's a way to define the enum type name directly. Even if there isn't, it took a few hours of digging into the inner workings of the avro serialization to understand this, which makes me think faust could benefit from more informative error reporting in this case.
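The informative error suggested here could be approximated by a pre-flight check on the generated schema before it reaches the registry. A minimal sketch, not part of faust, dataclasses-avroschema or fastavro: `find_redefined_names` and `model_c_schema` are hypothetical helpers, and the schema dict is a hand-written approximation of `MyModelC.avro_schema()` (the real output, assumed here to be a JSON string needing `json.loads`, may contain extra keys such as `doc`). It walks records, unions, arrays and maps, applies Avro's namespace inheritance, and reports every fullname defined more than once with the field paths that define it:

```python
from collections import defaultdict
from typing import Any, Dict, List, Optional

NAMED_TYPES = ("record", "error", "enum", "fixed")


def find_redefined_names(schema: Any) -> Dict[str, List[str]]:
    """Return each Avro fullname defined more than once, with the paths defining it."""
    seen: Dict[str, List[str]] = defaultdict(list)

    def walk(node: Any, enclosing: Optional[str], path: str) -> None:
        if isinstance(node, list):  # a union: visit every branch
            for i, branch in enumerate(node):
                walk(branch, enclosing, f"{path}[{i}]")
            return
        if not isinstance(node, dict):  # primitive name or reference to a named type
            return
        kind = node.get("type")
        if kind in NAMED_TYPES:
            name = node["name"]
            ns = node.get("namespace", enclosing)
            fullname = name if "." in name else (f"{ns}.{name}" if ns else name)
            seen[fullname].append(path)
            # Types nested inside this one inherit its namespace.
            enclosing = fullname.rpartition(".")[0] or None
        if kind in ("record", "error"):
            for field in node.get("fields", []):
                walk(field["type"], enclosing, f"{path}.{field['name']}")
        elif kind == "array":
            walk(node["items"], enclosing, f"{path}[]")
        elif kind == "map":
            walk(node["values"], enclosing, f"{path}{{}}")

    walk(schema, None, "$")
    return {name: paths for name, paths in seen.items() if len(paths) > 1}


def model_c_schema(a_namespace: Optional[str] = None) -> Dict[str, Any]:
    """Hand-written approximation of MyModelC's schema, for illustration only."""
    def model(name: str, namespace: Optional[str]) -> Dict[str, Any]:
        record: Dict[str, Any] = {
            "type": "record",
            "name": name,
            "fields": [{"name": "foo",
                        "type": {"type": "enum", "name": "foo", "symbols": ["bar"]}}],
        }
        if namespace:
            record["namespace"] = namespace
        return record

    return {
        "type": "record",
        "name": "MyModelC",
        "fields": [{"name": "my_model_a_or_b",
                    "type": [model("MyModelA", a_namespace), model("MyModelB", None)]}],
    }


for fullname, paths in find_redefined_names(model_c_schema()).items():
    print(f"named type {fullname!r} is defined at: {', '.join(paths)}")
# named type 'foo' is defined at: $.my_model_a_or_b[0].foo, $.my_model_a_or_b[1].foo

print(find_redefined_names(model_c_schema(a_namespace="com.example.MyModelA")))  # {}
```

A message naming both field paths would have pointed straight at the MyModelA/MyModelB collision instead of only reporting `redefined named type: foo`.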

Full traceback

Traceback (most recent call last):
  File "faust_problems.py", line 44, in <module>
    main()
  File "faust_problems.py", line 40, in main
    serialized = my_model_c.dumps(serializer=model_c_serializer)
  File "***/lib/python3.8/site-packages/faust/models/base.py", line 484, in dumps
    return dumps(serializer or self._options.serializer, self.to_representation())
  File "***/python3.8/site-packages/faust/serializers/codecs.py", line 359, in dumps
    return get_codec(codec).dumps(obj) if codec else obj
  File "***/lib/python3.8/site-packages/faust/serializers/codecs.py", line 224, in dumps
    obj = cast(Codec, node)._dumps(obj)
  File "***/lib/python3.8/site-packages/schema_registry/serializers/faust_serializer.py", line 50, in _dumps
    return self.encode_record_with_schema(self.schema_subject, self.schema, payload)  # type: ignore
  File "***/lib/python3.8/site-packages/schema_registry/serializers/message_serializer.py", line 73, in encode_record_with_schema
    schema_id = self.schemaregistry_client.register(subject, avro_schema)
  File "***/lib/python3.8/site-packages/schema_registry/client/client.py", line 252, in register
    avro_schema = AvroSchema(avro_schema)
  File "***/lib/python3.8/site-packages/schema_registry/client/schema.py", line 13, in __init__
    self.schema = fastavro.parse_schema(schema, _force=True)
  File "fastavro/_schema.pyx", line 91, in fastavro._schema.parse_schema
  File "fastavro/_schema.pyx", line 245, in fastavro._schema._parse_schema
  File "fastavro/_schema.pyx", line 290, in fastavro._schema.parse_field
  File "fastavro/_schema.pyx", line 115, in fastavro._schema._parse_schema
  File "fastavro/_schema.pyx", line 245, in fastavro._schema._parse_schema
  File "fastavro/_schema.pyx", line 290, in fastavro._schema.parse_field
  File "fastavro/_schema.pyx", line 212, in fastavro._schema._parse_schema
fastavro._schema_common.SchemaParseException: redefined named type: foo

Versions

  • Python version 3.8.5
  • Faust version 0.6.10
  • Operating system Ubuntu 18.04
  • Kafka version N/A
  • RocksDB version (if applicable) N/A

BryceCicada, Nov 01 '21 15:11

@marcosschroh FYI

patkivikram, Nov 04 '21 14:11

I should also note that we don't support Unions in Faust Records yet. We're currently working on that in https://github.com/faust-streaming/faust/issues/429.

wbarnha, Jan 13 '23 05:01