
Stop using `MessageFactory().GetPrototype(descriptor)` in ProtobufDeserializer, use message_type instead

Open · 0x26res opened this issue 3 years ago

Description

The ProtobufDeserializer has a member called _msg_class, which should point to a google.protobuf GeneratedProtocolMessageType (the metaclass of generated message classes).

The constructor of ProtobufDeserializer receives a GeneratedProtocolMessageType in the message_type argument, but instead of using it, it calls: self._msg_class = MessageFactory().GetPrototype(descriptor)

This means that self._msg_class is not the original message type; it is a copy of it, created from the message's descriptor. This causes issues downstream if someone tries to re-serialize a deserialized message: ProtobufSerializer expects the message to be of the original protobuf class, not the copy:

../../../../venv/lib/python3.8/site-packages/confluent_kafka/schema_registry/protobuf.py:391: in __call__
    raise ValueError("message must be of type {} not {}"
E   ValueError: message must be of type <class 'xxxx.xxxx.Header'> not <class 'Header'>
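The failure mode can be illustrated without protobuf at all. The sketch below is a stdlib-only analogy (all names in it are hypothetical, not confluent-kafka-python or protobuf APIs): building a class twice from the same "descriptor" yields two structurally equivalent but distinct class objects, so a type check against the original class rejects instances of the copy.

```python
# Stdlib-only analogy of the bug. make_class_from_descriptor plays the role
# of MessageFactory().GetPrototype(descriptor): it builds a brand-new class
# from the descriptor every time it is called.

class Descriptor:
    """Stand-in for a protobuf Descriptor: just a name and a field list."""
    def __init__(self, name, fields):
        self.name = name
        self.fields = fields

HEADER_DESCRIPTOR = Descriptor("Header", ["event"])

def make_class_from_descriptor(descriptor):
    # Returns a fresh class object on every call, like a new MessageFactory.
    return type(descriptor.name, (object,), {"DESCRIPTOR": descriptor})

Header = make_class_from_descriptor(HEADER_DESCRIPTOR)      # "original" class
HeaderCopy = make_class_from_descriptor(HEADER_DESCRIPTOR)  # deserializer's copy

# Same descriptor, but two unrelated class objects:
assert Header.DESCRIPTOR is HeaderCopy.DESCRIPTOR
assert Header is not HeaderCopy

# A serializer that type-checks messages against the original class (as the
# "message must be of type ..." error above suggests) rejects the copy:
assert isinstance(Header(), Header)
assert not isinstance(HeaderCopy(), Header)
```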

How to reproduce



# Imports needed to run the reproduction. Header is any generated protobuf
# message class from a _pb2 module; adjust that import to your own schema.
# In older confluent-kafka versions, RegisteredSchema/Schema may need to be
# imported from confluent_kafka.schema_registry.schema_registry_client.
import dataclasses

import pytest
import confluent_kafka.schema_registry.protobuf as ckp
from confluent_kafka.schema_registry import RegisteredSchema, Schema
from confluent_kafka.serialization import SerializationContext
from google.protobuf.reflection import GeneratedProtocolMessageType


@dataclasses.dataclass
class InMemorySchemaRegistryClient:
    """In memory schema registry, for test"""

    schemas: dict = dataclasses.field(default_factory=dict)

    def register_schema(self, subject_name, schema) -> int:
        try:
            return self.schemas[schema].schema_id
        except KeyError:
            schema_id = len(self.schemas)
            self.schemas[schema] = RegisteredSchema(
                schema_id=schema_id,
                schema=Schema(schema, "PROTOBUF", []),
                subject=subject_name,
                version=1,
            )
            return schema_id

    def lookup_schema(self, subject_name, schema):
        return self.schemas.get(schema, None)

def test_end_to_end_kafka():
    
    message_type = Header  # Use any protobuf message here
    message = message_type(event="hello")
    context = SerializationContext("test-topic-1", "value")
    serializer = ckp.ProtobufSerializer(
        message_type, InMemorySchemaRegistryClient(), {'use.deprecated.format': False}
    )
    deserializer = ckp.ProtobufDeserializer(message_type, {'use.deprecated.format': False})

    kafka_data = serializer(message, context)
    assert isinstance(kafka_data, bytes)
    proto_data = message.SerializeToString()

    assert kafka_data[-len(proto_data) :] == proto_data
    assert len(kafka_data) - len(proto_data) == 6

    message_type().ParseFromString(proto_data)

    assert isinstance(deserializer._msg_class, GeneratedProtocolMessageType)
    assert isinstance(message_type, GeneratedProtocolMessageType)
    assert message_type.DESCRIPTOR == deserializer._msg_class.DESCRIPTOR
    assert deserializer._msg_class is not message_type, "This is wrong, it should be message_type"

    deserializer._msg_class().ParseFromString(proto_data)
    message_out = deserializer(kafka_data, None)

    assert not isinstance(message_type, Header)
    assert message_out == message

    with pytest.raises(
        ValueError, match=r"message must be of type <class '.*\.Header'> not <class 'Header'>"
    ):
        serializer(message_out, context)
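The change the issue title asks for can be sketched as follows. This is a hedged, stdlib-only sketch (ProtobufDeserializerSketch and FakeHeader are hypothetical names, not the library's actual source): keep the class passed in as message_type instead of rebuilding it from the descriptor, so deserialized messages keep the caller's original class and can be re-serialized.

```python
# Sketch of the proposed fix: store message_type directly.

class ProtobufDeserializerSketch:
    def __init__(self, message_type):
        # Current behaviour (problematic):
        #   self._msg_class = MessageFactory().GetPrototype(message_type.DESCRIPTOR)
        # Proposed behaviour: keep the caller's own generated class.
        self._msg_class = message_type

    def __call__(self, data):
        msg = self._msg_class()
        msg.ParseFromString(data)
        return msg

# Demo with a fake class standing in for a generated protobuf message class:
class FakeHeader:
    def ParseFromString(self, data):
        self.event = data.decode()

deserializer = ProtobufDeserializerSketch(FakeHeader)
out = deserializer(b"hello")
assert type(out) is FakeHeader  # identity preserved, so re-serialization works
assert out.event == "hello"
```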


Checklist

Please provide the following information:

  • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): ('1.8.2', 17302016),
  • [x] Apache Kafka broker version: Not relevant (pure client issue)
  • [x] Client configuration: {...}: {'use.deprecated.format': False}
  • [x] Operating system: macOS / Linux
  • [x] Provide client logs (with 'debug': '..' as necessary): not relevant
  • [x] Provide broker log excerpts: not relevant
  • [ ] Critical issue: No

0x26res avatar Mar 24 '22 12:03 0x26res

Thanks for reporting this. We are looking into it.

jliunyu avatar Mar 24 '22 21:03 jliunyu

I tried to reproduce the issue and didn't succeed. My understanding, similar to yours, is that this might be because I used the latest version of protoc to generate the protobuf schema.

I started reviewing your pull request, https://github.com/confluentinc/confluent-kafka-python/pull/1407. The first thing I would suggest is changing the destination branch: please target https://github.com/confluentinc/confluent-kafka-python/tree/serdes instead of master.

Once you have changed the branch (or created a new PR), I will review it and give my comments.

pranavrth avatar Aug 25 '22 14:08 pranavrth

@pranavrth, thanks for having a look.

I spent some time trying to figure out where the issue comes from. I don't think it has to do with the protoc version. Instead, I think it has to do with how the generated protobuf Python source files are packaged/installed. If they are local/in the same project, it's fine. But if they come from another package (e.g. installed from pip), it stops working (meaning MessageFactory().GetPrototype(message_type.DESCRIPTOR) is not message_type).

Anyway I've updated the PR to point to serdes.

0x26res avatar Aug 25 '22 16:08 0x26res

Thanks for updating the PR and debugging the issue more.

I have given PR comments.

Can you give a sample of a protobuf Python source file that is not working for you, so that I can also reproduce the issue and drill down to the exact problem?

pranavrth avatar Aug 25 '22 18:08 pranavrth

Here's an easy example (from googleapis-common-protos==1.53.0).

>>> from google.type.date_pb2 import Date
>>> from google.protobuf.message_factory import MessageFactory
>>> MessageFactory().GetPrototype(Date.DESCRIPTOR) is Date
False
>>> Date
<class 'google.type.date_pb2.Date'>
>>> MessageFactory().GetPrototype(Date.DESCRIPTOR) 
<class 'Date'>

I have yet to produce a simple reproducible example with a custom .proto file.
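One plausible explanation for the REPL result above, sketched here as a stdlib-only analogy (ClassFactory is a hypothetical name, and this is an assumption about protobuf internals, not verified against them): each factory keeps its own cache of built classes, so a freshly constructed MessageFactory() starts with an empty cache and builds a new class instead of returning the one created at import time by the generated _pb2 module.

```python
# Per-factory class caches: equivalent classes, distinct objects across factories.

class ClassFactory:
    def __init__(self):
        self._classes = {}  # per-factory cache of built classes

    def get_prototype(self, name):
        # Build the class once per factory, then reuse it.
        if name not in self._classes:
            self._classes[name] = type(name, (object,), {})
        return self._classes[name]

module_factory = ClassFactory()  # stands in for the _pb2 module's import-time factory
Date = module_factory.get_prototype("Date")

fresh_factory = ClassFactory()   # stands in for MessageFactory() in the REPL
assert module_factory.get_prototype("Date") is Date      # cached within a factory
assert fresh_factory.get_prototype("Date") is not Date   # not shared across factories
```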

0x26res avatar Aug 25 '22 18:08 0x26res

I tested the same with googleapis-common-protos==1.56.* and it worked fine for me. googleapis-common-protos==1.53.0 is a very old version, and there were many issues with the generated files before 1.56.0.

Nevertheless, the PR https://github.com/confluentinc/confluent-kafka-python/pull/1407 has been merged and will be available in the upcoming release. In the meantime, use the latest version of protoc to compile the files.

pranavrth avatar Sep 08 '22 05:09 pranavrth