v3.1.0 - tail not properly calculating offsets
I am encountering a bug with v3.1.0. v3.0.3 consumes successfully in the same scenario.
This appears to be related to the rewrite of the getOffsetBounds method in internal/consume/PartitionConsumer.go.
It looks like the check comparing the oldest offset to startOffset was removed in the move from 3.0.3 to 3.1.0.
I applied the following quick fix locally (around L155), which lets the --tail option succeed in this case:
    if flags.Tail > 0 && startOffset == sarama.OffsetNewest {
        // When --tail is used compute startOffset so that it minimizes the number of messages consumed
        minOffset := endOffset - int64(flags.Tail)
        oldestOffset, err := (*client).GetOffset(topic, currentPartition, sarama.OffsetOldest)
        if err != nil {
            return -1, -1, errors.Errorf("failed to get offset for topic %s Partition %d: %v", topic, currentPartition, err)
        }
        if minOffset < oldestOffset {
            startOffset = oldestOffset
        } else if minOffset > 0 {
            startOffset = minOffset
        } else {
            startOffset = sarama.OffsetOldest
        }
    }
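To make the intended clamping easier to reason about outside of kafkactl, here is a minimal standalone sketch. The helper name tailStartOffset is hypothetical and not part of kafkactl; it assumes that newest is the offset the next produced message would get (which is what sarama.OffsetNewest resolves to), that oldest is the first offset still retained, and that tail is positive. The sample values are the OLDEST_OFFSET / NEWEST_OFFSET pairs from the topic description in this issue.

```go
package main

import "fmt"

// tailStartOffset is a hypothetical helper, not kafkactl's actual code.
// The partition is assumed to hold the offsets [oldest, newest), and tail is
// assumed to be > 0. It returns the first offset to read for --tail, or
// ok == false when the partition is empty and should be skipped.
func tailStartOffset(oldest, newest, tail int64) (start int64, ok bool) {
	if newest <= oldest {
		return 0, false // nothing retained, so there is nothing to tail
	}
	start = newest - tail
	if start < oldest {
		start = oldest // never ask for an offset that retention already deleted
	}
	return start, true
}

func main() {
	// {oldest, newest} per partition, copied from the topic description.
	partitions := [][2]int64{
		{45408711, 45408718},
		{5426176, 5426176},
		{5058115, 5058126},
		{5100615, 5100615},
	}
	for p, bounds := range partitions {
		if start, ok := tailStartOffset(bounds[0], bounds[1], 1); ok {
			fmt.Printf("partition %d: start at offset %d\n", p, start)
		} else {
			fmt.Printf("partition %d: skipped (empty)\n", p)
		}
	}
}
```

With --tail 1 this sketch skips partitions 1 and 3 and starts partitions 0 and 2 at 45408717 and 5058125, which lines up with what the 3.0.3 logs report.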
I have a topic with a 30-day delete retention policy. Retention has recently deleted its messages, leaving some partitions empty, as described below:
VERSION 3.1.0 LOGS
CONFIG VALUE
compression.type gzip
min.insync.replicas 2
cleanup.policy delete
retention.ms 2592000000
message.timestamp.type LogAppendTime
PARTITION  OLDEST_OFFSET  NEWEST_OFFSET  EMPTY  LEADER         REPLICAS  IN_SYNC_REPLICAS
0          45408711       45408718       false  broker1:12345  3,4,5     3,4,5
1          5426176        5426176        true   broker1:12345  1,2,4     1,2,4
2          5058115        5058126        false  broker1:12345  1,2,5     1,2,5
3          5100615        5100615        true   broker1:12345  2,3,5     2,3,5
VERSION 3.1.0 LOGS
~/$: kafkactl consume ingestion-topic --tail 1 -V
[kafkactl] 2023/07/03 10:59:59 Using config file: /Users/user/.config/kafkactl/config.yml
[kafkactl] 2023/07/03 10:59:59 Assuming kafkaVersion: 2.5.0
[kafkactl] 2023/07/03 10:59:59 Assuming kafkaVersion: 2.5.0
[kafkactl] 2023/07/03 10:59:59 using default admin request timeout: 3s
[kafkactl] 2023/07/03 10:59:59 TLS is enabled.
[sarama ] 2023/07/03 10:59:59 Initializing new client
[sarama ] 2023/07/03 10:59:59 client/metadata fetching metadata for all topics from broker broker1
[sarama ] 2023/07/03 10:59:59 Connected to broker at broker1 (unregistered)
[sarama ] 2023/07/03 10:59:59 client/brokers registered new broker #5 at broker1
[sarama ] 2023/07/03 10:59:59 client/brokers registered new broker #4 at broker1
[sarama ] 2023/07/03 10:59:59 client/brokers registered new broker #1 at broker1
[sarama ] 2023/07/03 10:59:59 client/brokers registered new broker #2 at broker1
[sarama ] 2023/07/03 10:59:59 client/brokers registered new broker #3 at broker1
[sarama ] 2023/07/03 10:59:59 Successfully initialized new client
[kafkactl] 2023/07/03 10:59:59 Start consuming topic: ingestion-topic
[sarama ] 2023/07/03 10:59:59 Connected to broker at broker1 (registered as #4)
[sarama ] 2023/07/03 10:59:59 Connected to broker at broker1 (registered as #2)
[sarama ] 2023/07/03 10:59:59 Connected to broker at broker1 (registered as #1)
[sarama ] 2023/07/03 10:59:59 Connected to broker at broker1 (registered as #5)
[kafkactl] 2023/07/03 11:00:00 consumer will consume offset 5058124 to 5058125 on partition 2
[kafkactl] 2023/07/03 11:00:00 consumer will consume offset 5426174 to 5426175 on partition 1
[kafkactl] 2023/07/03 11:00:00 consumer will consume offset 5100613 to 5100614 on partition 3
[kafkactl] 2023/07/03 11:00:00 consumer will consume offset 45408716 to 45408717 on partition 0
[kafkactl] 2023/07/03 11:00:00 Start consuming partition 2 from offset 5058124 to 5058125
[kafkactl] 2023/07/03 11:00:00 Start consuming partition 0 from offset 45408716 to 45408717
Failed to start consumer: Failed to start consumer for partition 3: kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition
As the logs above show, the start offsets are derived only from the newest offset (for example 5100613 on partition 3, whose oldest and newest offsets are both 5100615). Those offsets no longer exist because retention has deleted them.
Relevant part of 3.0.3 logs running the same command.
[kafkactl] 2023/07/03 11:24:51 Skipping partition 1
[kafkactl] 2023/07/03 11:24:51 Skipping partition 3
[kafkactl] 2023/07/03 11:24:51 Start consuming partition 2 from offset 5058125 to 5058125
[kafkactl] 2023/07/03 11:24:51 Start consuming partition 0 from offset 45408717 to 45408717
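Here the empty partitions 1 and 3 are skipped, and the start offsets for partitions 0 and 2 fall inside the retained range, so the working 3.0.3 version calculates the offsets quite differently.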
I am also seeing a similar but different issue when attempting to consume using --offset.
:~/$ kafkactl consume ingestion-topic --offset 0=45408717
Failed to start consumer: unable to find offset parameter for partition 3: [0=45408717]
Seems like error handling on L251 is triggering for partition 3 which isn't even being consumed from.
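One possible way to handle this, sketched below, is to parse the --offset values into a map and treat partitions without an entry as "not requested" instead of as an error. This is only an illustration: parseOffsetFlags is a hypothetical helper, not kafkactl's actual code, and whether unlisted partitions should be skipped or consumed from a default offset is a design decision for the maintainers.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseOffsetFlags is a hypothetical helper, not kafkactl's actual code.
// It turns values such as "0=45408717" into a partition -> offset map.
func parseOffsetFlags(values []string) (map[int32]int64, error) {
	offsets := make(map[int32]int64, len(values))
	for _, v := range values {
		left, right, found := strings.Cut(v, "=")
		if !found {
			return nil, fmt.Errorf("invalid offset parameter %q, expected partition=offset", v)
		}
		partition, err := strconv.ParseInt(left, 10, 32)
		if err != nil {
			return nil, fmt.Errorf("invalid partition in %q: %w", v, err)
		}
		offset, err := strconv.ParseInt(right, 10, 64)
		if err != nil {
			return nil, fmt.Errorf("invalid offset in %q: %w", v, err)
		}
		offsets[int32(partition)] = offset
	}
	return offsets, nil
}

func main() {
	offsets, err := parseOffsetFlags([]string{"0=45408717"})
	if err != nil {
		fmt.Println(err)
		return
	}
	// The topic in this issue has four partitions (0 to 3).
	for partition := int32(0); partition < 4; partition++ {
		if offset, ok := offsets[partition]; ok {
			fmt.Printf("partition %d: consume from offset %d\n", partition, offset)
		} else {
			// Assumption: a missing entry means the partition was not requested.
			fmt.Printf("partition %d: no offset given, skipping\n", partition)
		}
	}
}
```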
I have the same problem with the offset parameter when running in Kubernetes. When run locally it works perfectly.
Hey,
I am sorry, but we have only limited resources at the moment and I cannot predict when we will have time to analyse the problem. If you can come up with a PR, that would speed things up :)
Regard