kafka-go
Duplicate consumption occurs when a new consumer joins and CommitInterval is configured
In the following code, when a new consumer joins the group, duplicate data consumption is detected.
Setting CommitInterval to zero reduces the number of duplicate messages. I suspect the commit did not take effect, even though calling CommitMessages did not return an error.
A similar consumer implemented with sarama does not show this issue.
Perhaps during a rebalance, something needs to be done for commits that have not yet been submitted, because when CommitInterval is configured, calling CommitMessages does not take effect immediately.
package main

import (
	"context"
	"errors"
	"io"
	"time"

	"github.com/segmentio/kafka-go"
	"github.com/sirupsen/logrus"
)

func ReadKafkaWithKafkago(ctx context.Context, task string) {
	topic := "testkafka"
	address := "127.0.0.1:9092"
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:        []string{address},
		Topic:          topic,
		CommitInterval: time.Second,
		GroupID:        "test1",
		QueueCapacity:  50,
	})
loop:
	for {
		// When a SIGTERM signal is received, ctx is canceled.
		msg, err := reader.FetchMessage(ctx)
		if err != nil {
			if errors.Is(err, io.EOF) {
				logrus.Infof("kafka get eof")
				break loop
			}
			if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
				logrus.Infof("kafka get context canceled:%v", err)
				break loop
			}
			logrus.Errorf("kafka get msg error:%v", err)
			continue // msg is not valid after a fetch error; do not process or commit it
		}
		checkDuplicates(msg) // each msg has a unique id, so it can be used to detect duplicates
		err = reader.CommitMessages(context.Background(), msg)
		if err != nil {
			logrus.Errorf("commit error:%v", err)
		}
	}
	reader.Close()
}
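checkDuplicates is not defined above; here is a minimal sketch of what it might look like. Both the helper and the assumption that the unique id is carried in msg.Key are mine, not part of kafka-go:

// checkDuplicates is a hypothetical helper; it assumes each message's
// unique id is carried in msg.Key (adjust if the id lives in the value).
var seen = make(map[string]struct{})

func checkDuplicates(msg kafka.Message) {
	id := string(msg.Key)
	if _, ok := seen[id]; ok {
		logrus.Warnf("duplicate message: id=%s partition=%d offset=%d", id, msg.Partition, msg.Offset)
		return
	}
	seen[id] = struct{}{}
}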
Kafka Version
- What version(s) of Kafka are you testing against? kafka_2.13-2.7.1
- What version of kafka-go are you using? v0.4.47
To Reproduce
- Use the above code to start a new consumer.
- Duplicate messages appear on some of the existing consumers or on the new one.
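As a partial mitigation (per the note above that zero CommitInterval reduces duplicates), leaving CommitInterval at its zero value makes kafka-go commit synchronously inside CommitMessages. A minimal sketch of that configuration:

reader := kafka.NewReader(kafka.ReaderConfig{
	Brokers:       []string{"127.0.0.1:9092"},
	Topic:         "testkafka",
	GroupID:       "test1",
	QueueCapacity: 50,
	// CommitInterval omitted: it defaults to 0, which means
	// CommitMessages blocks until the offset commit completes.
})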
We're hitting this as well. We're trying to use a 5s commit interval (synchronous commits won't work for us for performance reasons), and regardless of other settings, we consistently see 2x the number of messages when using kafka-go compared to a variety of other consumers of the same data. I can provide other info if desired, but mostly I'm just confirming this behavior.
I can confirm the issue still happens even without setting CommitInterval: as soon as a new consumer connects, duplicates appear.
Same here, with only these configs set: Dialer, Brokers, Topic, MinBytes, MaxBytes, GroupID. My guess is that the library has some kind of issue referencing the consumer group.
I used another library before and never had this problem.
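For reference, a rough sketch of the configuration I am describing; the dialer settings, broker address, topic, and group id here are placeholders:

dialer := &kafka.Dialer{Timeout: 10 * time.Second} // placeholder dialer settings
reader := kafka.NewReader(kafka.ReaderConfig{
	Dialer:   dialer,
	Brokers:  []string{"127.0.0.1:9092"}, // placeholder
	Topic:    "testkafka",                // placeholder
	GroupID:  "test1",                    // placeholder
	MinBytes: 1,
	MaxBytes: 10e6, // 10 MB
})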