Stop using `MessageFactory().GetPrototype(descriptor)` in ProtobufDeserializer, use message_type instead
Description
The ProtobufDeserializer has a member called _msg_class, which should point to a google.protobuf.internal.GeneratedProtocolMessageType.
The constructor of ProtobufDeserializer receives a GeneratedProtocolMessageType in the message_type argument, but instead of using it directly it calls: self._msg_class = MessageFactory().GetPrototype(descriptor)
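A simplified sketch of the relevant part of the constructor, contrasting the current behaviour with what the title of this issue proposes (paraphrased, not the verbatim library source):

from google.protobuf.message_factory import MessageFactory

class ProtobufDeserializer:  # simplified sketch, not the real class
    def __init__(self, message_type, conf=None):
        descriptor = message_type.DESCRIPTOR
        # Current behaviour: build a new prototype class from the descriptor;
        # this may not be the generated class the caller passed in.
        self._msg_class = MessageFactory().GetPrototype(descriptor)
        # Proposed behaviour (this issue): keep the caller's class instead.
        # self._msg_class = message_type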
This means that self._msg_class is not the original message type; it is a copy of it created from the message's descriptor. This causes issues downstream if someone tries to re-serialize that message, because ProtobufSerializer expects the message to be of the original protobuf class, not the copy:
../../../../venv/lib/python3.8/site-packages/confluent_kafka/schema_registry/protobuf.py:391: in __call__
raise ValueError("message must be of type {} not {}"
E ValueError: message must be of type <class 'xxxx.xxxx.Header'> not <class 'Header'>
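The error comes from a type check in ProtobufSerializer.__call__ that compares the incoming message against the class the serializer was constructed with; roughly like the following sketch (paraphrased, not the exact library source):

def check_message_type(message, msg_class):
    # The prototype class built by GetPrototype() shares its descriptor with the
    # original generated class, but it is a different Python class, so an
    # isinstance() check against the original class fails.
    if not isinstance(message, msg_class):
        raise ValueError("message must be of type {} not {}"
                         .format(msg_class, type(message)))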
How to reproduce
import dataclasses

import pytest
from google.protobuf.reflection import GeneratedProtocolMessageType

# The imports below are assumed from context: ckp is the reporter's alias for the
# protobuf serde module, and Header is any protoc-generated message class
# installed from a separate package (its real import path was elided).
import confluent_kafka.schema_registry.protobuf as ckp
from confluent_kafka.schema_registry import RegisteredSchema, Schema
from confluent_kafka.serialization import SerializationContext


@dataclasses.dataclass
class InMemorySchemaRegistryClient:
    """In-memory schema registry, for tests."""

    schemas: dict = dataclasses.field(default_factory=dict)

    def register_schema(self, subject_name, schema) -> int:
        try:
            return self.schemas[schema].schema_id
        except KeyError:
            schema_id = len(self.schemas)
            self.schemas[schema] = RegisteredSchema(
                schema_id=schema_id,
                schema=Schema(schema, "PROTOBUF", []),
                subject=subject_name,
                version=1,
            )
            return schema_id

    def lookup_schema(self, subject_name, schema):
        return self.schemas.get(schema, None)


def test_end_to_end_kafka():
    message_type = Header  # Use any protobuf message here
    message = message_type(event="hello")
    context = SerializationContext("test-topic-1", "value")
    serializer = ckp.ProtobufSerializer(
        message_type, InMemorySchemaRegistryClient(), {'use.deprecated.format': False}
    )
    deserializer = ckp.ProtobufDeserializer(message_type, {'use.deprecated.format': False})

    kafka_data = serializer(message, context)
    assert isinstance(kafka_data, bytes)

    # The serialized payload ends with the raw protobuf bytes, preceded by the
    # wire-format framing (magic byte, schema id, message index).
    proto_data = message.SerializeToString()
    assert kafka_data[-len(proto_data):] == proto_data
    assert len(kafka_data) - len(proto_data) == 6
    message_type().ParseFromString(proto_data)

    # The deserializer's _msg_class has the same descriptor as message_type but
    # is a different class, which is the bug being reported.
    assert isinstance(deserializer._msg_class, GeneratedProtocolMessageType)
    assert isinstance(message_type, GeneratedProtocolMessageType)
    assert message_type.DESCRIPTOR == deserializer._msg_class.DESCRIPTOR
    assert deserializer._msg_class is not message_type, "This is wrong, it should be message_type"
    deserializer._msg_class().ParseFromString(proto_data)

    message_out = deserializer(kafka_data, None)
    assert not isinstance(message_type, Header)
    assert message_out == message

    # Re-serializing the deserialized message fails because it is not an
    # instance of the original generated class.
    with pytest.raises(
        ValueError, match=r"message must be of type <class '.*\.Header'> not <class 'Header'>"
    ):
        serializer(message_out, context)
Checklist
Please provide the following information:
- [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): ('1.8.2', 17302016)
- [x] Apache Kafka broker version: Not relevant (pure client issue)
- [x] Client configuration: {'use.deprecated.format': False}
- [x] Operating system: macOS / Linux
- [x] Provide client logs (with 'debug': '..' as necessary): not relevant
- [x] Provide broker log excerpts: not relevant
- [ ] Critical issue: No
Thanks for reporting this. We are looking into it.
I tried to reproduce the issue and didn't succeed. My understanding, like yours, is that this might be because of the latest version of protoc that I used to generate the protobuf schema.
I started reviewing your Pull Request - https://github.com/confluentinc/confluent-kafka-python/pull/1407. The first thing I would suggest is changing the destination branch: please target https://github.com/confluentinc/confluent-kafka-python/tree/serdes instead of master.
Once you have changed the branch (or created a new PR), I will review it and give my comments.
@pranavrth, thanks for having a look.
I spent some time trying to figure out where the issue comes from. I don't think it has to do with the protoc version. Instead I think it has to do with how the generated protobuf Python source files are packaged/installed. If they are local/in the same project it's fine, but if they come from another package (e.g. installed from pip) it stops working (meaning MessageFactory().GetPrototype(message_type.DESCRIPTOR) is not message_type).
Anyway I've updated the PR to point to serdes.
Thanks for updating the PR and debugging the issue further.
I have left comments on the PR.
Can you share a sample of the protobuf Python source file that is not working for you, so that I can also reproduce the issue and drill down to the exact problem?
Here's an easy example (from googleapis-common-protos==1.53.0).
>>> from google.type.date_pb2 import Date
>>> from google.protobuf.message_factory import MessageFactory
>>> MessageFactory().GetPrototype(Date.DESCRIPTOR) is Date
False
>>> Date
<class 'google.type.date_pb2.Date'>
>>> MessageFactory().GetPrototype(Date.DESCRIPTOR)
<class 'Date'>
I'm yet to get a simple reproducible example with a custom .proto file.
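For completeness, the identity check to run against any generated module is the same as above; the module and class names below are placeholders for a protoc-generated file installed from a separate package:

# my_package.my_messages_pb2 / MyMessage are hypothetical placeholder names.
from google.protobuf.message_factory import MessageFactory
from my_package import my_messages_pb2

prototype = MessageFactory().GetPrototype(my_messages_pb2.MyMessage.DESCRIPTOR)
print(prototype is my_messages_pb2.MyMessage)  # False reproduces the issue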
I tested the same with googleapis-common-protos==1.56.* and it worked fine for me. googleapis-common-protos==1.53.0 is a very old version, and there were many issues with the generated files before 1.56.0.
Nevertheless, the PR https://github.com/confluentinc/confluent-kafka-python/pull/1407 has been merged and will be available in the upcoming release. In the meantime, use the latest version of protoc to compile the files.
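Once a release containing that PR is available, a quick sanity check (assuming the fix keeps the internal attribute name _msg_class and simply stores the class passed as message_type, and reusing the ckp alias and Header class from the reproduction above):

# Assumes a release that includes the fix; Header is any generated message class.
deserializer = ckp.ProtobufDeserializer(Header, {'use.deprecated.format': False})
assert deserializer._msg_class is Header  # should hold after the fix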