pulsar-client-go

[feature] Message payload processor PIP-96

Open ksankeerth opened this issue 1 year ago • 2 comments

Hi Team,

This PR adds the message payload processor feature (PIP-96) to the Go client.

  • Added support for a custom payload processor.
  • Added a default payload processor.
  • Added a byte slice reader for processing messages, similar to the Java client.

(In Java, the methods for reading messages are part of MessagePayloadContext: https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadContext.java#L65-#L79. In this PR, MessagePayloadContext only carries the entry information, and the reading methods are moved into a separate reader.)

  • Added tests for the Pulsar message format and a custom message format, similar to the Java implementation.
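
To illustrate the idea of a pluggable payload processor, here is a minimal, self-contained sketch. It does not reproduce the API added by this PR: the names PayloadContext, PayloadProcessor, LengthPrefixedProcessor, and encode are hypothetical, and the length-prefixed entry format is made up for the example. The context only carries information, and the processor does the reading.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// PayloadContext is a hypothetical stand-in for MessagePayloadContext.
// It only carries information about the entry; it does no reading itself.
type PayloadContext struct {
	NumMessages int  // number of messages in the entry
	IsBatch     bool // whether the entry holds a batch
}

// PayloadProcessor is a hypothetical pluggable processor interface.
// Process decodes the raw entry payload and hands each message to consume.
type PayloadProcessor interface {
	Process(ctx PayloadContext, payload []byte, consume func(msg []byte)) error
}

// LengthPrefixedProcessor decodes a made-up custom entry format in which
// every message is preceded by a 4-byte big-endian length.
type LengthPrefixedProcessor struct{}

func (LengthPrefixedProcessor) Process(ctx PayloadContext, payload []byte, consume func(msg []byte)) error {
	for i := 0; i < ctx.NumMessages; i++ {
		if len(payload) < 4 {
			return errors.New("truncated length prefix")
		}
		n := int(binary.BigEndian.Uint32(payload))
		payload = payload[4:]
		if n > len(payload) {
			return errors.New("truncated message body")
		}
		consume(payload[:n])
		payload = payload[n:]
	}
	return nil
}

// encode builds an entry in the made-up format (demo helper only).
func encode(msgs ...string) []byte {
	var out []byte
	for _, m := range msgs {
		var prefix [4]byte
		binary.BigEndian.PutUint32(prefix[:], uint32(len(m)))
		out = append(out, prefix[:]...)
		out = append(out, m...)
	}
	return out
}

func main() {
	var p PayloadProcessor = LengthPrefixedProcessor{}
	ctx := PayloadContext{NumMessages: 2, IsBatch: true}
	err := p.Process(ctx, encode("hello", "pulsar"), func(msg []byte) {
		fmt.Println(string(msg))
	})
	if err != nil {
		fmt.Println("error:", err)
	}
}
```

Keeping the context as a plain data carrier and putting the decoding in the processor mirrors the split described above; the actual type and method names in the PR may differ.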

Fixes #962

Motivation

PIP-96 lets the client process message payloads through a pluggable message payload processor. This PR implements the same functionality for the Go client.

Modifications

  • In the Java client, the checksum is validated only if the first bytes match the magic number. The current Go client, however, always tries to verify the checksum and returns an error if no checksum is present. As a result, if we use KoP with the Kafka entry.format, we get errors when parsing MessageMeta. To avoid this, checksum validation is now separated from message reading (so more precise errors can be logged) and follows the same logic as the Java client.

  • A few unexported structs and APIs are now exported, because client code needs them when processing messages.
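
The conditional checksum validation described in the first item can be sketched as follows. This is an illustration under assumptions, not the code from this PR: the function names stripChecksum and withChecksum are hypothetical, and the layout assumed here is a 2-byte magic number (0x0e01) followed by a 4-byte big-endian CRC32-C of the remaining bytes.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
)

// magicCrc32c is the 2-byte marker assumed to precede a CRC32-C checksum.
const magicCrc32c uint16 = 0x0e01

var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// stripChecksum (hypothetical name) verifies the checksum only when the
// buffer starts with the magic number. Without the magic number, the buffer
// is returned unchanged and no error is raised.
func stripChecksum(buf []byte) ([]byte, error) {
	if len(buf) < 2 || binary.BigEndian.Uint16(buf) != magicCrc32c {
		return buf, nil // no checksum header, nothing to verify
	}
	if len(buf) < 6 {
		return nil, errors.New("truncated checksum header")
	}
	want := binary.BigEndian.Uint32(buf[2:6])
	rest := buf[6:]
	if got := crc32.Checksum(rest, castagnoli); got != want {
		return nil, fmt.Errorf("checksum mismatch: got %#x, want %#x", got, want)
	}
	return rest, nil
}

// withChecksum (demo helper) prepends the magic number and checksum to body.
func withChecksum(body []byte) []byte {
	hdr := make([]byte, 6)
	binary.BigEndian.PutUint16(hdr, magicCrc32c)
	binary.BigEndian.PutUint32(hdr[2:], crc32.Checksum(body, castagnoli))
	return append(hdr, body...)
}

func main() {
	// Entry with magic number and checksum: verified, then stripped.
	rest, err := stripChecksum(withChecksum([]byte("metadata+payload")))
	fmt.Println(string(rest), err)

	// Entry in another format (no magic number): passed through untouched.
	rest, err = stripChecksum([]byte("kafka-format-entry"))
	fmt.Println(string(rest), err)
}
```

Because validation is a separate step, a mismatch can be reported as a checksum error instead of surfacing later as a metadata parsing failure.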

Verifying this change

  • [ ] Make sure that the change passes the CI checks.

This change adds tests and can be verified as follows:

  • Added tests with the default payload processor, which processes Pulsar-format messages.
  • Added tests with a custom payload processor for a different entry format.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (yes, no breaking changes)
  1. New field in ConsumerOptions (MessagePayloadProcessor)
	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:                   topic,
		SubscriptionName:        "my-sub",
		Type:                    Exclusive,
		MessagePayloadProcessor: DefaultPayloadProcessor{},
	})
  2. Previously the message struct was unexported. It is now exported.

Old

type message struct {

New

type MessageImpl struct {
	publishTime         time.Time
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)

Documentation

This PR adds a new feature. I have added Godoc comments in the code. If additional docs or a separate PR are required, I can contribute them.

ksankeerth, Apr 17 '23 13:04