Unexpected chunk messages are not redelivered
Expected behavior
When chunking is enabled, the Go consumer client sometimes removes the chunked message context from its internal state through the logic below:
if ctx == nil || ctx.chunkedMsgBuffer == nil || chunkID != ctx.lastChunkedMsgID+1 {
	lastChunkedMsgID := -1
	totalChunks := -1
	if ctx != nil {
		lastChunkedMsgID = int(ctx.lastChunkedMsgID)
		totalChunks = int(ctx.totalChunks)
		ctx.chunkedMsgBuffer.Clear()
	}
	pc.log.Warnf(fmt.Sprintf(
		"Received unexpected chunk messageId %s, last-chunk-id %d, chunkId = %d, total-chunks %d",
		msgID.String(), lastChunkedMsgID, chunkID, totalChunks))
	// The partially received chunks are dropped here: the context is removed
	// and a permit is returned, but the broker is never told to redeliver.
	pc.chunkedMsgCtxMap.remove(uuid)
	pc.availablePermits.inc()
	return nil
}
If pulsar-client-go drops such chunked message contexts, there should be some signal back to the broker so that the message can be redelivered.
Actual behavior
When the consumer client discards this message, the broker never retries sending the same message again, and the msgBacklog output from the CLI
./pulsar-admin topics partitioned-stats persistent://dummy/my_topic/my_namespace | grep msgBacklog
stays non-zero, sometimes 5, 10, or even 1 message.
Steps to reproduce
Enable producer and consumer side chunking with the following configs
Backend Consumer config
consumer:
  topic: "my_topic"
  namespace: "my_namespace"
  subscriptionName: "my_subscription"
  subscriptionType: "key_shared"
  receiverQueueSize: 1000
  enableAutoScaledReceiverQueueSizeLimit: "true"
  nackRedeliveryDelay: 30s
  maxPendingChunkMessages: 100
  expiryTimeOfIncompleteChunkSec: 120
  autoAckIncompleteChunkEnabled: "false"
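For reference, this roughly corresponds to the following pulsar-client-go ConsumerOptions (a sketch, not our actual wrapper code; the pre-built client, the short topic name, and the package name are placeholders, and I am assuming enableAutoScaledReceiverQueueSizeLimit maps to EnableAutoScaledReceiverQueueSize):

package repro

import (
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

// newConsumer mirrors the backend consumer YAML above.
// The client is assumed to be created with pulsar.NewClient beforehand.
func newConsumer(client pulsar.Client) (pulsar.Consumer, error) {
	return client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "my_topic", // placeholder; fully qualified as persistent://<tenant>/my_namespace/my_topic
		SubscriptionName: "my_subscription",
		Type:             pulsar.KeyShared,
		KeySharedPolicy: &pulsar.KeySharedPolicy{
			Mode:                    pulsar.KeySharedPolicyModeAutoSplit,
			AllowOutOfOrderDelivery: true, // mentioned further down in the thread
		},
		ReceiverQueueSize:                 1000,
		EnableAutoScaledReceiverQueueSize: true, // assumed mapping of enableAutoScaledReceiverQueueSizeLimit
		NackRedeliveryDelay:               30 * time.Second,
		// Chunking-related options from the YAML above.
		MaxPendingChunkedMessage:    100,
		ExpireTimeOfIncompleteChunk: 120 * time.Second,
		AutoAckIncompleteChunk:      false,
	})
}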
Backend Producer config
producer:
  topic: "my_topic"
  pulsarClientIsolationEnabled: false
  enabledChunking: "true"
  compressionType: "lz4"
  compressionLevel: "faster"
  sendTimeout: 60s
  disableBlockIfQueFull: true
  maxPendingMessages: 1000
  hashingScheme: "murmur3_32_hash"
  sendAsync: true
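Likewise for the producer, against pulsar-client-go's ProducerOptions (same package and imports as the consumer sketch above; DisableBatching is my addition, since as far as I know chunking and batching are mutually exclusive in the Go client):

// newProducer mirrors the backend producer YAML above.
func newProducer(client pulsar.Client) (pulsar.Producer, error) {
	return client.CreateProducer(pulsar.ProducerOptions{
		Topic:          "my_topic", // same placeholder topic as the consumer sketch
		EnableChunking: true,
		// Not in the YAML above: chunking only works with batching disabled.
		DisableBatching:         true,
		CompressionType:         pulsar.LZ4,
		CompressionLevel:        pulsar.Faster,
		SendTimeout:             60 * time.Second,
		DisableBlockIfQueueFull: true,
		MaxPendingMessages:      1000,
		HashingScheme:           pulsar.Murmur3_32Hash,
	})
}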
Broker configuration
broker:
  image: apachepulsar/pulsar:latest
  container_name: broker
  hostname: broker
  restart: no
  environment:
    - metadataStoreUrl=zk:zookeeper:2181
    - zookeeperServers=zookeeper:2181
    - clusterName=cluster-a
    - managedLedgerDefaultEnsembleSize=1
    - managedLedgerDefaultWriteQuorum=1
    - managedLedgerDefaultAckQuorum=1
    - advertisedAddress=broker
    - advertisedListeners=external:pulsar://broker:6650
    - PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=2048m
    - maxMessageSize=1242880
  depends_on:
    zookeeper:
      condition: service_healthy
    bookie:
      condition: service_started
  ports:
    - "6650:6650"
    - "8080:8080"
  command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker"
  healthcheck:
    test: [ "CMD", "curl", "-f", "http://localhost:8080/admin/v2/clusters" ]
    interval: 5s
    timeout: 10s
    retries: 5
The ingestion payload size is 15 MB.
Send this request continuously for 300 seconds; the unexpected-chunk warning shows up within 10-15 seconds on a local machine.
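The send loop looks roughly like this (a sketch on top of the producer above; the payload contents, the key, the pacing, and the context and log imports are placeholders):

// reproduce pushes ~15 MB messages through the chunking path for 300 seconds.
func reproduce(producer pulsar.Producer) {
	payload := make([]byte, 15*1024*1024) // ~15 MB, well above maxMessageSize, so it is chunked

	deadline := time.Now().Add(300 * time.Second)
	for time.Now().Before(deadline) {
		producer.SendAsync(context.Background(), &pulsar.ProducerMessage{
			Payload: payload,
			Key:     "some-key", // placeholder key for the key_shared subscription
		}, func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) {
			if err != nil {
				log.Printf("send failed: %v", err)
			}
		})
		time.Sleep(100 * time.Millisecond) // placeholder pacing
	}
	_ = producer.Flush()
}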
Note: currently we are not hitting the go c.discardChunkIfExpire(uuid, true, c.pc.options.expireTimeOfIncompleteChunk) or go c.discardOldestChunkMessage(c.pc.options.autoAckIncompleteChunk) discard paths. The potential problem lies there as well, since no mechanism exists for the broker to retry in those cases either.
System configuration
Pulsar version: 3.3.2 for broker and 0.15.1 for pulsar-client-go
Can we get some help here? The majority of our pipelines are currently impacted by this issue.
Hi,
I think the problem is that with chunking we should allow out-of-order delivery of the chunk messages. The current implementation is very strict about ordering, which cannot be guaranteed in a distributed system.
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 61c82bf..f8de2ac 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -179,7 +179,7 @@ type partitionConsumer struct {
dlq *dlqRouter
log log.Logger
- compressionProviders sync.Map //map[pb.CompressionType]compression.Provider
+ compressionProviders sync.Map // map[pb.CompressionType]compression.Provider
metrics *internal.LeveledMetrics
decryptor cryptointernal.Decryptor
schemaInfoCache *schemaInfoCache
@@ -1449,39 +1449,25 @@ func (pc *partitionConsumer) processMessageChunk(compressedPayload internal.Buff
partitionIdx: pc.partitionIdx,
}
- if msgMeta.GetChunkId() == 0 {
- pc.chunkedMsgCtxMap.addIfAbsent(uuid,
- numChunks,
- totalChunksSize,
- )
- }
-
+ pc.chunkedMsgCtxMap.addIfAbsent(uuid, numChunks, totalChunksSize)
ctx := pc.chunkedMsgCtxMap.get(uuid)
-
- if ctx == nil || ctx.chunkedMsgBuffer == nil || chunkID != ctx.lastChunkedMsgID+1 {
- lastChunkedMsgID := -1
- totalChunks := -1
- if ctx != nil {
- lastChunkedMsgID = int(ctx.lastChunkedMsgID)
- totalChunks = int(ctx.totalChunks)
- ctx.chunkedMsgBuffer.Clear()
- }
- pc.log.Warnf(fmt.Sprintf(
- "Received unexpected chunk messageId %s, last-chunk-id %d, chunkId = %d, total-chunks %d",
- msgID.String(), lastChunkedMsgID, chunkID, totalChunks))
+ if ctx == nil { // ideally this should never happen but just in case
+ pc.log.Warnf("Chunk context missing for uuid %s", uuid)
pc.chunkedMsgCtxMap.remove(uuid)
pc.availablePermits.inc()
return nil
}
+ // --- Out-of-order chunk handling ---
ctx.append(chunkID, msgID, compressedPayload)
- if msgMeta.GetChunkId() != msgMeta.GetNumChunksFromMsg()-1 {
+ // Check if all chunks are present
+ if !ctx.isComplete() {
pc.availablePermits.inc()
return nil
}
- return ctx.chunkedMsgBuffer
+ return ctx.reassemble(totalChunksSize)
}
func (pc *partitionConsumer) messageShouldBeDiscarded(msgID *trackingMessageID) bool {
@@ -2343,22 +2329,22 @@ func convertToMessageID(id *pb.MessageIdData) *trackingMessageID {
}
type chunkedMsgCtx struct {
- totalChunks int32
- chunkedMsgBuffer internal.Buffer
- lastChunkedMsgID int32
- chunkedMsgIDs []*messageID
- receivedTime int64
+ totalChunks int32
+ lastChunkedMsgID int32
+ chunkedMsgIDs []*messageID
+ chunkedMsgPayloads []internal.Buffer
+ receivedTime int64
mu sync.Mutex
}
-func newChunkedMsgCtx(numChunksFromMsg int32, totalChunkMsgSize int) *chunkedMsgCtx {
+func newChunkedMsgCtx(numChunksFromMsg int32, _ int) *chunkedMsgCtx {
return &chunkedMsgCtx{
- totalChunks: numChunksFromMsg,
- chunkedMsgBuffer: internal.NewBuffer(totalChunkMsgSize),
- lastChunkedMsgID: -1,
- chunkedMsgIDs: make([]*messageID, numChunksFromMsg),
- receivedTime: time.Now().Unix(),
+ totalChunks: numChunksFromMsg,
+ lastChunkedMsgID: -1,
+ chunkedMsgIDs: make([]*messageID, numChunksFromMsg),
+ chunkedMsgPayloads: make([]internal.Buffer, numChunksFromMsg),
+ receivedTime: time.Now().Unix(),
}
}
@@ -2366,7 +2352,7 @@ func (c *chunkedMsgCtx) append(chunkID int32, msgID *messageID, partPayload inte
c.mu.Lock()
defer c.mu.Unlock()
c.chunkedMsgIDs[chunkID] = msgID
- c.chunkedMsgBuffer.Write(partPayload.ReadableSlice())
+ c.chunkedMsgPayloads[chunkID] = partPayload
c.lastChunkedMsgID = chunkID
}
@@ -2402,6 +2388,35 @@ func (c *chunkedMsgCtx) discard(pc *partitionConsumer) {
}
}
+func (c *chunkedMsgCtx) isComplete() bool {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ complete := true
+ for i := int32(0); i < c.totalChunks; i++ {
+ if c.chunkedMsgIDs[i] == nil || c.chunkedMsgPayloads[i] == nil {
+ complete = false
+ break
+ }
+ }
+
+ return complete
+}
+
+func (c *chunkedMsgCtx) reassemble(totalChunkMsgSize int) internal.Buffer {
+ // Reassemble chunks in order
+ finalBuf := internal.NewBuffer(totalChunkMsgSize)
+
+ c.mu.Lock()
+ for i := range int32(len(c.chunkedMsgPayloads)) {
+ finalBuf.Write(c.chunkedMsgPayloads[i].ReadableSlice())
+ c.chunkedMsgPayloads[i].Release() // Release the buffer after use
+ }
+ c.mu.Unlock()
+
+ return finalBuf
+}
+
type chunkedMsgCtxMap struct {
chunkedMsgCtxs map[string]*chunkedMsgCtx
pendingQueue *list.List
@@ -2477,7 +2492,7 @@ func (c *chunkedMsgCtxMap) discardOldestChunkMessage(autoAck bool) {
ctx.discard(c.pc)
}
delete(c.chunkedMsgCtxs, oldest)
- c.pc.log.Infof("Chunked message [%s] has been removed from chunkedMsgCtxMap", oldest)
+ c.pc.log.Infof("Oldest Chunked message [%s] has been removed from chunkedMsgCtxMap due to max pending chunk messages exceeding", oldest)
}
func (c *chunkedMsgCtxMap) discardChunkMessage(uuid string, autoAck bool) {
@@ -2501,7 +2516,7 @@ func (c *chunkedMsgCtxMap) discardChunkMessage(uuid string, autoAck bool) {
break
}
}
- c.pc.log.Infof("Chunked message [%s] has been removed from chunkedMsgCtxMap", uuid)
+ c.pc.log.Infof("Chunked message [%s] has been removed from chunkedMsgCtxMap due to expireTimeOfIncompleteChunk happening", uuid)
}
func (c *chunkedMsgCtxMap) discardChunkIfExpire(uuid string, autoAck bool, expire time.Duration) {
What this does is store the chunks out of order as they arrive and reassemble the message only once all the chunks have been received. This can be put behind a consumer property to support out-of-order processing; none of the other behavior changes with this logic.
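To make the idea concrete outside the client internals, here is a small standalone model of the buffer-then-reassemble approach (toy types only, not the actual chunkedMsgCtx or internal.Buffer):

package main

import (
	"bytes"
	"fmt"
)

// chunkBuffer is a toy stand-in: it accepts chunks in any order and only
// yields the full payload once every slot has been filled.
type chunkBuffer struct {
	chunks [][]byte
}

func newChunkBuffer(totalChunks int) *chunkBuffer {
	return &chunkBuffer{chunks: make([][]byte, totalChunks)}
}

func (b *chunkBuffer) add(chunkID int, payload []byte) {
	b.chunks[chunkID] = payload
}

func (b *chunkBuffer) complete() bool {
	for _, c := range b.chunks {
		if c == nil {
			return false
		}
	}
	return true
}

func (b *chunkBuffer) reassemble() []byte {
	var buf bytes.Buffer
	for _, c := range b.chunks {
		buf.Write(c)
	}
	return buf.Bytes()
}

func main() {
	b := newChunkBuffer(3)
	// Chunks arrive out of order, e.g. M1, M2, M0.
	arrivals := []struct {
		id      int
		payload string
	}{{1, "-world"}, {2, "!"}, {0, "hello"}}
	for _, a := range arrivals {
		b.add(a.id, []byte(a.payload))
		if b.complete() {
			fmt.Println(string(b.reassemble())) // prints "hello-world!"
		}
	}
}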
Let me know how you would like to proceed.
The order of chunks should be guaranteed by the producer. For example, consider a large message M composed of three chunks: M0, M1, and M2. The data in the topic might appear as follows if the producer resends the message:
M1, M2, M0, M1, N, M2
The producer must guarantee the order of M0, M1, and M2. An incorrect sequence like the one below should not happen:
M1, M2, M2, M0, M1
In this case, message M was not sent successfully, and the consumer should not receive a complete message M.
Returning to this issue, it seems easy to reproduce. I think this may be related to the duplicate chunked message issue that PR https://github.com/apache/pulsar/pull/21101 fixed in the Java client. However, after reviewing the code, I found a problem with that PR: the 'Situation 1 - Message redeliver' case should never happen, and we do not need to detect a CorruptedChunkMessage.
This issue might also be considered an unexpected case of receiving incomplete chunks. We already have an option, AutoAckIncompleteChunk, to instruct the consumer on how to handle them.
I think a better solution is:
- If the consumer receives any duplicate chunk messages (e.g., M0, M1, M0', M1', M2'), the first two chunks (M0 and M1) should be acknowledged to avoid the acknowledgment hole.
- If the consumer receives any unexpected incomplete messages (e.g., M1, M2, M0', M1', M2'), the first two chunks should be acknowledged or ignored based on the AutoAckIncompleteChunk option (a toy sketch of this handling follows below).
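In rough pseudo-Go, the handling I have in mind looks like this (a toy decision sketch, not actual client code; duplicate, staleIDs, and ack are placeholders):

// handleRestart decides what to do with the chunks received before a chunked
// message restarts (i.e. a fresh chunk 0 shows up for the same uuid).
func handleRestart(duplicate bool, autoAckIncompleteChunk bool, staleIDs []string, ack func(string)) {
	switch {
	case duplicate:
		// Duplicate chunks (M0, M1, M0', M1', M2'): ack M0 and M1 so they do
		// not leave an acknowledgment hole behind.
		for _, id := range staleIDs {
			ack(id)
		}
	case autoAckIncompleteChunk:
		// Unexpected incomplete message (M1, M2, M0', M1', M2'): ack the
		// stale chunks only when AutoAckIncompleteChunk is enabled.
		for _, id := range staleIDs {
			ack(id)
		}
	default:
		// Otherwise the stale chunks are simply ignored; the complete message
		// is expected to come from the resent chunks (M0', M1', M2').
	}
}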
What do you think? Does this work for your case?
Thanks for the response. I forgot to mention that allowOutOfOrderDelivery is set to true (I do not think it matters), AutoAckIncompleteChunk is set to false, and the KeySharedPolicy is AutoSplit.
I have a couple of questions:
- Is ordering guaranteed for a KEY_SHARED subscription?
- If you use an async producer, will the chunks be produced in order even when retries or network issues happen?
Coming to your solution now:
- If the consumer receives any duplicate chunk messages (e.g., M0, M1, M0', M1', M2'), the first two chunks (M0 and M1) should be acknowledged to avoid the acknowledgment hole.
Why would the consumer receive a duplicate chunk message in this situation? From the code flow, we never nack or ack any chunk messages until we hit maxPendingChunkingMessage (with AutoAckIncompleteChunk) or expireTimeOfIncompleteChunk. From the logs I saw that none of these cases were hit, and AutoAckIncompleteChunk is set to false in our environment. Let me know if I am missing anything here.
- If the consumer receives any unexpected incomplete messages (e.g., M1, M2, M0', M1', M2'), the first two chunks should be acknowledged or ignored based on the AutoAckIncompleteChunk option.
In this situation, acknowledging the first two chunks might not be the right thing to do, as we can potentially time out if the other chunks never arrive, right? Also, AutoAckIncompleteChunk actually moves the commit offset forward on the broker even though the message was never consumed by the consumer application, which can theoretically mean we are dropping customer data, right? Based on this understanding, we have disabled this flag.