
Schema references support

Open slominskir opened this issue 3 years ago • 10 comments

Fixes https://github.com/confluentinc/confluent-kafka-python/issues/974

To use Serializer:

from json import loads
from fastavro import parse_schema
from confluent_kafka.schema_registry import Schema, SchemaReference
from confluent_kafka.schema_registry.avro import AvroSerializer

named_schemas = {}
ref_dict = loads(ref_schema_str)
parse_schema(ref_dict, _named_schemas=named_schemas)
ref_schema = SchemaReference("org.test.MyReferencedItem", "referenced-subject", "1")
schema = Schema(schema_str, "AVRO", [ref_schema])

avro_serializer = AvroSerializer(schema_registry_client, schema, None, None, named_schemas)

To use Deserializer:

from json import loads
from fastavro import parse_schema
from confluent_kafka.schema_registry import Schema, SchemaReference
from confluent_kafka.schema_registry.avro import AvroDeserializer

named_schemas = {}
ref_dict = loads(ref_schema_str)
parse_schema(ref_dict, _named_schemas=named_schemas)

# schema can be None if using the registry (writer schema)
ref_schema = SchemaReference("org.test.MyReferencedItem", "referenced-subject", "1")
schema = Schema(schema_str, "AVRO", [ref_schema])

avro_deserializer = AvroDeserializer(schema_registry_client, schema, None, False, named_schemas)

slominskir avatar Apr 14 '21 22:04 slominskir

The failing test occurs only with Python 2.7 and is due to a fastavro API change. The 2.7 tests use fastavro 0.24.2, whereas the 3.6 tests use fastavro 1.4.0. The signature of the parse_schema method changed: the parameter _named_schemas was renamed to named_schemas. How is this kind of difference usually handled?
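One option is a small compatibility shim that tries the new keyword first and falls back to the old one. A minimal sketch, assuming the only relevant difference between the two fastavro versions is the kwarg rename; the helper name parse_with_named_schemas is hypothetical:

from fastavro import parse_schema

def parse_with_named_schemas(schema_dict, named_schemas):
    # fastavro >= 1.x exposes the public named_schemas kwarg.
    try:
        return parse_schema(schema_dict, named_schemas=named_schemas)
    except TypeError:
        # Older fastavro releases only accept the private _named_schemas kwarg.
        return parse_schema(schema_dict, _named_schemas=named_schemas)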

slominskir avatar May 10 '21 16:05 slominskir

Note: fastavro no longer supports Python 2.7 - neither does the Python community, and neither should this project: https://www.python.org/doc/sunset-python-2/. Users who haven't updated to 3.x by now have had over a decade to do so. Code that still supports 2.7 is no longer a good thing - it's likely a sign of a horror show of code underneath attempting to satisfy multiple APIs simultaneously, and a sign of an API failing to use modern advances.

slominskir avatar May 10 '21 17:05 slominskir

Can we also have a single generic schema object, or make this code work with both? The confluent-kafka-python SchemaRegistryClient returns a different schema object shape per method: for example, get_latest_version returns a schema whose references are an array of JSON dicts, as below

[{'name': 'com.test.common.event.Eventdata', 'subject': 'com.test.common.event.Eventdata', 'version': 5}, {'name': 'com.test.schema.user.Details', 'subject': 'com.test.schema.user.Details', 'version': 2}]

while the get_schema method returns a schema whose references are an array of SchemaReference objects, as below

[<confluent_kafka.schema_registry.schema_registry_client.SchemaReference object at 0x1037a9790>, <confluent_kafka.schema_registry.schema_registry_client.SchemaReference object at 0x1037a97f0>]

Fetching the schema by subject name is more common than fetching by schema_id, because we can map each schema to a topic and reuse the topic name to fetch the schema.

Currently I get the error below when I pass a schema object whose references are an array of JSON dicts, i.e. from get_latest_version:

Traceback (most recent call last):
  File "/Users/lib/python3.9/site-packages/confluent_kafka/serializing_producer.py", line 172, in produce
    value = self._value_serializer(value, ctx)
  File "/Users/lib/python3.9/site-packages/confluent_kafka/schema_registry/avro.py", line 219, in __call__
    registered_schema = self._registry.lookup_schema(subject,
  File "/Userslib/python3.9/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 407, in lookup_schema
    request['references'] = [{'name': ref.name,
  File "/Users/lib/python3.9/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 407, in <listcomp>
    request['references'] = [{'name': ref.name,
AttributeError: 'dict' object has no attribute 'name'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/a/python-avro-producer/arvo_producer.py", line 70, in <module>
    main()
  File "/Users/a/python-avro-producer/arvo_producer.py", line 64, in main
    producer.produce(topic=topic, value=value, on_delivery=delivery_report)
  File "/Users/lib/python3.9/site-packages/confluent_kafka/serializing_producer.py", line 174, in produce
    raise ValueSerializationError(se)
confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="'dict' object has no attribute 'name'"}
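One workaround until the objects are unified is to normalize dict references from get_latest_version into SchemaReference objects before handing the schema to the serializer. A minimal sketch, assuming the dicts carry the name/subject/version keys shown above; normalize_references is a hypothetical helper:

from confluent_kafka.schema_registry import Schema, SchemaReference

def normalize_references(schema):
    # Convert dict references (as returned by get_latest_version) into
    # SchemaReference objects (as expected by lookup_schema).
    refs = [SchemaReference(r['name'], r['subject'], r['version'])
            if isinstance(r, dict) else r
            for r in (schema.references or [])]
    return Schema(schema.schema_str, schema.schema_type, refs)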

avanish-appdirect avatar Oct 06 '21 09:10 avanish-appdirect

Any update on the progress for this PR?

psheets avatar Feb 02 '22 13:02 psheets

ok, i'm going to try to facilitate getting this PR in between other things...

@slominskir - two things: 1. can you resolve the conflicts :-). 2. the suggested change to the AvroSerializer constructor is breaking. can you make it so that a string or Schema is accepted? (do you consider that idiomatic Python?)

there may be other things, that is just what comes to mind first.

mhowlett avatar Mar 23 '22 19:03 mhowlett

@mhowlett - I'll take a look at this again. Accepting either a string or a Schema sounds reasonable (cleanest way may be via @overload annotation https://peps.python.org/pep-0484/#function-method-overloading)
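For what it's worth, the Union route might look like the minimal sketch below; the helper name _coerce_schema is hypothetical:

from typing import Union

from confluent_kafka.schema_registry import Schema

def _coerce_schema(schema: Union[str, Schema]) -> Schema:
    # Wrap a bare Avro schema string in a Schema with no references so
    # downstream code can always assume a Schema object.
    if isinstance(schema, str):
        return Schema(schema, "AVRO", [])
    return schema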

slominskir avatar Mar 23 '22 20:03 slominskir

cool, thanks. I'll give it a careful look over / play after you've done that. i may not get to this immediately, but please feel free to @ me if it's taking more than a couple of weeks ...

mhowlett avatar Mar 23 '22 20:03 mhowlett

CLA assistant check
All committers have signed the CLA.

CLAassistant avatar Mar 31 '22 00:03 CLAassistant

@mhowlett - The CI doesn't seem to be working, but otherwise this PR is ready for first pass inspection. A few points to consider:

  1. I tried to keep changes to a minimum, so users must manually register any references with the Schema Registry, parse the schemas to populate named_schemas, and create a references array containing all references (in order), potentially including nested references too; see the sketch after this list. Changes to the code will likely balloon if we try to handle this in the serializer, as we may find that registry caching needs to be updated to capture subject-and-version schema lookups, and/or that the Schema/SchemaReference/RegisteredSchema objects need to be reworked.
  2. In the deserializer case, when the writer schema is fetched from the registry, the code currently resolves only one level of associated references (recursive code should probably be handled carefully).
  3. Not super excited about the @overload approach to supporting either a string or a Schema, though I'm not sure there is a better way. A factory method could be used, but that probably makes the most sense for the string case, allowing the constructor to take the Schema. Pulling in a module that provides multiple dispatch seems like overkill. Using a Union might be simplest.
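A minimal sketch of the manual steps from point 1, assuming schema_registry_client, ref_schema_str, and schema_str are defined as in the usage examples above (the subject and type names are illustrative):

from json import loads
from fastavro import parse_schema
from confluent_kafka.schema_registry import Schema, SchemaReference

# 1. Register the referenced schema under its own subject, then look it up
#    to learn the version it was assigned.
ref_schema = Schema(ref_schema_str, "AVRO", [])
schema_registry_client.register_schema("referenced-subject", ref_schema)
registered = schema_registry_client.lookup_schema("referenced-subject", ref_schema)

# 2. Parse the referenced schema so fastavro records its named type.
named_schemas = {}
parse_schema(loads(ref_schema_str), _named_schemas=named_schemas)

# 3. Build the references array (in dependency order) for the top-level schema.
references = [SchemaReference("org.test.MyReferencedItem", "referenced-subject",
                              registered.version)]
schema = Schema(schema_str, "AVRO", references)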

slominskir avatar Mar 31 '22 23:03 slominskir

Worth mentioning that without https://github.com/confluentinc/confluent-kafka-python/pull/1304 we can't do recursive named_schemas resolution of nested references (another reason this PR only goes one level deep). We may eventually want something like:

    def __parse_schema_recursive(self, schema, named_schemas):
        print(f"Handling schema {schema.schema_str}")
        for ref in schema.references:
            print(f"Handling reference: {ref.name}")
            # Fetch the referenced schema from the registry by subject and version.
            ref_reg_schema = self._registry.get_version(ref.subject, ref.version)
            # Resolve the reference's own references first, so named types are
            # registered in dependency order.
            self.__parse_schema_recursive(ref_reg_schema.schema, named_schemas)
            ref_dict = loads(ref_reg_schema.schema.schema_str)
            parse_schema(ref_dict, named_schemas=named_schemas)

But first SchemaRegistryClient.get_version() must return a RegisteredSchema containing a Schema with a non-empty references array.

slominskir avatar Apr 25 '22 21:04 slominskir

Need this so bad.....schema reference is failing during parse_schema on my end.

jensenity avatar Dec 08 '22 07:12 jensenity

Tests passing in the PR - https://github.com/confluentinc/confluent-kafka-python/pull/1542

anchitj avatar Apr 05 '23 12:04 anchitj