kop
[BUG] KoP consumes duplicate messages when multiple batches are sent in one produce request for one TopicPartition
Describe the Bug
When a producer sends multiple batches in one MemoryRecords, as with protocol versions [V0, V2] and [V8, +), KoP packs the batches into a single entry and stores it in the managedLedger.
However, the offsets exposed to the consumer are at batch granularity. The consumer fetches messages by offset, and KoP converts that offset to a MessageId. In the following picture, one entry contains four batches.

When a consumer fetches from offset 6, KoP converts offset 6 to entry 1, fetches entry 1 from BookKeeper, and returns the whole entry to the consumer without filtering out the messages at offsets 4 and 5. As a result, the consumer receives duplicate messages.
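The missing filtering step can be sketched as follows. This is only an illustration, not KoP's actual code: `BatchInfo`, `EntryBatchFilter`, and `batchesFrom` are hypothetical names, and the layout of entry 1 (four single-offset batches at offsets 4 to 7) is an assumption based on the description above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for one Kafka record batch stored inside an entry.
final class BatchInfo {
    final long baseOffset; // offset of the first record in the batch
    final long lastOffset; // offset of the last record in the batch

    BatchInfo(long baseOffset, long lastOffset) {
        this.baseOffset = baseOffset;
        this.lastOffset = lastOffset;
    }
}

final class EntryBatchFilter {
    // Keep only the batches that end at or after the requested fetch offset.
    static List<BatchInfo> batchesFrom(List<BatchInfo> entryBatches, long fetchOffset) {
        List<BatchInfo> kept = new ArrayList<>();
        for (BatchInfo batch : entryBatches) {
            if (batch.lastOffset >= fetchOffset) {
                kept.add(batch);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Assumed layout: entry 1 holds four batches at offsets 4, 5, 6 and 7.
        List<BatchInfo> entry1 = Arrays.asList(
                new BatchInfo(4, 4), new BatchInfo(5, 5),
                new BatchInfo(6, 6), new BatchInfo(7, 7));
        // Fetching from offset 6 should skip the batches at offsets 4 and 5.
        for (BatchInfo batch : batchesFrom(entry1, 6)) {
            System.out.println("return batch at offset " + batch.baseOffset);
        }
    }
}
```

Under these assumptions, only the batches at offsets 6 and 7 are returned, so the consumer no longer sees offsets 4 and 5 a second time.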
Another affected area is getting LogStartOffset (LSO). https://github.com/streamnative/kop/pull/531
We are planning to calculate the LSO as
LSO = LEO - numberOfEntries
LEO and numberOfEntries can be obtained from the managedLedger.
However, this formula assumes that each entry contains exactly one batch.
If one entry can contain multiple batches, we should use a separate variable to record the LSO in the managedLedger.
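A small worked example shows why the formula breaks. It assumes that LEO is the next offset to be assigned, that each batch takes one offset, and that the log really starts at offset 0. The class and method names are hypothetical.

```java
final class LogStartOffsetExample {
    // The formula from the issue: correct only when every entry holds exactly one batch.
    static long startOffsetByEntryCount(long logEndOffset, long numberOfEntries) {
        return logEndOffset - numberOfEntries;
    }

    public static void main(String[] args) {
        // Offsets 0..7 have been written, so LEO is 8 and the real start offset is 0.
        long logEndOffset = 8;

        // One batch per entry: 8 entries, the formula gives the correct value.
        System.out.println(startOffsetByEntryCount(logEndOffset, 8)); // prints 0

        // Four batches per entry: only 2 entries, the formula overshoots.
        System.out.println(startOffsetByEntryCount(logEndOffset, 2)); // prints 6
    }
}
```

In the second case the computed start offset is 6, although the batches at offsets 0 to 5 still exist in the two entries.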
Just a terminology correction: in Kafka, LSO usually stands for Last Stable Offset. There seems to be no common abbreviation for the log start offset.
To solve this problem, I think we need to upgrade the Kafka dependency to 2.7.0 or 2.8.0 first. If we also want to handle the case where a single entry contains multiple batches, we can add a key-value pair to the entry's metadata.
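One possible reading of the metadata suggestion is to record how many batches each entry holds. The sketch below is an assumption, not an existing KoP or Pulsar API: the `numBatches` key, the plain `Map` used as entry metadata, and all class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class EntryMetadataSketch {
    // Hypothetical metadata key; not an existing KoP or Pulsar property name.
    static final String NUM_BATCHES_KEY = "numBatches";

    // Build the metadata for an entry that packs the given number of batches.
    static Map<String, String> metadataFor(int numBatches) {
        Map<String, String> properties = new HashMap<>();
        properties.put(NUM_BATCHES_KEY, Integer.toString(numBatches));
        return properties;
    }

    // Read the count back; a missing key is treated as one batch per entry.
    static int numBatches(Map<String, String> properties) {
        return Integer.parseInt(properties.getOrDefault(NUM_BATCHES_KEY, "1"));
    }

    // Sum the batch counts of all entries in the ledger.
    static long totalBatches(List<Map<String, String>> entries) {
        long total = 0;
        for (Map<String, String> properties : entries) {
            total += numBatches(properties);
        }
        return total;
    }

    public static void main(String[] args) {
        // Two entries with four batches each, log end offset 8 (offsets 0..7 written).
        List<Map<String, String>> entries = Arrays.asList(metadataFor(4), metadataFor(4));
        long logEndOffset = 8;
        System.out.println(logEndOffset - totalBatches(entries)); // prints 0
    }
}
```

With a per-entry batch count available, the start offset can be derived from the total number of batches rather than the number of entries. Entries written without the key fall back to the one-batch assumption.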