confluent-kafka-python

schema registry protobuf serialization exception after upgrading to 2.7.0

Open jbouricius opened this issue 10 months ago • 7 comments

Running version 2.6.1 of the library, this is approximately my code:

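# Imports assumed from context: ConfluentSchemaRegistryClient is presumably an alias
# for SchemaRegistryClient, and Foo is a generated protobuf message class.
from confluent_kafka.schema_registry import SchemaRegistryClient as ConfluentSchemaRegistryClient
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
from confluent_kafka.serialization import MessageField, SerializationContext

class FooSerializer: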

    def __init__(self, topic: str, schema_registry: ConfluentSchemaRegistryClient):
        self._topic = topic
        self._protobuf_serializer = ProtobufSerializer(
            Foo,
            schema_registry,
            conf={
                "auto.register.schemas": False,
                "use.deprecated.format": False,
                "use.latest.version": True,
            },
        )

    def serialize(self, foo: Foo) -> bytearray:
        return self._protobuf_serializer(foo, SerializationContext(self._topic, MessageField.VALUE))

where Foo is some proto class already registered as the schema for the topic. This works exactly as I expect.

After upgrading to 2.7.x or 2.8.0, the same exact code hits an exception ending in this traceback:

    return self._protobuf_serializer(foo, SerializationContext(self._topic, MessageField.VALUE))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/testrunner/.local/lib/python3.11/site-packages/confluent_kafka/schema_registry/protobuf.py", line 582, in __call__
    fd_proto, pool = self._get_parsed_schema(latest_schema.schema)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/testrunner/.local/lib/python3.11/site-packages/confluent_kafka/schema_registry/protobuf.py", line 610, in _get_parsed_schema
    _resolve_named_schema(schema, self._registry, pool)
  File "/home/testrunner/.local/lib/python3.11/site-packages/confluent_kafka/schema_registry/protobuf.py", line 207, in _resolve_named_schema
    _resolve_named_schema(referenced_schema.schema, schema_registry_client, pool, visited)
  File "/home/testrunner/.local/lib/python3.11/site-packages/confluent_kafka/schema_registry/protobuf.py", line 208, in _resolve_named_schema
    file_descriptor_proto = _str_to_proto(ref.name, referenced_schema.schema.schema_str)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/testrunner/.local/lib/python3.11/site-packages/confluent_kafka/schema_registry/protobuf.py", line 182, in _str_to_proto
    raise SerializationError(str(e))
confluent_kafka.serialization.SerializationError: Error parsing message

I didn't see anything in the changelogs that indicated there was a breaking change here. Am I missing a change or is there some other problem? Any other information I can provide?

jbouricius, Jan 16 '25 18:01

Hi, I have the same problem. As a temporary workaround, I pinned the library to version 2.6.1 in the project.

franciellyferreira, Jan 22 '25 18:01

For Protobuf, this library needs changes from https://github.com/confluentinc/schema-registry/pull/3276, which will be in the next Confluent Platform (CP) patch releases.

rayokota, Feb 07 '25 21:02

I believe this is fixed by https://github.com/confluentinc/confluent-kafka-python/pull/1948

rayokota, Mar 24 '25 16:03

I'm still seeing the error, even after upgrading to 2.9.0. The traceback is slightly more informative:

../../../../../../../.venv/lib/python3.11/site-packages/confluent_kafka/schema_registry/protobuf.py:593: in __call__
    fd_proto, pool = self._get_parsed_schema(latest_schema.schema)
../../../../../../../.venv/lib/python3.11/site-packages/confluent_kafka/schema_registry/protobuf.py:621: in _get_parsed_schema
    _resolve_named_schema(schema, self._registry, pool)
../../../../../../../.venv/lib/python3.11/site-packages/confluent_kafka/schema_registry/protobuf.py:208: in _resolve_named_schema
    _resolve_named_schema(referenced_schema.schema, schema_registry_client, pool, visited)
../../../../../../../.venv/lib/python3.11/site-packages/confluent_kafka/schema_registry/protobuf.py:209: in _resolve_named_schema
    file_descriptor_proto = _str_to_proto(ref.name, referenced_schema.schema.schema_str)

and ends in

        serialized_pb = base64.standard_b64decode(schema_str.encode('ascii'))
        file_descriptor_proto = descriptor_pb2.FileDescriptorProto()
        try:
            file_descriptor_proto.ParseFromString(serialized_pb)
            file_descriptor_proto.name = name
        except DecodeError as e:
>           raise SerializationError(str(e))
E           confluent_kafka.serialization.SerializationError: Error parsing message with type 'google.protobuf.FileDescriptorProto'
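The snippet above shows that the serializer base64-decodes the referenced schema string and then parses the bytes as a FileDescriptorProto. One way to narrow down a failure like this is to check whether the string returned by the registry is base64 at all. The sketch below is a hypothetical diagnostic helper (the name classify_schema_str is not part of confluent-kafka-python) and uses only the standard library. It does not prove that the payload is a valid descriptor; it only separates plain .proto source from base64. Whether an older Schema Registry actually returns plain text here is an assumption, not something confirmed in this thread.

```python
import base64
import binascii


def classify_schema_str(schema_str: str) -> str:
    """Guess whether a schema string is base64 or plain .proto text.

    Hypothetical helper for debugging; not a confluent-kafka-python API.
    """
    try:
        # validate=True rejects characters outside the base64 alphabet,
        # so ordinary .proto source (spaces, quotes, braces) fails here.
        base64.b64decode(schema_str.encode("ascii"), validate=True)
    except (binascii.Error, UnicodeEncodeError):
        return "text"
    return "base64"


# Illustrative inputs only; neither is taken from a real registry response.
proto_source = 'syntax = "proto3";\nmessage Foo { string id = 1; }\n'
encoded = base64.standard_b64encode(b"\x0a\x09foo.proto").decode("ascii")

print(classify_schema_str(proto_source))  # text
print(classify_schema_str(encoded))       # base64
```

If the referenced schema comes back as "text", the parse error would be expected, since the decoded bytes could not be a serialized descriptor.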

jbouricius, Mar 31 '25 17:03

@jbouricius, which version of CP are you using?

rayokota, Apr 01 '25 20:04

@rayokota

Our Schema Registry is running Confluent Platform 7.5.1

jbouricius, Apr 02 '25 16:04

@jbouricius, you need to use one of the latest CP releases, such as CP 7.5.8
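As a rough illustration of the version gap, a deployment check could compare the running CP version against the release named above. This is a minimal sketch: the helper names are hypothetical, and 7.5.8 is only the example release mentioned in this thread, not a confirmed minimum. Other release lines (for example 7.6.x) would need their own patch-level threshold.

```python
def parse_version(version: str) -> tuple[int, ...]:
    """Turn a dotted version such as '7.5.1' into a comparable tuple."""
    return tuple(int(part) for part in version.split("."))


def is_at_least(running: str, required: str) -> bool:
    """Return True if the running version is the required one or newer."""
    return parse_version(running) >= parse_version(required)


# 7.5.1 is the version reported in this thread; 7.5.8 is the suggested release.
print(is_at_least("7.5.1", "7.5.8"))  # False
print(is_at_least("7.5.8", "7.5.8"))  # True
```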

rayokota, Apr 02 '25 16:04

Closing, as the CP version requirement and the fix are specified in the ticket.

MSeal, Jul 24 '25 23:07