Prevent Fetcher from incorrectly discarding PartitionRecords in compacted topics
When a topic is compacted, consecutive messages might not have
consecutive offsets. Fetcher._append would discard PartitionRecords
whenever the offset of the first message of the part was not equal to
the offset of the last message of the previous part + 1. For compacted
topics this is almost never the case (at least when fetching from the
'earliest' offset).
By using part.fetch_offset instead, we ensure the whole
PartitionRecords is not discarded the first time offsets are not
consecutive, which avoids sending needless new FetchRequests.
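
The difference between the two checks, as a self-contained sketch (the class and helper functions here are illustrative stand-ins, not kafka-python's actual internals):

```python
# Illustration of the old vs. new validity check (hypothetical helpers,
# not the literal upstream code). In a compacted topic the consumer may
# ask to fetch from offset 50 while the first *retained* message has
# offset 54, because compaction removed offsets 50-53.

class PartitionRecords:
    def __init__(self, fetch_offset, message_offsets):
        self.fetch_offset = fetch_offset        # offset the FetchRequest asked for
        self.message_offsets = message_offsets  # offsets actually returned

def keep_batch_old(part, position):
    # Old check: the first returned message must sit exactly at the
    # consumer's position. Any compaction gap makes this fail.
    return part.message_offsets[0] == position

def keep_batch_new(part, position):
    # New check: the fetch was issued for `position`, so the batch is
    # valid even if the first surviving message has a larger offset.
    return part.fetch_offset == position

part = PartitionRecords(fetch_offset=50, message_offsets=[54, 57, 60])
print(keep_batch_old(part, position=50))  # False -> whole batch wrongly discarded
print(keep_batch_new(part, position=50))  # True  -> batch kept and drained
```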
In our case, the first FetchResponse returned ~13,000 records. Using
consumer.poll(max_records=50), ~12,950 of them were discarded because
the offset of the 51st message was not equal to the offset of the 50th
message + 1, so a new FetchRequest was sent, and so on. With this
change, all ~13,000 messages were correctly consumed and only one
FetchRequest had to be sent.
(The topic was __consumer_offsets, which is compacted.)
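
For reference, a minimal way to observe the pattern described above (the broker address and polling loop are placeholders for illustration):

```python
from kafka import KafkaConsumer

# Hypothetical reproduction: read the compacted __consumer_offsets topic
# from the beginning, draining 50 records per poll() call.
consumer = KafkaConsumer(
    '__consumer_offsets',
    bootstrap_servers='localhost:9092',  # placeholder broker address
    group_id=None,                       # no group membership needed
    auto_offset_reset='earliest',
    enable_auto_commit=False,
)

for _ in range(10):
    batch = consumer.poll(timeout_ms=1000, max_records=50)
    for tp, msgs in batch.items():
        # Note the gaps between consecutive offsets on a compacted topic.
        print(tp, [m.offset for m in msgs])

consumer.close()
```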
Good catch! Can you make this PR upstream?