How can the remaining messages in claim.Messages() be consumed when using manual commit in a consumer group?
Consider the code from #1987:
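```go
func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	i := 0 // number of successfully processed messages in this claim
	for m := range claim.Messages() {
		msg := m.Value
		if err := h.f(context.Background(), msg); err != nil {
			// noop: a failed message is neither marked nor committed
		} else {
			// With manual commit, MarkMessage only records the offset locally;
			// it reaches Kafka only when sess.Commit() is called.
			sess.MarkMessage(m, "")
			i++
			if i%dfv1.CommitN == 0 {
				sess.Commit() // commits every offset marked so far
			}
		}
	}
	return nil
}
```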
If the producer sends 8 messages, the consumer pulls all 8, and dfv1.CommitN is 3, then sess.Commit() runs after the 3rd and 6th messages. The remaining 2 messages are marked but not committed, and they stay uncommitted until another message is pulled and the counter reaches 9.
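The arithmetic can be checked with a small stdlib-only Go sketch that replays the `i%dfv1.CommitN` counter. `commitSchedule` is a hypothetical helper, not part of Sarama or the code above, and it assumes every message is processed successfully and that all 8 messages arrive in a single claim (partition), since each ConsumeClaim call keeps its own counter.

```go
package main

import "fmt"

// commitSchedule mirrors the counter in ConsumeClaim (hypothetical helper):
// a commit fires after every commitN-th successfully processed message.
// It returns the 1-based message numbers that trigger sess.Commit() and
// how many marked messages are left uncommitted when the stream stops.
func commitSchedule(processed, commitN int) (commitAt []int, leftover int) {
	for i := 1; i <= processed; i++ {
		if i%commitN == 0 {
			commitAt = append(commitAt, i)
		}
	}
	return commitAt, processed % commitN
}

func main() {
	at, left := commitSchedule(8, 3)
	fmt.Println(at, left) // [3 6] 2
}
```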
If I restart the consumer client at this point, those 2 messages are consumed again, because they were marked but never committed.
Is there a good way to avoid consuming these 2 messages again?
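One possible approach, sketched under assumptions: Sarama's ConsumerGroupClaim documentation states that the Messages() channel is closed when a new rebalance cycle is due, so ConsumeClaim could call sess.Commit() once more after the `range` loop ends to flush the marks left over by the every-N rule (committing from the handler's Cleanup method is a similar option). The model below is stdlib-only; `fakeSession`, its int64-based `MarkOffset`, `consume`, and `run` are hypothetical stand-ins for sarama.ConsumerGroupSession and the handler, not Sarama's API, and every message is treated as processed successfully.

```go
package main

import "fmt"

// fakeSession is a hypothetical stand-in for sarama.ConsumerGroupSession:
// it only tracks the latest marked offset and the latest committed offset.
type fakeSession struct {
	marked    int64
	committed int64
}

// MarkOffset records the next offset to read, as Sarama's MarkMessage does
// (it marks msg.Offset+1).
func (s *fakeSession) MarkOffset(next int64) { s.marked = next }

// Commit persists whatever has been marked so far.
func (s *fakeSession) Commit() { s.committed = s.marked }

// consume mirrors ConsumeClaim: it commits after every commitN marked messages
// and, when flushOnExit is set, commits the leftover marks once msgs is closed.
func consume(sess *fakeSession, msgs <-chan int64, commitN int, flushOnExit bool) {
	pending := 0 // marked but not yet committed
	for off := range msgs {
		sess.MarkOffset(off + 1)
		pending++
		if pending == commitN {
			sess.Commit()
			pending = 0
		}
	}
	// The closed channel models Sarama ending the claim (rebalance or Close).
	if flushOnExit && pending > 0 {
		sess.Commit()
	}
}

// run feeds offsets 0..n-1 through consume and returns the committed offset,
// i.e. the offset a restarted consumer would resume from.
func run(n, commitN int, flushOnExit bool) int64 {
	msgs := make(chan int64, n)
	for off := int64(0); off < int64(n); off++ {
		msgs <- off
	}
	close(msgs)
	sess := &fakeSession{}
	consume(sess, msgs, commitN, flushOnExit)
	return sess.committed
}

func main() {
	fmt.Println(run(8, 3, false)) // 6: offsets 6 and 7 are redelivered after a restart
	fmt.Println(run(8, 3, true))  // 8: nothing is redelivered
}
```

This only helps when the claim ends cleanly (rebalance or a graceful Close). If the process is killed before the final commit runs, the uncommitted messages are still redelivered, which is Kafka's at-least-once behavior.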
When I use kafka-producer-perf-test.sh to send 10,000 messages to a topic `t` with 2 partitions, and consume them with the code above (dfv1.CommitN = 3), I observe that when consuming message number 9,748, sess.Commit() becomes significantly slower, taking around 500-1,004 ms. For the messages up to 9,747, by contrast, each sess.Commit() call takes only around 1-3 ms.