
Best practice to commit offset with consumer group

Open madneal opened this issue 4 years ago • 7 comments

I have written code based on example_consumer_group_test.go, but there is a problem: the offset is never updated, so I cannot use kafka-consumer-groups.sh to obtain the CURRENT-OFFSET.

I think the reason is the code below:

	for {
		msg, err := reader.ReadMessage(ctx)
		switch err {
		case kafka.ErrGenerationEnded:
			// generation has ended.  commit offsets.  in a real app,
			// offsets would be committed periodically.
			gen.CommitOffsets(map[string]map[int]int64{"my-topic": {partition: offset}})
			return
		case nil:
			fmt.Printf("received message %s/%d/%d : %s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
			offset = msg.Offset
		default:
			fmt.Printf("error reading message: %+v\n", err)
		}
	}

The offset will not be committed until a kafka.ErrGenerationEnded occurs. It would be possible to commit the offset in the nil case instead, but I don't think that's good practice.

Is there any good way or documentation about this? Thanks. @stevevls

madneal avatar May 26 '20 04:05 madneal

And I found I cannot commit the offset manually. It causes an "Invalid negative offset" error. Is there a correct way to commit the offset?

madneal avatar May 26 '20 07:05 madneal

Hi @neal1991. The example is just a toy program; since the kafka.ConsumerGroup API is still relatively new, we haven't wanted to bulk it up with too much functionality. But as mentioned, in a real program you would have a loop in the background that periodically commits offsets.

If you'd like to look at how the kafka.Reader does it in order to draw inspiration, you can start digging in here: https://github.com/segmentio/kafka-go/blob/f490bbc5ee1f4fe96e83b9f3efa1a971e8de500e/reader.go#L272-L274. This is where our commit loop gets associated with the consumer group.

Hope that helps!

stevevls avatar May 27 '20 02:05 stevevls

@stevevls Thanks for your reply. Now I have implemented it like this:

	for {
		msg, err := reader.ReadMessage(ctx)
		switch err {
		case kafka.ErrGenerationEnded:
			// generation has ended; commit the final offset before returning.
			gen.CommitOffsets(map[string]map[int]int64{CONFIG.Kafka.Topic: {partition: offset}})
			return
		case nil:
			// update the offset first, then commit, so the message just
			// read is covered by this commit instead of the next one.
			offset = msg.Offset
			gen.CommitOffsets(map[string]map[int]int64{CONFIG.Kafka.Topic: {partition: offset}})
		default:
			Log.Errorf("error reading message: %+v\n", err)
		}
	}

Is there any difference between the two approaches, and which one is better?

madneal avatar May 27 '20 02:05 madneal

No problem! This would definitely work. Depending on your message volume, though, it may not be the most efficient. Every time you call CommitOffsets, it will incur a round trip to Kafka. The strategy of committing after each message is a good choice if you have a low volume consumer and/or it's important not to repeat work if your program crashes and you need to resume from the last committed offset. If you're dealing with high volumes, then you want to push the commits to the background. Of course, the tradeoff is more repeated work if your program crashes and you need to resume from the last commit. Since kafka.ConsumerGroup is a lower-level API, it leaves decisions like this up to the client so that they can make the best tradeoff for their workload. 😄

stevevls avatar May 27 '20 03:05 stevevls

@stevevls My Kafka consumer volume is large, so I am worried about the cost. I have tried to call gen.CommitOffsets periodically, but it causes the negative offset error, just as I mentioned above.

So does it mean that, for high volumes,

	gen.Start(func(ctx context.Context) {
		r.commitLoop(ctx, gen)
	})

would be a better choice?

madneal avatar May 27 '20 03:05 madneal

@stevevls could you take a look and see if we can follow up on the last question to close the issue?

achille-roussel avatar Oct 29 '21 16:10 achille-roussel

I think we can close this due to inactivity. As noted above, the best approach for a high-volume group is to run a commit loop in the background. We could potentially add a commit loop function to the ConsumerGroup type, since it seems like a common case.

The only thing I'm not sure about here is the negative offset error, but we haven't witnessed that elsewhere, so we would need to work with @madneal if we wanted to debug further.
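One possible (unconfirmed) cause of the negative-offset error is a commit firing before any message has been read, while the offset variable still holds its initial sentinel value. A defensive helper, hypothetical and not part of kafka-go, could drop such values before calling gen.CommitOffsets:

```go
package main

// prunedOffsets returns a copy of offsets with negative (sentinel) values
// removed, since Kafka rejects commits of negative offsets. The map shape
// matches what kafka-go's Generation.CommitOffsets expects:
// topic -> partition -> offset.
func prunedOffsets(offsets map[string]map[int]int64) map[string]map[int]int64 {
	out := make(map[string]map[int]int64)
	for topic, parts := range offsets {
		for partition, off := range parts {
			if off < 0 {
				continue // nothing consumed yet on this partition: skip
			}
			if out[topic] == nil {
				out[topic] = make(map[int]int64)
			}
			out[topic][partition] = off
		}
	}
	return out
}
```

A commit loop could pass its offset map through prunedOffsets (and skip the commit entirely when the result is empty) so an uninitialized partition entry never reaches the broker.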

stevevls avatar Oct 29 '21 17:10 stevevls