
fix: stuck on the batch with zero records with long gap

Open · Sergey-Belyakov opened this issue 8 months ago

Hi! We found a bug. If a repeated fetch with an increased offset again returns an empty response, the fetcher keeps requesting the batch at that same offset over and over. The offset always needs to be increased in such cases.

Sergey-Belyakov · Apr 18 '25 11:04

@Sergey-Belyakov thanks for the PR, this is interesting!

Will take a look to see what the Java client behaviour is in this scenario to ensure we align

dnwe · May 13 '25 09:05

This bug has hit us multiple times in production - can it receive a review? (PR works for us when we apply it locally)

kgatv2 · Jul 04 '25 12:07

@kgatv2 what is your backend cluster? We just need to be clear on why we're getting a FetchResponse that isn't end-of-topic yet has no error and no records – to ensure we're not just papering over some corruption in a third-party broker implementation, and that it is genuine Apache Kafka broker behaviour (perhaps due to aborted transactions?) that causes these responses.

I guess I'm nervous about tossing in offset++ without understanding why. Corroborating links to other client implementations of the behaviour in apache/kafka, confluentinc/librdkafka, or twmb/franz-go would also help give weight to this PR.

dnwe · Jul 07 '25 08:07

We are hosting our Kafka topics on Aiven - https://aiven.io/kafka. We use 4 partitions for the topic that got "corrupted".

kgatv2 · Jul 07 '25 08:07

Hi, sorry for my English. I deleted my previous comment because I wasn't completely sure, but now I think I'm sure of what I'm saying.

It looks like we've encountered issue KAFKA-5443

I think the problem is how exactly LastRecordsBatchOffset is defined.

Let's see how it is defined: https://github.com/IBM/sarama/blob/aa1a2c5ee2508e6a6032b445d99518defae32c1c/fetch_response.go#L153

Now let's look at the recordsOffset function:

https://github.com/IBM/sarama/blob/aa1a2c5ee2508e6a6032b445d99518defae32c1c/records.go#L186-L199

That is, LastRecordsBatchOffset stores the first offset of the record batch. But to avoid getting stuck, the next fetch should request the last offset of the batch + 1, that is FirstOffset + LastOffsetDelta + 1.
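As a minimal sketch of that calculation (the function name and signature below are illustrative only, not part of sarama):

// nextFetchOffset is a hypothetical helper: given a record batch's base
// offset and its lastOffsetDelta (from the Kafka record batch header),
// it returns the first offset that has not yet been consumed.
func nextFetchOffset(firstOffset int64, lastOffsetDelta int32) int64 {
	return firstOffset + int64(lastOffsetDelta) + 1
}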

This is exactly how other client libraries implement it.

Let's see how it's done in the franz-go library:

https://github.com/twmb/franz-go/blob/669b18eeee83408f04b848f0a5069dcd86153413/pkg/kgo/source.go#L1586

https://github.com/twmb/franz-go/blob/669b18eeee83408f04b848f0a5069dcd86153413/pkg/kgo/source.go#L1643-L1658

And now let's see how it's done in the Java library:

https://github.com/apache/kafka/blob/4b607616c764e7654fac200cdd260e74ef29fca3/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java#L194

https://github.com/a0x8o/kafka/blob/54eff6af115ee647f60129f2ce6a044cb17215d0/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java#L127-L131

I think the recordsOffset function should look something like this:

func (r *Records) recordsOffset() (*int64, error) {
	switch r.recordsType {
	case unknownRecords:
		return nil, nil
	case legacyRecords:
		return nil, nil
	case defaultRecords:
		if r.RecordBatch == nil {
			return nil, nil
		}
		// Return the last offset of the batch (FirstOffset + LastOffsetDelta)
		// instead of its first offset, so the next fetch starts past this batch.
		lastOffset := r.RecordBatch.LastOffset()
		return &lastOffset, nil
	}
	return nil, fmt.Errorf("unknown records type: %v", r.recordsType)
}
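Here I'm assuming RecordBatch.LastOffset() returns the batch's last absolute offset, i.e. FirstOffset + int64(LastOffsetDelta); if the type doesn't already expose such a helper, the same value can be computed inline from those two fields.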

I just got this bug on our cluster.

(Screenshot from 2025-07-12 showing the batch values below.)

FirstOffset = 1273344, LastOffsetDelta = 96

We keep sending the request with LastRecordsBatchOffset + 1 = 1273345 over and over, and we keep getting the same result every time. But if we modify the recordsOffset function the way I described above, we no longer get stuck at that part of the code.
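To spell out the arithmetic with the values above: the consumer keeps requesting 1273344 + 1 = 1273345, while the last offset of the batch is 1273344 + 96 = 1273440, so the next fetch should ask for 1273441.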

@dnwe What do you think about this?

sterligov · Jul 12 '25 06:07

@sterligov thanks, this is good analysis. I think you're probably right and the original fix attempted under https://github.com/IBM/sarama/pull/2057 was put in the wrong place and should have used your delta increment rather than a +1.

Are you happy to draft a PR up with your proposed fix (removing the old #2057 one) so we can run it through the FV?

dnwe · Jul 12 '25 10:07

@dnwe Hi, I tried to solve this problem in PR 3221. Please take a look at it when you have time.

sterligov · Jul 20 '25 12:07

closing in favour of merged https://github.com/IBM/sarama/pull/3221 – thanks for raising the issue!

dnwe · Jul 31 '25 19:07