pulsar
[improve][client c++] Support KeyValue Schema.
Motivation
This PR adds KeyValue schema support to the C++ client. The key and value schemas only support the AVRO and JSON types (consistent with the Java client).
Modifications
- A new constructor is added in SchemaInfo to combine key and value schemas.
- Add a new KeyValue class, to help users merge and parse key and value data.
Documentation
The current Schema documentation is relatively simple; I will open a new PR to add more content and sample code.
- [x] doc-required (Your PR needs to update docs and you will update later)
Matching PR in forked repository
https://github.com/shibd/pulsar/pull/14
@RobertIndie @BewareMyPower Thanks for the review, all suggestions have been fixed, PTAL.
/pulsarbot run-failure-checks
@BewareMyPower @RobertIndie Can you look at it again? Thanks.
Okay, I will review it soon.
I just thought again about the design of KeyValue. Currently, for a given Message object,
- the key is stored in the MessageMetadata object (generated by ProtoBuf)
- the value is stored in buffer, whose type is SharedBuffer

If the encoding type is SEPARATED, we can avoid the copy operation by just returning the key and value to users.

Otherwise, i.e. the encoding type is INLINE, the data copy cannot be avoided: because both key and value are stored in buffer, we must split it into two byte arrays (std::string objects). But in this case, we can still expose the raw pointer of the value part.

For example, if the buffer is

0x00 0x00 0x00 0x01 0x61 0x00 0x00 0x00 0x01 0x62

The key is "a" (ASCII 0x61) and the value is "b" (ASCII 0x62). Assuming the pointer to the first byte is p, we can return p + 9 as the pointer to the value and 1 as the length of the value.

Regarding the key, we usually assume the key is not too large, so the data copy of the key is acceptable.
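The splitting described above can be sketched as follows. This is a minimal illustration, not the client's actual code: it assumes the INLINE layout implied by the example bytes (a 4-byte big-endian key length, the key bytes, a 4-byte big-endian value length, the value bytes), and the names InlineKeyValueView, readBigEndian32 and parseInline are hypothetical. Only the key is copied; the value is returned as a pointer into the original buffer.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>

// Hypothetical result type: an owned copy of the key plus a
// non-owning view of the value inside the original buffer.
struct InlineKeyValueView {
    std::string key;        // copied out of the buffer
    const char* valueData;  // points into the original buffer, no copy
    size_t valueLength;
};

// Reads a 4-byte big-endian unsigned integer starting at p.
static uint32_t readBigEndian32(const unsigned char* p) {
    return (static_cast<uint32_t>(p[0]) << 24) | (static_cast<uint32_t>(p[1]) << 16) |
           (static_cast<uint32_t>(p[2]) << 8) | static_cast<uint32_t>(p[3]);
}

// Splits a payload assumed to be laid out as
// [key length][key bytes][value length][value bytes].
InlineKeyValueView parseInline(const char* data, size_t size) {
    assert(data != nullptr);
    const auto* p = reinterpret_cast<const unsigned char*>(data);
    if (size < 4) {
        throw std::invalid_argument("buffer too small for key length");
    }
    const size_t keyLength = readBigEndian32(p);
    if (size - 4 < keyLength || size - 4 - keyLength < 4) {
        throw std::invalid_argument("buffer too small for key or value length");
    }
    const size_t valueOffset = 4 + keyLength + 4;
    const size_t valueLength = readBigEndian32(p + 4 + keyLength);
    if (size - valueOffset < valueLength) {
        throw std::invalid_argument("buffer too small for value");
    }
    return InlineKeyValueView{std::string(data + 4, keyLength), data + valueOffset, valueLength};
}
```

For the 10-byte buffer in the example, the returned view holds the key "a", a value pointer equal to p + 9, and a value length of 1. The view is only valid while the underlying buffer is alive, which is why the proposal keeps the value in a SharedBuffer.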
In addition, we don't need to expose the key value encoding type in KeyValue because we already store it in the schema.

    enum class KeyValueEncodingType { SEPARATED, INLINE };

    class KeyValueImpl;

    class PULSAR_PUBLIC KeyValue {
       public:
        // Use move constructor to avoid data copy
        KeyValue(std::string&& key, std::string&& value);
        KeyValue(const Message& message);
        const std::string& getKey() const;
        const void* getData() const;
        size_t getLength() const;
        std::string getDataAsString() const;

       private:
        std::shared_ptr<KeyValueImpl> impl_;
    };

The KeyValueImpl class should store the following two fields.

    std::string key_;
    SharedBuffer valueBuffer_;

We should encode the key_ and valueBuffer_ to the actual buffer in ProducerImpl::sendAsync because we can get the encoding type from the schema.
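A rough sketch of that send-time encoding step, under stated assumptions: EncodedKeyValue, appendBigEndian32 and encodeKeyValue are hypothetical names, std::string stands in for SharedBuffer, and the INLINE layout is the length-prefixed one from the byte example earlier in this comment. How the real client writes the key into MessageMetadata is not shown here.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

enum class KeyValueEncodingType { SEPARATED, INLINE };

// Hypothetical stand-in for what the producer sends: the key slot
// of the message metadata and the payload buffer.
struct EncodedKeyValue {
    std::string metadataKey;
    std::string payload;
};

// Appends length as a 4-byte big-endian integer.
static void appendBigEndian32(std::string& out, uint32_t length) {
    out.push_back(static_cast<char>((length >> 24) & 0xFF));
    out.push_back(static_cast<char>((length >> 16) & 0xFF));
    out.push_back(static_cast<char>((length >> 8) & 0xFF));
    out.push_back(static_cast<char>(length & 0xFF));
}

// The encoding type comes from the schema held by the producer,
// so the KeyValue object itself does not need to carry it.
EncodedKeyValue encodeKeyValue(const std::string& key, const std::string& value,
                               KeyValueEncodingType type) {
    EncodedKeyValue out;
    if (type == KeyValueEncodingType::SEPARATED) {
        // Key travels in the metadata, payload is the value alone.
        out.metadataKey = key;
        out.payload = value;
        return out;
    }
    // INLINE: [key length][key bytes][value length][value bytes].
    assert(key.size() <= UINT32_MAX && value.size() <= UINT32_MAX);
    out.payload.reserve(8 + key.size() + value.size());
    appendBigEndian32(out.payload, static_cast<uint32_t>(key.size()));
    out.payload += key;
    appendBigEndian32(out.payload, static_cast<uint32_t>(value.size()));
    out.payload += value;
    return out;
}
```

With key "a" and value "b", INLINE produces the 10-byte payload from the example and leaves the metadata key empty, while SEPARATED puts "a" in the metadata key and "b" alone in the payload.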
@BewareMyPower Thanks for your review. I took your suggestion to reduce the copying of data by modifying the API.
In addition, we don't need to expose the key value encoding type in KeyValue because we already store it in the schema.
I kept the encoding type in KeyValue, because removing it would affect the API.
For example, when a user produces a message:
// originally
Message msg = MessageBuilder().setContent(keyValue).build();
// after modification
Message msg = producer.newMessageBuilder().setContent(keyValue).build();
// or
Message msg = MessageBuilder(schema).setContent(keyValue).build();
I kept the encoding type in KeyValue, because removing it would affect the API.
The encoding type is already included in the schema, which is set in the producer or consumer internally from ProducerConfiguration or ConsumerConfiguration. Why would removing it affect the API? We don't need to set a schema on a MessageBuilder.
@BewareMyPower I thought of a way, but it's not very elegant.
I added the methods convertPayloadToKeyValue and convertKeyValueToPayload to the MessageImpl class. After the consumer receives a message, it invokes convertPayloadToKeyValue to convert the payload to a KeyValue. Before the producer sends a message, it invokes convertKeyValueToPayload to convert the KeyValue to the payload. PTAL.
/pulsarbot run-failure-checks
Hi @shibd, I've completed my review except some details of tests, PTAL.
@BewareMyPower Thanks for your review, I fixed it, PTAL.
This PR has been resubmitted in the pulsar-client-cpp repository: https://github.com/apache/pulsar-client-cpp/pull/22