kafka-go
Best practice to commit offset with consumer group
I have written code according to example_consumer_group_test.go, but there is a problem: the offset is not updated, so I cannot use kafka-consumer-groups.sh to obtain the CURRENT-OFFSET.
I think the reason is the code below:
```go
for {
	msg, err := reader.ReadMessage(ctx)
	switch err {
	case kafka.ErrGenerationEnded:
		// generation has ended. commit offsets. in a real app,
		// offsets would be committed periodically.
		gen.CommitOffsets(map[string]map[int]int64{"my-topic": {partition: offset}})
		return
	case nil:
		fmt.Printf("received message %s/%d/%d : %s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
		offset = msg.Offset
	default:
		fmt.Printf("error reading message: %+v\n", err)
	}
}
```
As the offset will not be committed until a kafka.ErrGenerationEnded, I think it would be possible to commit the offset in the nil case instead, but I don't think that's good practice.
Is there any good way or documentation about this? Thanks. @stevevls
I also found that I cannot commit the offset manually: it causes an "Invalid negative offset" error. Is there a correct way to commit the offset?
Hi @neal1991. The example is just a toy program... since the kafka.ConsumerGroup API is still relatively new, we haven't wanted to bulk it up with too much functionality. But as mentioned, in a real program you would have a loop in the background that periodically commits offsets.
If you'd like to look at how the kafka.Reader does it in order to draw inspiration, you can start digging in here: https://github.com/segmentio/kafka-go/blob/f490bbc5ee1f4fe96e83b9f3efa1a971e8de500e/reader.go#L272-L274. This is where our commit loop gets associated with the consumer group.
Hope that helps!
@stevevls Thanks for your reply. Now I implemented like this:
```go
for {
	msg, err := reader.ReadMessage(ctx)
	switch err {
	case kafka.ErrGenerationEnded:
		// generation has ended. commit offsets. in a real app,
		// offsets would be committed periodically.
		gen.CommitOffsets(map[string]map[int]int64{CONFIG.Kafka.Topic: {partition: offset}})
		return
	case nil:
		// record the offset before committing, so we never commit a
		// stale (or uninitialized) value
		offset = msg.Offset
		gen.CommitOffsets(map[string]map[int]int64{CONFIG.Kafka.Topic: {partition: offset}})
	default:
		Log.Errorf("error reading message: %+v\n", err)
	}
}
```
Is there any difference between the two ways? Or which one is better?
No problem! This would definitely work. Depending on your message volume, though, it may not be the most efficient. Every time you call CommitOffsets, it incurs a round trip to Kafka. The strategy of committing after each message is a good choice if you have a low-volume consumer and/or it's important not to repeat work if your program crashes and you need to resume from the last committed offset. If you're dealing with high volumes, then you want to push the commits to the background. Of course, the tradeoff is more repeated work if your program crashes and you need to resume from the last commit. Since kafka.ConsumerGroup is a lower-level API, it leaves decisions like this up to the client so that they can make the best tradeoff for their workload. 😄
@stevevls My Kafka consumer volume is large, so I am worried about the cost. I have tried to call gen.CommitOffsets periodically, but it causes the negative offset error, just as I mentioned above.
So does it mean that for high volumes,

```go
gen.Start(func(ctx context.Context) {
	r.commitLoop(ctx, gen)
})
```

would be a better choice?
@stevevls could you take a look and see if we can follow up on the last question to close the issue?
I think we can close this due to inactivity. As noted above, the best approach for a high-volume group is to run a commit loop in the background. We could potentially add a commit loop function to the ConsumerGroup type, since it seems like a common case.
The only thing I'm not sure about here is the negative offset error, but we haven't witnessed that elsewhere, so we would need to work with @madneal if we wanted to debug further.