
Unexpected chunk messages are not redelivered

Open jgheewala opened this issue 5 months ago • 3 comments

Expected behavior

When chunking is enabled, the Go consumer client sometimes removes the chunked message context from its internal map through the logic below:

	if ctx == nil || ctx.chunkedMsgBuffer == nil || chunkID != ctx.lastChunkedMsgID+1 {
		lastChunkedMsgID := -1
		totalChunks := -1
		if ctx != nil {
			lastChunkedMsgID = int(ctx.lastChunkedMsgID)
			totalChunks = int(ctx.totalChunks)
			ctx.chunkedMsgBuffer.Clear()
		}
		pc.log.Warnf(fmt.Sprintf(
			"Received unexpected chunk messageId %s, last-chunk-id %d, chunkId = %d, total-chunks %d",
			msgID.String(), lastChunkedMsgID, chunkID, totalChunks))
		pc.chunkedMsgCtxMap.remove(uuid)
		pc.availablePermits.inc()

		return nil
	}

If pulsar-client-go removes such chunked message contexts, it should send some signal back to the broker so that the message can be redelivered.
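
For illustration, here is a minimal sketch of the kind of hook this issue is asking for: instead of silently dropping the buffered chunks, negatively acknowledge their message IDs so the broker redelivers them. The redeliverDiscardedChunks helper and the nack callback are hypothetical, not part of the actual pulsar-client-go internals; chunkedMsgCtx and messageID are the internal types quoted above.

    // redeliverDiscardedChunks is a hypothetical sketch: when a chunked-message
    // context is discarded, nack every chunk message ID received so far so the
    // broker schedules redelivery, instead of leaving an unacknowledged hole
    // (which shows up as a non-zero msgBacklog).
    func redeliverDiscardedChunks(ctx *chunkedMsgCtx, nack func(id *messageID)) {
        if ctx == nil {
            return
        }
        for _, id := range ctx.chunkedMsgIDs {
            if id == nil {
                continue // this chunk slot was never filled
            }
            nack(id) // e.g. hand the ID to the consumer's negative-ack tracker
        }
    }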

Actual behavior

When the consumer client discards this message, the broker never retries sending it, and the msgBacklog reported by the CLI stays non-zero (sometimes 5, 10, or even just 1 message):

    ./pulsar-admin topics partitioned-stats persistent://dummy/my_namespace/my_topic | grep msgBacklog

Steps to reproduce

Enable producer- and consumer-side chunking with the following configs.

Backend Consumer config

    consumer:
      topic: "my_topic"
      namespace: "my_namespace"
      subscriptionName: "my_subscription"
      subscriptionType: "key_shared"
      receiverQueueSize: 1000
      enableAutoScaledReceiverQueueSizeLimit: "true"
      nackRedeliveryDelay: 30s
      maxPendingChunkMessages: 100
      expiryTimeOfIncompleteChunkSec: 120
      autoAckIncompleteChunkEnabled: "false"

Backend Producer config

    producer:
     topic: "my_topic"
      pulsarClientIsolationEnabled: false
      enabledChunking: "true"
      compressionType: "lz4"
      compressionLevel: "faster"
      sendTimeout: 60s
      disableBlockIfQueFull: true
      maxPendingMessages: 1000
      hashingScheme: "murmur3_32_hash"
      sendAsync: true
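
And the producer side, again as a sketch reusing the client from the previous snippet. Note that pulsar-client-go requires batching to be disabled when chunking is enabled:

    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic:                   "persistent://dummy/my_namespace/my_topic",
        EnableChunking:          true,
        DisableBatching:         true, // chunking cannot be combined with batching
        CompressionType:         pulsar.LZ4,
        CompressionLevel:        pulsar.Faster,
        SendTimeout:             60 * time.Second,
        DisableBlockIfQueueFull: true,
        MaxPendingMessages:      1000,
        HashingScheme:           pulsar.Murmur3_32Hash,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()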

Broker configuration

  broker:
      image: apachepulsar/pulsar:latest
      container_name: broker
      hostname: broker
      restart: "no"
      environment:
        - metadataStoreUrl=zk:zookeeper:2181
        - zookeeperServers=zookeeper:2181
        - clusterName=cluster-a
        - managedLedgerDefaultEnsembleSize=1
        - managedLedgerDefaultWriteQuorum=1
        - managedLedgerDefaultAckQuorum=1
        - advertisedAddress=broker
        - advertisedListeners=external:pulsar://broker:6650
        - PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=2048m
        - maxMessageSize=1242880
      depends_on:
        zookeeper:
          condition: service_healthy
        bookie:
          condition: service_started
      ports:
        - "6650:6650"
        - "8080:8080"
      command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker"
      healthcheck:
        test: [ "CMD", "curl", "-f", "http://localhost:8080/admin/v2/clusters" ]
        interval: 5s
        timeout: 10s
        retries: 5

The ingestion payload size is 15 MB.

Send this request continuously for 300 seconds; the unexpected chunk message appears within 10-15 seconds on a local machine.
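
A minimal reproduction loop along these lines might look like the following, reusing the producer from the sketch above (the 15 MB payload and 300-second duration come from this report; the key and the error logging are illustrative, and "context" must be imported):

    // Send 15 MB payloads continuously for 300 seconds; each send is split
    // into chunks because the payload far exceeds the broker's maxMessageSize.
    payload := make([]byte, 15*1024*1024)
    deadline := time.Now().Add(300 * time.Second)
    for time.Now().Before(deadline) {
        producer.SendAsync(context.Background(), &pulsar.ProducerMessage{
            Payload: payload,
            Key:     "some-key", // the key_shared subscription routes by key
        }, func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) {
            if err != nil {
                log.Printf("send failed: %v", err)
            }
        })
    }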

Note: currently we are not hitting the go c.discardChunkIfExpire(uuid, true, c.pc.options.expireTimeOfIncompleteChunk) or go c.discardOldestChunkMessage(c.pc.options.autoAckIncompleteChunk) discard paths. A potential problem lies there as well, since those paths also have no mechanism for the broker to retry.

System configuration

Pulsar version: 3.3.2 for broker and 0.15.1 for pulsar-client-go

Can we get some help here? The majority of our pipelines are currently impacted by this flow.

jgheewala, Jul 24 '25 21:07

Hi,

I think the problem is that with chunking we should allow out-of-order delivery of the chunk messages. The current implementation is very strict about ordering, which cannot be guaranteed in a distributed system.

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 61c82bf..f8de2ac 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -179,7 +179,7 @@ type partitionConsumer struct {
 	dlq         *dlqRouter
 
 	log                  log.Logger
-	compressionProviders sync.Map //map[pb.CompressionType]compression.Provider
+	compressionProviders sync.Map // map[pb.CompressionType]compression.Provider
 	metrics              *internal.LeveledMetrics
 	decryptor            cryptointernal.Decryptor
 	schemaInfoCache      *schemaInfoCache
@@ -1449,39 +1449,25 @@ func (pc *partitionConsumer) processMessageChunk(compressedPayload internal.Buff
 		partitionIdx: pc.partitionIdx,
 	}
 
-	if msgMeta.GetChunkId() == 0 {
-		pc.chunkedMsgCtxMap.addIfAbsent(uuid,
-			numChunks,
-			totalChunksSize,
-		)
-	}
-
+	pc.chunkedMsgCtxMap.addIfAbsent(uuid, numChunks, totalChunksSize)
 	ctx := pc.chunkedMsgCtxMap.get(uuid)
-
-	if ctx == nil || ctx.chunkedMsgBuffer == nil || chunkID != ctx.lastChunkedMsgID+1 {
-		lastChunkedMsgID := -1
-		totalChunks := -1
-		if ctx != nil {
-			lastChunkedMsgID = int(ctx.lastChunkedMsgID)
-			totalChunks = int(ctx.totalChunks)
-			ctx.chunkedMsgBuffer.Clear()
-		}
-		pc.log.Warnf(fmt.Sprintf(
-			"Received unexpected chunk messageId %s, last-chunk-id %d, chunkId = %d, total-chunks %d",
-			msgID.String(), lastChunkedMsgID, chunkID, totalChunks))
+	if ctx == nil { // ideally this should never happen but just in case
+		pc.log.Warnf("Chunk context missing for uuid %s", uuid)
 		pc.chunkedMsgCtxMap.remove(uuid)
 		pc.availablePermits.inc()
 		return nil
 	}
 
+	// --- Out-of-order chunk handling ---
 	ctx.append(chunkID, msgID, compressedPayload)
 
-	if msgMeta.GetChunkId() != msgMeta.GetNumChunksFromMsg()-1 {
+	// Check if all chunks are present
+	if !ctx.isComplete() {
 		pc.availablePermits.inc()
 		return nil
 	}
 
-	return ctx.chunkedMsgBuffer
+	return ctx.reassemble(totalChunksSize)
 }
 
 func (pc *partitionConsumer) messageShouldBeDiscarded(msgID *trackingMessageID) bool {
@@ -2343,22 +2329,22 @@ func convertToMessageID(id *pb.MessageIdData) *trackingMessageID {
 }
 
 type chunkedMsgCtx struct {
-	totalChunks      int32
-	chunkedMsgBuffer internal.Buffer
-	lastChunkedMsgID int32
-	chunkedMsgIDs    []*messageID
-	receivedTime     int64
+	totalChunks        int32
+	lastChunkedMsgID   int32
+	chunkedMsgIDs      []*messageID
+	chunkedMsgPayloads []internal.Buffer
+	receivedTime       int64
 
 	mu sync.Mutex
 }
 
-func newChunkedMsgCtx(numChunksFromMsg int32, totalChunkMsgSize int) *chunkedMsgCtx {
+func newChunkedMsgCtx(numChunksFromMsg int32, _ int) *chunkedMsgCtx {
 	return &chunkedMsgCtx{
-		totalChunks:      numChunksFromMsg,
-		chunkedMsgBuffer: internal.NewBuffer(totalChunkMsgSize),
-		lastChunkedMsgID: -1,
-		chunkedMsgIDs:    make([]*messageID, numChunksFromMsg),
-		receivedTime:     time.Now().Unix(),
+		totalChunks:        numChunksFromMsg,
+		lastChunkedMsgID:   -1,
+		chunkedMsgIDs:      make([]*messageID, numChunksFromMsg),
+		chunkedMsgPayloads: make([]internal.Buffer, numChunksFromMsg),
+		receivedTime:       time.Now().Unix(),
 	}
 }
 
@@ -2366,7 +2352,7 @@ func (c *chunkedMsgCtx) append(chunkID int32, msgID *messageID, partPayload inte
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	c.chunkedMsgIDs[chunkID] = msgID
-	c.chunkedMsgBuffer.Write(partPayload.ReadableSlice())
+	c.chunkedMsgPayloads[chunkID] = partPayload
 	c.lastChunkedMsgID = chunkID
 }
 
@@ -2402,6 +2388,35 @@ func (c *chunkedMsgCtx) discard(pc *partitionConsumer) {
 	}
 }
 
+func (c *chunkedMsgCtx) isComplete() bool {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	complete := true
+	for i := int32(0); i < c.totalChunks; i++ {
+		if c.chunkedMsgIDs[i] == nil || c.chunkedMsgPayloads[i] == nil {
+			complete = false
+			break
+		}
+	}
+
+	return complete
+}
+
+func (c *chunkedMsgCtx) reassemble(totalChunkMsgSize int) internal.Buffer {
+	// Reassemble chunks in order
+	finalBuf := internal.NewBuffer(totalChunkMsgSize)
+
+	c.mu.Lock()
+	for i := range c.chunkedMsgPayloads {
+		finalBuf.Write(c.chunkedMsgPayloads[i].ReadableSlice())
+		c.chunkedMsgPayloads[i].Release() // Release the buffer after use
+	}
+	c.mu.Unlock()
+
+	return finalBuf
+}
+
 type chunkedMsgCtxMap struct {
 	chunkedMsgCtxs map[string]*chunkedMsgCtx
 	pendingQueue   *list.List
@@ -2477,7 +2492,7 @@ func (c *chunkedMsgCtxMap) discardOldestChunkMessage(autoAck bool) {
 		ctx.discard(c.pc)
 	}
 	delete(c.chunkedMsgCtxs, oldest)
-	c.pc.log.Infof("Chunked message [%s] has been removed from chunkedMsgCtxMap", oldest)
+	c.pc.log.Infof("Oldest Chunked message [%s] has been removed from chunkedMsgCtxMap due to max pending chunk messages exceeding", oldest)
 }
 
 func (c *chunkedMsgCtxMap) discardChunkMessage(uuid string, autoAck bool) {
@@ -2501,7 +2516,7 @@ func (c *chunkedMsgCtxMap) discardChunkMessage(uuid string, autoAck bool) {
 			break
 		}
 	}
-	c.pc.log.Infof("Chunked message [%s] has been removed from chunkedMsgCtxMap", uuid)
+	c.pc.log.Infof("Chunked message [%s] has been removed from chunkedMsgCtxMap due to expireTimeOfIncompleteChunk happening", uuid)
 }
 
 func (c *chunkedMsgCtxMap) discardChunkIfExpire(uuid string, autoAck bool, expire time.Duration) {

What this does is store each chunk as it arrives, out of order, until all the chunks have been received, and then reassemble them in order. This can be added behind a consumer property to support out-of-order processing; none of the other behavior changes with this logic.
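
For example, the gate could be a new consumer option; the field name below is hypothetical and does not exist in the current API:

    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "persistent://dummy/my_namespace/my_topic",
        SubscriptionName: "my_subscription",
        // Proposed option, not part of pulsar-client-go today:
        EnableOutOfOrderChunkReassembly: true,
    })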

Let me know how you would like to proceed.

jgheewala, Jul 28 '25 16:07

The order of chunks should be guaranteed by the producer. For example, consider a large message M composed of three chunks: M0, M1, and M2. The data in the topic might appear as follows if the producer resends the message:

M1, M2, M0, M1, N, M2

The producer must guarantee the order of M0, M1, and M2. An incorrect sequence like the one below should not happen:

M1, M2, M2, M0, M1

In this case, message M was not sent successfully, and the consumer should not receive a complete message M.

Returning to this issue, it seems to be easily reproduced. I think this may be related to the duplicate chunked message issue that PR https://github.com/apache/pulsar/pull/21101 fixed in the Java client. However, after reviewing the code, I found a problem with that PR. The 'Situation 1 - Message redeliver' case should never happen, and we do not need to detect a CorruptedChunkMessage.

This issue might also be considered an unexpected case of receiving incomplete chunks. We already have an option, AutoAckIncompleteChunk, to instruct the consumer on how to handle them.

I think a better solution is:

  • If the consumer receives any duplicate chunk messages (e.g., M0, M1, M0', M1', M2'), the first two chunks (M0 and M1) should be acknowledged to avoid the acknowledgment hole.
  • If the consumer receives any unexpected incomplete messages (e.g., M1, M2, M0', M1', M2'), the first two chunks should be acknowledged or ignored based on the AutoAckIncompleteChunk option, as sketched after this list.
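
A rough Go sketch of that handling, not the actual client code (ackChunkID is a placeholder for the real acknowledgment path): when chunk 0 arrives for a uuid that already has an incomplete context, the previously buffered chunks are acked, or simply dropped, before a fresh context is created:

    // Hypothetical handling when a new chunk-0 arrives for an existing uuid,
    // i.e. the producer resent the whole chunked message (M0', M1', ...).
    func onChunkRestart(pc *partitionConsumer, uuid string, autoAckIncomplete bool) {
        if ctx := pc.chunkedMsgCtxMap.get(uuid); ctx != nil {
            if autoAckIncomplete {
                // Ack the stale chunks so they do not leave an acknowledgment
                // hole that keeps msgBacklog non-zero.
                for _, id := range ctx.chunkedMsgIDs {
                    if id != nil {
                        ackChunkID(pc, id) // placeholder for the real ack path
                    }
                }
            }
            pc.chunkedMsgCtxMap.remove(uuid)
        }
        // ...then fall through to create a fresh context for the resent message.
    }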

What do you think? Does this work for your case?

RobertIndie, Jul 29 '25 14:07

Thanks for the response. I forgot to mention that allowOutOfOrderDelivery is set to true (I do not think it matters), AutoAckIncompleteChunk is set to false, and KeySharedPolicy is AutoSplit.

I have a couple of questions:

  1. Is ordering guaranteed for a KEY_SHARED subscription?
  2. If you use an async producer, will the chunks be produced in order even when retries happen or there are network issues?

Coming to your solution now:

  • If the consumer receives any duplicate chunk messages (e.g., M0, M1, M0', M1', M2'), the first two chunks (M0 and M1) should be acknowledged to avoid the acknowledgment hole.

Why would the consumer receive a duplicate chunk message in this situation? From the code flow, we never nack or ack any chunk messages until we hit maxPendingChunkMessages with AutoAckIncompleteChunk, or expireTimeOfIncompleteChunk. From the logs, none of these cases were hit, and AutoAckIncompleteChunk is set to false in our environment. Let me know if I am missing anything here.

  • If the consumer receives any unexpected incomplete messages (e.g., M1, M2, M0', M1', M2'), the first two chunks should be acknowledged or ignored based on the AutoAckIncompleteChunk option.

In this situation, acknowledging the first two chunks might not be the correct solution, as we can potentially time out if the other chunks never arrive, right? Also, AutoAckIncompleteChunk actually moves the commit offset forward on the broker even though the message was never consumed by the consumer application, which can theoretically mean we are dropping customer data, right? Based on this understanding, we have disabled this flag.

jgheewala, Jul 29 '25 16:07