[improve][client c++] Support KeyValue Schema.

Open shibd opened this issue 3 years ago • 11 comments

Motivation

Add KeyValue schema support to the C++ client. The key and value schemas only support the AVRO and JSON types (consistent with the Java client).

Modifications

  • A new constructor is added in SchemaInfo to combine key and value schemas.
  • Add a new KeyValue class, to help users merge and parse key and value data.

Documentation

The current Schema documentation is relatively simple. I will open a new PR to add more content and sample code.

  • [x] doc-required (Your PR needs to update docs and you will update later)

Matching PR in forked repository

https://github.com/shibd/pulsar/pull/14

shibd avatar Aug 17 '22 01:08 shibd

@RobertIndie @BewareMyPower Thanks for the review, all suggestions have been addressed, PTAL.

shibd avatar Aug 17 '22 08:08 shibd

/pulsarbot run-failure-checks

shibd avatar Aug 20 '22 06:08 shibd

/pulsarbot run-failure-checks

shibd avatar Aug 21 '22 13:08 shibd

/pulsarbot run-failure-checks

shibd avatar Aug 25 '22 07:08 shibd

@BewareMyPower @RobertIndie Can you look at it again? Thanks.

shibd avatar Aug 25 '22 07:08 shibd

Okay, I will review it soon.

BewareMyPower avatar Sep 01 '22 16:09 BewareMyPower

I just thought again about the design of KeyValue. Currently, for a given Message object,

  • the key is stored in the MessageMetadata object (generated by ProtoBuf)
  • the value is stored in buffer, whose type is SharedBuffer

If the encoding type is SEPARATED, we can avoid the copy operation by just returning the key and value to users.

Otherwise, i.e. when the encoding type is INLINE, some data copy cannot be avoided: both the key and the value are stored in buffer, so returning them as two byte arrays (std::string objects) means splitting it. Even in this case, we can still expose a raw pointer to the value part.

For example, if the buffer is

0x00 0x00 0x00 0x01 0x61 0x00 0x00 0x00 0x01 0x62

The key is "a" (ASCII 0x61) and the value is "b" (ASCII 0x62), each preceded by a 4-byte big-endian length (here 1). Assuming the pointer to the first byte is p, we can return p + 9 as the pointer to the value and 1 as the length of the value.

Regarding the key, we usually assume it is not too large, so copying the key is acceptable.
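To make the INLINE case concrete, here is a minimal sketch of the split described above. It assumes the layout implied by the 10-byte example (4-byte big-endian key length, key bytes, 4-byte big-endian value length, value bytes) and treats both lengths as plain unsigned sizes. InlineKeyValueView, readUint32BE and parseInline are hypothetical names for illustration only, not part of the client.

```cpp
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>

// Hypothetical view type for this sketch: the key is copied, while the
// value is only referenced inside the caller's buffer.
struct InlineKeyValueView {
    std::string key;           // copied out of the buffer
    const uint8_t* valueData;  // points into the original buffer, not owned
    size_t valueLength;
};

// Reads a 4-byte big-endian unsigned integer starting at p.
inline uint32_t readUint32BE(const uint8_t* p) {
    return (static_cast<uint32_t>(p[0]) << 24) | (static_cast<uint32_t>(p[1]) << 16) |
           (static_cast<uint32_t>(p[2]) << 8) | static_cast<uint32_t>(p[3]);
}

// Splits a buffer laid out as [key length][key][value length][value].
// Assumption: lengths are unsigned; no special marker values are handled.
inline InlineKeyValueView parseInline(const uint8_t* p, size_t size) {
    if (size < 4) {
        throw std::invalid_argument("buffer too small for key length");
    }
    const size_t keyLength = readUint32BE(p);
    if (size - 4 < keyLength || size - 4 - keyLength < 4) {
        throw std::invalid_argument("buffer too small for key or value length");
    }
    const size_t valueOffset = 4 + keyLength + 4;
    const size_t valueLength = readUint32BE(p + 4 + keyLength);
    if (size - valueOffset < valueLength) {
        throw std::invalid_argument("buffer too small for value");
    }
    // Only the key is copied; the value stays in place.
    return {std::string(reinterpret_cast<const char*>(p + 4), keyLength), p + valueOffset,
            valueLength};
}
```

For the example buffer, parseInline(p, 10) yields the key "a", a value pointer equal to p + 9 and a value length of 1. The returned pointer is only valid while the underlying buffer is alive, which is why the proposal keeps the buffer inside the KeyValue implementation.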

In addition, we don't need to expose the key value encoding type in KeyValue because we already store it in the schema.

enum class KeyValueEncodingType
{
    SEPARATED,
    INLINE
};

class KeyValueImpl;

class PULSAR_PUBLIC KeyValue {
   public:
    // Use move constructor to avoid data copy
    KeyValue(std::string&& key, std::string&& value);
    KeyValue(const Message& message);

    const std::string& getKey() const;

    const void* getData() const;

    size_t getLength() const;

    std::string getDataAsString() const;

   private:
    std::shared_ptr<KeyValueImpl> impl_;
};

The KeyValueImpl class should store the following two fields.

    std::string key_;
    SharedBuffer valueBuffer_;

We should encode the key_ and valueBuffer_ to the actual buffer in ProducerImpl::sendAsync because we can get the encoding type from the schema.
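As a rough illustration of that encoding step, the sketch below picks the wire form from the encoding type at send time. It is self-contained rather than wired into ProducerImpl::sendAsync: EncodedKeyValue, appendUint32BE and encodeKeyValue are hypothetical names, the value is a std::string instead of a SharedBuffer, and the INLINE layout is assumed to match the 10-byte example above.

```cpp
#include <cstdint>
#include <string>

// Mirrors the enum proposed above so that the sketch compiles on its own.
enum class KeyValueEncodingType { SEPARATED, INLINE };

// Hypothetical stand-in for what the producer would hand to the wire layer.
struct EncodedKeyValue {
    std::string metadataKey;  // stand-in for the key stored in MessageMetadata
    std::string payload;      // stand-in for the message payload buffer
};

// Appends v to out as 4 big-endian bytes.
inline void appendUint32BE(std::string& out, uint32_t v) {
    out.push_back(static_cast<char>((v >> 24) & 0xFF));
    out.push_back(static_cast<char>((v >> 16) & 0xFF));
    out.push_back(static_cast<char>((v >> 8) & 0xFF));
    out.push_back(static_cast<char>(v & 0xFF));
}

// The encoding type comes from the schema, so the key/value pair itself
// does not need to carry it.
inline EncodedKeyValue encodeKeyValue(const std::string& key, const std::string& value,
                                      KeyValueEncodingType type) {
    EncodedKeyValue out;
    if (type == KeyValueEncodingType::SEPARATED) {
        // Key goes to the metadata, payload is the value alone.
        out.metadataKey = key;
        out.payload = value;
    } else {
        // INLINE: [key length][key][value length][value] in one payload.
        out.payload.reserve(8 + key.size() + value.size());
        appendUint32BE(out.payload, static_cast<uint32_t>(key.size()));
        out.payload += key;
        appendUint32BE(out.payload, static_cast<uint32_t>(value.size()));
        out.payload += value;
    }
    return out;
}
```

With key "a" and value "b", the INLINE branch produces exactly the 10 bytes from the example, while the SEPARATED branch leaves the payload as "b" and carries "a" separately.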

@BewareMyPower Thanks for your review. I took your suggestion to reduce the copying of data by modifying the API.

In addition, we don't need to expose the key value encoding type in KeyValue because we already store it in the schema.

I kept the encoding type in KeyValue, because removing it would affect the API.

For example, when a user produces a message.

    // originally
    Message msg = MessageBuilder().setContent(keyValue).build();

    // after modification
    Message msg = producer.newMessageBuilder().setContent(keyValue).build();
    // or
    Message msg = MessageBuilder(schema).setContent(keyValue).build();

shibd avatar Sep 14 '22 02:09 shibd

I kept the encoding type in KeyValue, because removing it would affect the API.

The encoding type is already included in the schema, which is set in the producer or consumer internally from ProducerConfiguration or ConsumerConfiguration, so why would removing it affect the API? We don't need to set the schema on a MessageBuilder.

BewareMyPower avatar Sep 14 '22 03:09 BewareMyPower

@BewareMyPower I thought of a way, but it's not very elegant.

I added the methods convertPayloadToKeyValue and convertKeyValueToPayload to the MessageImpl class. After the consumer receives a message, it invokes convertPayloadToKeyValue to convert the payload to a KeyValue. Before the producer sends a message, it invokes convertKeyValueToPayload to convert the KeyValue to the payload. PTAL.

shibd avatar Sep 14 '22 14:09 shibd

/pulsarbot run-failure-checks

shibd avatar Sep 15 '22 01:09 shibd

Hi @shibd, I've completed my review except for some details of the tests, PTAL.

BewareMyPower avatar Sep 21 '22 07:09 BewareMyPower

@BewareMyPower Thanks for your review, I fixed it, PTAL.

shibd avatar Sep 22 '22 07:09 shibd

Resubmitted the PR in the pulsar-client-cpp repository: https://github.com/apache/pulsar-client-cpp/pull/22

shibd avatar Oct 07 '22 13:10 shibd