
Calling MarkOffset does not update the offset in Kafka

Open ybos opened this issue 4 years ago • 2 comments

Versions
| Sarama | Kafka | Go |
|--------|-------|----|
| v1.26.1 | v2.3.1 | v1.14 |
Configuration
clusterConfig := sarama.NewConfig()
clusterConfig.Consumer.Return.Errors = true
clusterConfig.Version = sarama.V2_3_0_0

AutoCommit.Enable = true is the default in Sarama's own code:

c.Consumer.Offsets.AutoCommit.Enable = true
c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
c.Consumer.Offsets.Initial = OffsetNewest
c.Consumer.Offsets.Retry.Max = 3
Problem Description

1st: My program started at 2020-10-28 08:02:13 and crashed at 2020-12-11 11:51:59. During that run it consumed Kafka offsets 238663 through 264936, but when I started the program again, the next offset was 250011. I searched the runtime log and found that the record at offset 250011 had been processed at 2020-11-02 21:37:38. I call MarkOffset for every message, but Sarama did not commit the latest offset to Kafka.

2nd: I started it again at 2020-12-15 05:22:26, when the offset in Kafka was 250011. I stopped the program at 2020-12-15 05:23:13, when the offset was 264936. When I started it once more, the next offset was 264489.

I think the second situation happens because calling MarkOffset does not necessarily commit the offset to the backend store immediately. But I have no idea why the first situation happened: for nearly a month, the offset was never updated in Kafka.
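The second situation can be illustrated with a toy model. This is only a sketch, not Sarama's implementation: `toyOffsetStore`, `Mark`, `Flush`, and `Restart` are hypothetical names, and the model assumes that a marked offset lives in client memory until a commit succeeds. It uses the offsets from the second situation.

```go
package main

import "fmt"

// toyOffsetStore is a hypothetical stand-in for the client/broker pair.
// marked exists only in client memory; committed is what the broker has stored.
type toyOffsetStore struct {
	marked    int64
	committed int64
}

// Mark records the next offset to read, in memory only.
func (s *toyOffsetStore) Mark(next int64) {
	if next > s.marked {
		s.marked = next
	}
}

// Flush models one successful commit (an auto-commit tick or a clean shutdown).
func (s *toyOffsetStore) Flush() { s.committed = s.marked }

// Restart models a new process: in-memory marks are gone,
// so consumption resumes from the last committed offset.
func (s *toyOffsetStore) Restart() int64 {
	s.marked = s.committed
	return s.committed
}

func main() {
	s := &toyOffsetStore{}
	s.Mark(264489)
	s.Flush()      // last commit that reached the broker
	s.Mark(264937) // marked afterwards but never flushed
	fmt.Println("resume at:", s.Restart()) // prints "resume at: 264489"
}
```

In this model, every offset marked after the last successful flush is consumed again after a restart. It does not explain the first situation, where no commit seems to have landed for weeks.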

Some Code
partitionOffsetManager, err := offsetManager.ManagePartition(FileConfig.KafkaTopic, 0)

// get the next offset in Kafka
nextOffset, _ := partitionOffsetManager.NextOffset()

// This is the problem: if this value is wrong, records are consumed again
partitionConsumer, err := consumer.ConsumePartition(FileConfig.KafkaTopic, 0, nextOffset)

// save partitionOffsetManager and use it later.
obj := KafkaObject{
	OffsetManager:          &offsetManager,
	PartitionOffsetManager: &partitionOffsetManager,
	PartitionConsumer:      &partitionConsumer,
}

// mark the offset (this does not commit it immediately)
func (k *KafkaObject) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
	(*k.PartitionOffsetManager).MarkOffset(msg.Offset, metadata)
}
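A side note on the call above: Sarama's documentation for MarkOffset says you are expected to mark the offset of the next message to read, not the last message read, so the usual call is `MarkOffset(msg.Offset+1, metadata)`. The sketch below shows that convention using only the standard library. `offsetMarker`, `markProcessed`, and `recordingMarker` are hypothetical names; `offsetMarker` mirrors the single method of `sarama.PartitionOffsetManager` used here. This only shifts the resume point by one record and would not explain a gap of thousands of offsets.

```go
package main

import "fmt"

// offsetMarker mirrors the one method of sarama.PartitionOffsetManager
// that this sketch needs (hypothetical local interface).
type offsetMarker interface {
	MarkOffset(offset int64, metadata string)
}

// markProcessed marks the offset of the NEXT message to read,
// that is, one past the message that was just processed.
func markProcessed(m offsetMarker, processedOffset int64, metadata string) {
	m.MarkOffset(processedOffset+1, metadata)
}

// recordingMarker is a hypothetical fake that remembers the last marked offset.
type recordingMarker struct{ next int64 }

func (r *recordingMarker) MarkOffset(offset int64, _ string) { r.next = offset }

func main() {
	fake := &recordingMarker{}
	markProcessed(fake, 264936, "")
	fmt.Println("next offset to read:", fake.next) // prints "next offset to read: 264937"
}
```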

ybos, Dec 15 '20 12:12

Could the garbage collector have reclaimed the PartitionOffsetManager? Also, I use Amazon MSK. Could the connection have been closed after being idle for a long time?

I checked the logs. Records were consumed on the following dates:

October: 2020-10-28, 2020-10-29, 2020-10-30, 2020-10-31

November: 2020-11-01, 2020-11-02, 2020-11-03, 2020-11-04, 2020-11-05, 2020-11-06, 2020-11-07, 2020-11-08, 2020-11-09, 2020-11-10, 2020-11-11, 2020-11-12, 2020-11-13

2020-11-16, 2020-11-17

2020-11-19, 2020-11-20

2020-11-23, 2020-11-24, 2020-11-25, 2020-11-26, 2020-11-27

2020-11-30

December: 2020-12-01, 2020-12-02, 2020-12-03, 2020-12-04

2020-12-07, 2020-12-08

2020-12-10, 2020-12-11

ybos, Dec 15 '20 12:12

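MarkOffset only records the offset; nothing has been persisted yet. You need to commit manually: session.commit calls offsetmannager-commit. Even if the consumer commits manually, a crash can still lead to duplicate processing, even with exactly-once settings and acks=-1 (wait for all replicas). In short, you need to use transactions to guarantee exactly-once.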
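Short of transactions, a common way to tolerate the replays described here is to make processing idempotent. The following is a minimal sketch of that swapped-in technique, not something from Sarama: `dedupProcessor` and `Handle` are hypothetical, and it assumes a single partition consumed in order (as in the issue, which reads partition 0) and that `lastApplied` would in practice be stored durably together with the processing results.

```go
package main

import "fmt"

// dedupProcessor is a hypothetical idempotent handler: it skips any offset
// at or below the highest offset whose effect has already been applied.
type dedupProcessor struct {
	lastApplied int64   // highest applied offset; -1 means none yet
	applied     []int64 // stands in for the real side effects
}

func newDedupProcessor() *dedupProcessor {
	return &dedupProcessor{lastApplied: -1}
}

// Handle applies a record at most once and reports whether it did any work.
func (p *dedupProcessor) Handle(offset int64) bool {
	if offset <= p.lastApplied {
		return false // replayed record, already applied
	}
	p.applied = append(p.applied, offset)
	p.lastApplied = offset
	return true
}

func main() {
	p := newDedupProcessor()
	// Offsets 101 and 102 are delivered twice, as after a restart
	// from a stale committed offset.
	for _, off := range []int64{100, 101, 102, 101, 102, 103} {
		fmt.Println(off, p.Handle(off))
	}
}
```

The replayed deliveries of 101 and 102 report `false`, so their side effects are not repeated.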

DKLuck, Jan 05 '23 13:01

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

github-actions[bot], Aug 25 '23 10:08

Hmm. I’m not aware of any previous issues with Sarama not committing offsets. I can only assume that there was some issue with your MSK cluster in its infancy that was causing loss of committed offsets. Happy to investigate further if you can still reproduce this on a recent Sarama release and your current MSK cluster.

dnwe, Aug 25 '23 19:08