rocketmq-client-go
PushConsumer's default config easily causes `processQueue` to be recreated
Under the following conditions, our consumer keeps recreating `processQueue`:
- default config
- pushConsumer with non-orderly (concurrent) consumption
- produce TPS is greater than consume TPS
- topic with 64 queues
- consuming one message takes more than 1 second
The related warning logs are: `delete mq from offset table` and `fetch offset of mq from broker success`.
The reasons are:
- We pull messages for every (topic, queue) with a batch size controlled by `PullBatchSize`, which defaults to 32.
- Each batch of messages is sent to `processQueue.msgCh`, whose buffer size is 32.
- `pushConsumer.consumeMessageCurrently` sends n messages to the consumer's handler, where n is controlled by `ConsumeMessageBatchMaxSize`, which defaults to 1.
- `pushConsumer.crCh` limits message processing to a maximum of `ConsumeGoroutineNums` goroutines; `ConsumeGoroutineNums` defaults to 20.
- Assume the consumer takes d seconds to process `ConsumeMessageBatchMaxSize` (1) messages.
- `_PullMaxIdleTime` is 120 seconds.
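So when the topic has 64 queues, the pull interval for each queue is
`d * 32 / ConsumeMessageBatchMaxSize(1) * 64 / ConsumeGoroutineNums`, which is 102.4 * d seconds with the defaults (d * 32 / 1 * 64 / 20).
To avoid `processQueue` being recreated, this interval has to stay below `_PullMaxIdleTime` (120 seconds), so d should be less than about 1 second (120 / 102.4 ≈ 1.17). This number is too small.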
Flow control only works in certain situations:
- `processQueue` has pulled more than 1000 messages, which takes 32 pulls with batch size 32 (32 * 32 = 1024).
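The pull count above can be checked with a small sketch. `pullsUntilFlowControl` is a hypothetical helper, the threshold of 1000 is the figure quoted in this issue, and the model assumes no message is consumed while the pulls accumulate.

```go
package main

import "fmt"

// pullsUntilFlowControl is a hypothetical helper (not a rocketmq-client-go API).
// It returns how many pulls of pullBatchSize messages are needed before the
// number of cached messages becomes strictly greater than threshold,
// assuming nothing is consumed in the meantime.
func pullsUntilFlowControl(threshold, pullBatchSize int) int {
	return threshold/pullBatchSize + 1
}

func main() {
	// Assumption: 1000 is the flow control threshold quoted in this issue.
	const threshold = 1000

	for _, batch := range []int{32, 64} {
		n := pullsUntilFlowControl(threshold, batch)
		fmt.Printf("PullBatchSize=%d pulls=%d cached=%d\n", batch, n, n*batch)
	}
}
```

With a batch size of 64 the same threshold is crossed in half as many pulls, so flow control would start earlier under this simplified model.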
Shall we change the default `PullBatchSize` to 64?