Support Large Message Size (PIP-37 / "chunking")
Currently the Go client does not seem to support PIP-37, which allows messages larger than the maximum message size to be sent by breaking them up on the producer side and re-assembling them in the consumer. This would be a handy feature to have for parity with the Java client.
Hello, I am very interested in this feature; the following is my plan.
Motivation
Make the Pulsar Go client support chunking so that it can produce and consume big messages.
Modifications
Publish Chunked Messages
The maxMessageSize limit currently blocks publishing of big messages.
https://github.com/apache/pulsar-client-go/blob/0f7041ffa9085197aa888ac33d3288a3ed81c57b/pulsar/producer_partition.go#L427-L436
If the size of the message payload is bigger than maxMessageSize, the message is discarded. Instead, it should be split into chunks, each no larger than maxMessageSize, which are then sent to the broker separately. I think the chunking logic can be added in internalSendAsync (a rough sketch follows the link below).
https://github.com/apache/pulsar-client-go/blob/0f7041ffa9085197aa888ac33d3288a3ed81c57b/pulsar/producer_partition.go#L741
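To illustrate the splitting step, here is a minimal sketch of a helper that internalSendAsync could call. The name chunkPayload and the maxChunkSize parameter are hypothetical; the real code would also fill in the PIP-37 chunk metadata (uuid, chunk_id, num_chunks_from_msg, total_chunk_msg_size) before sending each chunk.

```go
// chunkPayload splits a payload into chunks that each fit within maxChunkSize.
// Hypothetical helper, for illustration only.
func chunkPayload(payload []byte, maxChunkSize int) [][]byte {
	if maxChunkSize <= 0 || len(payload) <= maxChunkSize {
		return [][]byte{payload}
	}
	numChunks := (len(payload) + maxChunkSize - 1) / maxChunkSize
	chunks := make([][]byte, 0, numChunks)
	for begin := 0; begin < len(payload); begin += maxChunkSize {
		end := begin + maxChunkSize
		if end > len(payload) {
			end = len(payload)
		}
		chunks = append(chunks, payload[begin:end])
	}
	return chunks
}
```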
Receive Chunked Messages
Pulsar allows multiple producers to publish messages to the same topic at the same time, which means the chunks of multiple big messages may be interleaved in the topic. The chunks of one big message also do not necessarily arrive consecutively, but they always arrive in order, which is guaranteed by the broker.
So the Go client needs a ChunkedMessageCtx to track and buffer chunked messages. ChunkedMessageCtx maintains the position of the chunks received so far and accumulates their payloads. Once all chunks have been received, ChunkedMessageCtx returns the accumulated payload to the user, i.e. the full message as it was before chunking.
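A minimal sketch of what that context could look like (names are illustrative, not a final API; it assumes the code lives in the pulsar package so the MessageID interface is in scope, and it relies on chunks arriving in order, so a gap in the chunk id means a chunk was lost):

```go
// chunkedMsgCtx buffers the chunks of one large message until all have arrived.
type chunkedMsgCtx struct {
	totalChunks       int32
	chunkedMsgBuffer  []byte      // accumulated payload of the chunks received so far
	lastChunkedMsgID  int32       // chunk id of the last chunk appended, -1 initially
	chunkedMessageIDs []MessageID // message id of every chunk, kept for acking/seeking
}

func newChunkedMsgCtx(totalChunks int32, totalSize int) *chunkedMsgCtx {
	return &chunkedMsgCtx{
		totalChunks:      totalChunks,
		chunkedMsgBuffer: make([]byte, 0, totalSize),
		lastChunkedMsgID: -1,
	}
}

// append adds one chunk. It returns the reassembled payload and true once the
// final chunk has been received; (nil, false) otherwise.
func (ctx *chunkedMsgCtx) append(chunkID int32, msgID MessageID, payload []byte) ([]byte, bool) {
	if chunkID != ctx.lastChunkedMsgID+1 {
		// A chunk was skipped; the caller should discard this context.
		return nil, false
	}
	ctx.chunkedMsgBuffer = append(ctx.chunkedMsgBuffer, payload...)
	ctx.chunkedMessageIDs = append(ctx.chunkedMessageIDs, msgID)
	ctx.lastChunkedMsgID = chunkID
	if chunkID == ctx.totalChunks-1 {
		return ctx.chunkedMsgBuffer, true
	}
	return nil, false
}
```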
All ChunkedMessageCtx instances need to be maintained in a cache. To bound memory pressure, the number of ChunkedMessageCtx instances must be limited (the default upper limit in the Java client is 100). This cache is essentially a concurrent map with an eviction policy (LRU). It can be implemented simply as map + mutex + pending queue, or with something more elaborate (https://github.com/Gleiphir2769/s-cache).
I think it should be modified here (a sketch of the cache follows the link):
https://github.com/apache/pulsar-client-go/blob/0f7041ffa9085197aa888ac33d3288a3ed81c57b/pulsar/consumer_partition.go#L553
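A map + mutex + pending queue version could look like the sketch below, reusing the chunkedMsgCtx sketch above. The names, the FIFO-style eviction, and the maxPending default are illustrative only.

```go
package pulsar

import "sync"

// chunkedMsgCtxMap tracks in-flight chunked messages, keyed by the
// producer-assigned chunk uuid.
type chunkedMsgCtxMap struct {
	mu           sync.Mutex
	ctxs         map[string]*chunkedMsgCtx
	pendingQueue []string // uuids in arrival order, used for eviction
	maxPending   int      // e.g. 100, matching the Java client default
}

func newChunkedMsgCtxMap(maxPending int) *chunkedMsgCtxMap {
	return &chunkedMsgCtxMap{ctxs: make(map[string]*chunkedMsgCtx), maxPending: maxPending}
}

// addIfAbsent returns the context for uuid, creating it (and evicting the
// oldest pending contexts if the limit is reached) when it does not exist yet.
func (m *chunkedMsgCtxMap) addIfAbsent(uuid string, totalChunks int32, totalSize int) *chunkedMsgCtx {
	m.mu.Lock()
	defer m.mu.Unlock()
	if ctx, ok := m.ctxs[uuid]; ok {
		return ctx
	}
	for m.maxPending > 0 && len(m.ctxs) >= m.maxPending && len(m.pendingQueue) > 0 {
		oldest := m.pendingQueue[0]
		m.pendingQueue = m.pendingQueue[1:]
		delete(m.ctxs, oldest)
	}
	ctx := newChunkedMsgCtx(totalChunks, totalSize)
	m.ctxs[uuid] = ctx
	m.pendingQueue = append(m.pendingQueue, uuid)
	return ctx
}

// remove drops a context once its message has been fully reassembled.
// (The stale uuid left in pendingQueue only makes a later eviction a no-op.)
func (m *chunkedMsgCtxMap) remove(uuid string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.ctxs, uuid)
}
```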
Some Details
Batching
Currently the Pulsar Go client relies on BatchBuilder to send every message, even when batching is disabled (in that case each message triggers a flush of the batch).
https://github.com/apache/pulsar-client-go/blob/0f7041ffa9085197aa888ac33d3288a3ed81c57b/pulsar/producer_partition.go#L472-L474
In the Java client, the batching logic skips the processing of chunked messages. So we need a single-message sending implementation that is independent of BatchBuilder.
Considering the problem of consumer available-permits calculation on Shared subscriptions (issue #10417), batching and chunking cannot be enabled at the same time.
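If that rule is adopted, the producer could reject the combination at creation time. A minimal sketch, assuming a new EnableChunking field on ProducerOptions (the field name is hypothetical; DisableBatching already exists):

```go
package pulsar

import "errors"

// validateChunkingOptions is a hypothetical helper that enforces the rule:
// enabling chunking requires batching to be disabled.
func validateChunkingOptions(options *ProducerOptions) error {
	if options.EnableChunking && !options.DisableBatching {
		return errors.New("chunking and batching cannot be enabled at the same time")
	}
	return nil
}
```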
Chunked Message ID
This is related to PIP-107. It makes sense to adopt the solution from the newer Java client, which implements a ChunkMessageIdImpl exposing getFirstChunkMessageId. This will change the Seek implementation so that seeking on a chunked message ID seeks to the first chunk's message ID.
https://github.com/apache/pulsar-client-go/blob/0f7041ffa9085197aa888ac33d3288a3ed81c57b/pulsar/consumer_partition.go#L428-L431
https://github.com/apache/pulsar-client-go/blob/0f7041ffa9085197aa888ac33d3288a3ed81c57b/pulsar/consumer_partition.go#L447-L459
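A minimal sketch of such a type in the Go client, modelled on Java's ChunkMessageIdImpl (names are illustrative; it assumes the pulsar package's MessageID interface):

```go
// chunkMessageID represents the id of a chunked message: it behaves as the id
// of the last chunk but remembers the first chunk's id, which is what Seek
// (and cumulative ack) should use.
type chunkMessageID struct {
	MessageID              // embedded id of the last chunk, so the type satisfies MessageID
	firstChunkID MessageID // id of the first chunk of the message
}

func newChunkMessageID(first, last MessageID) *chunkMessageID {
	return &chunkMessageID{MessageID: last, firstChunkID: first}
}

// FirstChunkMessageID mirrors Java's getFirstChunkMessageId.
func (id *chunkMessageID) FirstChunkMessageID() MessageID {
	return id.firstChunkID
}
```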
Size Calculation
This is related to issue #16196. The message metadata should be updated before computing the chunk size, and the total size must account for all bytes beyond the metadata and payload, e.g. the 4-byte checksum field.
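As a sketch of that calculation (the framingOverhead parameter, covering bytes such as the 4-byte checksum, is an assumption, and metadataSize must come from the finalized metadata):

```go
// chunkCount returns how many chunks are needed so that metadata + chunk
// payload + framing overhead stays within maxMessageSize. Illustrative only.
func chunkCount(payloadSize, maxMessageSize, metadataSize, framingOverhead int) int {
	budget := maxMessageSize - metadataSize - framingOverhead
	if budget <= 0 {
		return 0 // metadata alone already exceeds the limit; cannot chunk
	}
	return (payloadSize + budget - 1) / budget // ceil(payloadSize / budget)
}
```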
Shared Subscription
There used to be some problems with chunking on Shared subscriptions; issue #16202 added support for chunking with the Shared subscription type. So the Go client may not need to restrict chunking on Shared subscriptions in ConsumerImpl.
unAckedChunkedMessageIdSequenceMap
The Go client doesn't support ackTimeout yet, so there is no unAckedMessageTracker; this does not seem to be a problem for this feature.