sarama
sarama copied to clipboard
Consumer group not reconsuming messages if message is not marked
Description
Using a go test, I'm producing a message for a kafka consumer group I created. In this consumer group, I'm logging the kafka message, but I'm not doing session.MarkMessage
or session.Commit
on the message. I would expect that this would allow me to reconsume the message, and it would relog the message in the for loop, but it does not reconsume the message.
Example code for handler
func (cgh *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
//session.MarkMessage(message, "")
//session.Commit()
}
return nil
}
Versions
Sarama | Kafka | Go |
---|---|---|
v1.43.0 | 3.6 | 1.21 |
Configuration
saramaConfig.Consumer.Return.Errors = true
saramaConfig.Consumer.Offsets.AutoCommit.Enable = false
Logs
logs: CLICK ME
Additional Context
I hit this exact same issue and figured it out...
It's because a new consumer group defaults to an initial offset at the end of the topic. This is set with samaraConfig.Consumer.Offsets.Initial
. If you set samConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
you'll find it will replay messages from the start of the topic.
In my case I has assumed that having samConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
(-1, the end of the topic) and then starting a consumer for a groupID and committing offsets (either manually or automatically) without marking any messages processed, would actually commit concrete offset values - basically a snapshot of the newest offsets at the time the consumer started.
It doesn't appear that is the case, so when you start up again with the same groupID offsets are still -1. Once a message has been marked for each partition and offsets committed, this will produce concrete offsets and everything works as expected from there.
Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.