pulsar-client-go
pulsar-client-go copied to clipboard
[feature] Message payload processor PIP-96
Hi Team,
This PR introduces message payload processor feature to Golang client. (PIP-96)
- added support for custom payload processor
- added default payload processor
- added a byte slice reader to process messages similar to Java Client
(In Java, the methods for reading messages are implemented as part of MessagePayloadContext https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadContext.java#L65-#L79 In this PR, I used MessagePayloadContext just to carry the information and separated the above methods into a reader.)
- added tests for pulsar message format and custom message format similar to java implementation
Fixes #962
Motivation
As per PIP-96 proposal, it adds capabilities to process messages by the client using pluggable message processor. Same functionalities are implemented for Golang client.
Modifications
-
In Java Client implementation, checksum is only validated if the first bytes matches with magic number. However, the current Golang client always try to verify the checksum and return errors if checksum is not present. Due to this reasons, If we KoP and Kafka entry.format, We'll get errors when parsing MessageMeta. To avoid that, Checksum validation was separated from reading messages (We can log precise errors) and implemented similar logic we have in Java Client.
-
A few unexposed struct and APIs were exposed to the client. Changed the visibility as the client needs to use them when processing messages.
Verifying this change
- [ ] Make sure that the change passes the CI checks.
This change added tests and can be verified as follows:
- added tests with Default Payload processor. It'll process pulsar messages
- added tests with custom payload processor for different entry format.
Does this pull request potentially affect one of the following parts:
If yes
was chosen, please highlight the changes
- Dependencies (does it add or upgrade a dependency): (no)
- The public API: (yes, not breaking changes)
- New field in ConsumerOptions (MessagePayloadProcessor)
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: Exclusive,
MessagePayloadProcessor: DefaultPayloadProcessor{},
})
- previously
message
visibility was limited. It has changed
Old
type message struct {
New
type MessageImpl struct {
publishTime time.Time
- The schema: (no )
- The default values of configurations: (no)
- The wire protocol: (no)
Documentation
This PR adds new feature. I have added Godoc comments in code. If additional docs/separate PR required, I could contribute.