Sarama batch consuming mode leads to 100% CPU usage
github.com/Shopify/sarama v1.27.1
Versions
| Sarama | Kafka | Go |
|---|---|---|
| v1.27.1 | v1.0 | 1.15 |
Configuration
```go
saramaConfig.Version = sarama.V1_0_0_0
saramaConfig.Consumer.Return.Errors = true
saramaConfig.Net.SASL.Enable = true
saramaConfig.Net.SASL.User = "xxxx"
saramaConfig.Net.SASL.Password = "xxxx"
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
saramaClient, err := sarama.NewClient(brokers, saramaConfig)
saramaConsumer, err := sarama.NewConsumerGroupFromClient(group, saramaClient)
```
Logs
No errors were logged.
Problem Description
Is this the correct way to implement batch consumption? Here is the code:
```go
func (c *Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (err error) {
	rm := ratelimit.New(c.config.QpsLimitationPerPartition)
	msgArray := make([]*sarama.ConsumerMessage, 0, c.config.BatchNumber)
	for {
		select {
		case msg := <-claim.Messages():
			if msg == nil {
				continue
			}
			rm.Take()
			msgArray = append(msgArray, msg)
			if len(msgArray) < c.config.BatchNumber {
				continue // batch size not reached yet; keep reading from the buffer
			}
			go c.executeBatchConsumeFunc(sess, msgArray) // hand off the full batch; its last msg is used for the batch commit
			msgArray = make([]*sarama.ConsumerMessage, 0, c.config.BatchNumber) // start a new slice
		default:
			if len(msgArray) != 0 {
				// limit the number of running goroutines
				_, ok := <-c.channel
				if !ok { // the channel may be closed
					return
				}
				go c.executeBatchConsumeFunc(sess, msgArray) // hand off the partial batch; its last msg is used for the batch commit
				msgArray = make([]*sarama.ConsumerMessage, 0, c.config.BatchNumber) // start a new slice
			}
			// no new message on the channel; wait for a while
			time.Sleep(time.Millisecond * time.Duration(c.config.BatchTimeInterval))
		}
	}
}
```
Even with the rate limit set to 50, the CPU usage of the instance (Docker) still stays at 100%. Here is the pprof output. I don't understand why there are so many `chanrecv` calls, and the lock/unlock inside them costs a lot of resources.

From what I remember while debugging, I seemed to receive a lot of nil values from the channel (`claim.Messages()`). Could that cause this, since I just `continue` when I receive nil? I can't reproduce it on my debugging machine now, though.
Thanks a lot!
Sometimes Kafka decides to rebalance a consumer group. The docs say that in this case the message channel will be closed, and a receive from a closed channel returns nil immediately.
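To see why the `continue` on nil can turn the loop into a busy spin, here is a minimal, self-contained sketch that needs no broker. `message` is a hypothetical stand-in for `*sarama.ConsumerMessage`, and the closed channel plays the role of `claim.Messages()` after a rebalance. Once the channel is closed, its receive case is always ready, so `select` never falls through to `default` and the `time.Sleep` there is never reached. That would be consistent with the many `chanrecv` calls in the profile.

```go
package main

import "fmt"

// message is a hypothetical stand-in for *sarama.ConsumerMessage.
type message struct{ offset int64 }

// drainAfterClose receives from a closed channel n times and counts how many
// receives returned nil and how many reported ok == false. None of them block.
func drainAfterClose(n int) (nils, notOK int) {
	ch := make(chan *message, 1)
	close(ch) // mimics claim.Messages() being closed at the end of a session

	for i := 0; i < n; i++ {
		select {
		case msg, ok := <-ch:
			// A closed channel is always ready, so this case wins every time
			// and the default branch (with its sleep) never runs.
			if msg == nil {
				nils++
			}
			if !ok {
				notOK++
			}
		default:
			// Unreachable once ch is closed and drained.
		}
	}
	return nils, notOK
}

func main() {
	nils, notOK := drainAfterClose(1000)
	fmt.Println(nils, notOK) // 1000 1000
}
```

Using the two-value form `msg, ok := <-ch` is the usual way to tell "channel closed" apart from a real value; `ok` is false only after the channel is closed and empty.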
@fxrlv So do we need to restart the consumer manually? Thanks.
If you get nil from the message channel, you should return from `ConsumeClaim`. Then check the error returned by the `Consume` call you made earlier. A nil error means you can call `Consume` again and get new claims.
See, for example, the consumer group example in the Sarama repository: https://github.com/IBM/sarama/blob/dedd86d84f2d56a7c3b604ea36da479c3d2a80bb/examples/consumergroup/main.go#L180-L204