
Sarama batch consuming mode leads to 100% CPU usage

Open xue610315921 opened this issue 4 years ago • 3 comments

Versions

Sarama: github.com/Shopify/sarama v1.27.1
Kafka: v1.0
Go: 1.15
Configuration
saramaConfig.Version = sarama.V1_0_0_0
saramaConfig.Consumer.Return.Errors = true

saramaConfig.Net.SASL.Enable = true
saramaConfig.Net.SASL.User = xxxx
saramaConfig.Net.SASL.Password = xxxx
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512

saramaClient, err := sarama.NewClient(brokers, saramaConfig)
saramaConsumer, err := sarama.NewConsumerGroupFromClient(group, saramaClient)
Logs

no error log
Problem Description

Is this the correct way to implement the batch consuming? The code here:

func (c *Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (err error) {
	rm := ratelimit.New(c.config.QpsLimitationPerPartition)
	msgArray := make([]*sarama.ConsumerMessage, 0, c.config.BatchNumber)
	for {
		select {
		case msg := <-claim.Messages():
			if msg == nil {
				continue
			}

			rm.Take()
			msgArray = append(msgArray, msg)

			if len(msgArray) < c.config.BatchNumber {
				continue // batch not full yet, keep reading from the buffer
			}

			go c.executeBatchConsumeFunc(sess, msgArray) // pass last msg for batch commit
			msgArray = make([]*sarama.ConsumerMessage, 0, c.config.BatchNumber) // start a new batch
		default:
			if len(msgArray) != 0 {
				// limit the number of goroutines
				_, ok := <-c.channel
				if !ok { // channel may be closed
					return
				}

				go c.executeBatchConsumeFunc(sess, msgArray) // pass last msg for batch commit
				msgArray = make([]*sarama.ConsumerMessage, 0, c.config.BatchNumber) // start a new batch
			}
			// no new msg in the channel, wait for a period
			time.Sleep(time.Millisecond * time.Duration(c.config.BatchTimeInterval))
		}
	}
}

Even when I set the rate limit to 50, the CPU usage of the instance (Docker) still hits 100%. Here is the pprof; I don't know why there are so many chanrecv calls, and the lock & unlock inside them cost a lot of resources.

(pprof screenshot attached)

From what I remember while debugging, I was receiving a lot of nil values from the channel (claim.Messages()). Could that cause this? I just continue when I receive nil. But I can't reproduce it on my debugging machine now.

Thanks a lot!

xue610315921 avatar Jun 20 '21 17:06 xue610315921

Sometimes Kafka decides to rebalance a consumer group. The docs say that in this case the message channel will be closed, and receiving from a closed channel returns nil.

fxrlv avatar Aug 31 '21 15:08 fxrlv

@fxrlv so do we need to restart the consumer manually or not? Thanks.

bingoabs avatar Apr 02 '22 06:04 bingoabs

If you get nil from the message channel, you should return from ConsumeClaim.

Then check the error returned from the Consume method you called before.

A nil error means you can call Consume again and get new claims.

fxrlv avatar Apr 02 '22 10:04 fxrlv

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

github-actions[bot] avatar Aug 29 '23 20:08 github-actions[bot]

See e.g. https://github.com/IBM/sarama/blob/dedd86d84f2d56a7c3b604ea36da479c3d2a80bb/examples/consumergroup/main.go#L180-L204

dnwe avatar Aug 29 '23 20:08 dnwe