rocketmq-client-go icon indicating copy to clipboard operation
rocketmq-client-go copied to clipboard

PushConsumer's default config easy to casue recreate `processQueue`

Open j2gg0s opened this issue 2 years ago • 0 comments

Under the following conditions, our consumer will always recreate processQueue.

  • default config
  • pushConsumer with no orderly consume
  • produce tps greater than consume tps
  • topic with 64 queues
  • consume 1 message took more than 1 second.

The related warning log is: delete mq from offset table and fetch offset of mq from broker success.

The reasons is:

  • We pull message for every (topic, queue) with batch size controlled by PullBatchSize, default is 32.
  • Each batch of messages will send to processQueue.msgCh, which's buffer size controlled is 32.
  • pushConsumer.consumeMessageCurrently send n messages to consumer's handler, n is controlled by ConsumeMessageBatchMaxSize, default is 1.
  • pushConsumer.crCh limit a maximum of ConsumeGoroutineNums goroutines to process messages, ConsumeGoroutineNums default is 20.
  • Assume, consumer took d seconds to process ConsumeMessageBatchMaxSize(1) message.
  • _PullMaxIdleTime is 120 second So when topic has 64 queue, the pull interval is d * 32 / ConsumeMessageBatchMaxSize(1) * 64 / ConsumeGoroutineNums. To avoid processQueue be recreated, d should less than 1 second, this number is two small.

Flow control only works in certain situations:

  • processQueue pull messages more than 1000 over 32 times with batch size 32, 32*32=1024

Let we change default PullBatchSize to 64?

j2gg0s avatar Oct 12 '22 15:10 j2gg0s