Infinite loop: kminion doesn't expect empty consumer offset

Open mikekamornikov opened this issue 4 years ago • 1 comments

We use the following settings for __consumer_offsets:

    cleanup.policy:    compact,delete
    compression.type:  producer
    retention.ms:      2592000000
    segment.ms:        3600000

As you can see, it's possible that __consumer_offsets had records before and doesn't have them right now. This means that Low Water Mark = High Water Mark != 0.

checkIfConsumerLagIsCaughtUp in offset_consumer.go internally does the following:

    highMarksRes, err := offsetReq.RequestWith(ctx, s.kafkaSvc.Client)
    ...
    consumedOffsets := s.storage.getConsumedOffsets()
    ...
    highWaterMark := partition.Offset - 1
    consumedOffset := consumedOffsets[partition.Partition]
    partitionLag := highWaterMark - consumedOffset
    if partitionLag < 0 {
    	partitionLag = 0
    }

    if partitionLag > 0 {
    	partitionsLagging++
    	totalLag += partitionLag
    	s.logger.Debug("consumer_offsets topic lag has not been caught up yet",
    		zap.Int32("partition_id", partition.Partition),
    		zap.Int64("high_water_mark", highWaterMark),
    		zap.Int64("consumed_offset", consumedOffset),
    		zap.Int64("partition_lag", partitionLag))
    	isReady = false
    	continue
    }

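No record is ever consumed from such a partition, so consumedOffset stays at its default value of 0. In this case partitionLag is always greater than 0, and isReady is always false.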
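
The effect can be reproduced in isolation. This is only a minimal sketch: it assumes consumedOffsets behaves like a plain `map[int32]int64` (the real type in kminion may differ), and `partitionLag` is a hypothetical helper that mirrors the arithmetic from the excerpt above.

```go
package main

import "fmt"

// partitionLag mirrors the arithmetic from the excerpt above.
// The name and signature are hypothetical, for illustration only.
func partitionLag(highOffset int64, consumedOffsets map[int32]int64, partition int32) int64 {
	highWaterMark := highOffset - 1
	// A missing key yields the zero value 0, which cannot be told apart
	// from "consumed up to offset 0".
	consumedOffset := consumedOffsets[partition]
	lag := highWaterMark - consumedOffset
	if lag < 0 {
		lag = 0
	}
	return lag
}

func main() {
	// Partition 7 once held records, but retention removed all of them:
	// low water mark == high water mark == 500, nothing is left to consume.
	consumed := map[int32]int64{}
	fmt.Println(partitionLag(500, consumed, 7)) // prints 499
}
```

Since nothing can be consumed from the empty partition, the map entry is never written and the computed lag never shrinks.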

It looks like there should be a check for the default value of consumedOffset (0) somewhere before the final condition.
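
One way such a check could look is sketched below. It assumes the low water marks can be fetched alongside the high water marks (the excerpt only shows the latter). The type `partitionMarks` and the function `isCaughtUp` are hypothetical names, not part of kminion. A partition whose low and high water marks are equal holds no records, so the sketch treats it as caught up no matter what consumedOffsets contains.

```go
package main

import "fmt"

// partitionMarks is a hypothetical container for one partition's offsets.
type partitionMarks struct {
	Partition int32
	Low       int64 // earliest available offset (assumed to be fetched separately)
	High      int64 // next offset to be written, as in the excerpt above
}

// isCaughtUp reports whether every non-empty partition has been consumed
// up to its last available record. Empty partitions are skipped.
func isCaughtUp(marks []partitionMarks, consumedOffsets map[int32]int64) bool {
	for _, m := range marks {
		if m.Low == m.High {
			// No records left (never written, or removed by retention).
			continue
		}
		consumed, seen := consumedOffsets[m.Partition]
		if !seen || m.High-1-consumed > 0 {
			return false
		}
	}
	return true
}

func main() {
	marks := []partitionMarks{
		{Partition: 0, Low: 500, High: 500}, // emptied by retention
		{Partition: 1, Low: 10, High: 20},   // last record at offset 19
	}
	consumed := map[int32]int64{1: 19}
	fmt.Println(isCaughtUp(marks, consumed)) // prints true
}
```

This only unblocks the startup check. It does not recover group offsets whose commit records have already been removed from the topic.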

mikekamornikov • Aug 02 '21 13:08

If the __consumer_offsets topic is incomplete (due to the configured retention settings), you should not use this mode anyway, because it would yield wrong results. kminion has no way of knowing the current group offset unless it uses the Kafka API again. Such a check would unblock the start, but it would emit wrong metrics, or am I misunderstanding something?

weeco • Feb 10 '24 18:02