kafka-python

Prevent Fetcher from wrongfully discarding PartitionRecords in compacted topics

Open · flavray opened this issue · 1 comment

When a topic is compacted, consecutive messages might not have consecutive offsets. Fetcher._append would discard a PartitionRecords whenever the offset of the first message of the part was not equal to the offset of the last message of the previous part + 1; for compacted topics this condition almost never holds (at least when fetching from the 'earliest' offset). By using part.fetch_offset instead, we ensure the whole PartitionRecords is not discarded the first time offsets turn out not to be consecutive, avoiding "useless" new FetchRequests.
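For illustration, here is a minimal, self-contained sketch of the two checks. The PartitionRecords class below is a toy stand-in, not kafka-python's actual internals; only the contrast between the two comparisons reflects the change described above:

```python
class PartitionRecords:
    """Toy stand-in: a fetched batch of (offset, value) messages plus
    the offset the fetch was issued for."""
    def __init__(self, fetch_offset, messages):
        self.fetch_offset = fetch_offset  # next offset this part serves
        self.messages = messages          # [(offset, value), ...]

    def take(self, n):
        taken, self.messages = self.messages[:n], self.messages[n:]
        if taken:
            # advance fetch_offset past the last message handed out
            self.fetch_offset = taken[-1][0] + 1
        return taken

# Offsets in a compacted topic: gaps are normal (3 and 4 were compacted away).
part = PartitionRecords(
    fetch_offset=0,
    messages=[(0, 'a'), (1, 'b'), (2, 'c'), (5, 'd'), (6, 'e')])

batch = part.take(3)          # poll(max_records=3) -> offsets 0, 1, 2
position = batch[-1][0] + 1   # consumer position is now 3

# Buggy check: compare position to the first *remaining* message's offset.
# 3 != 5 across the compaction gap, so the rest of the part is discarded.
print(position == part.messages[0][0])  # False -> wrongful discard

# Fixed check: compare position to part.fetch_offset, which was advanced
# to last-consumed + 1 and therefore still matches after the gap.
print(position == part.fetch_offset)    # True -> keep the buffered records
```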

In our case, the first FetchResponse returned ~13,000 records. Using consumer.poll(max_records=50), ~12,950 of them were discarded: the offset of the 51st message was not equal to the offset of the 50th message + 1, so a new FetchRequest was sent, and so on. With this change, all ~13,000 messages were correctly consumed and only one FetchRequest had to be sent. (The topic was __consumer_offsets, which is compacted.)
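A hypothetical reproduction of that scenario might look like the following, assuming a local broker; the bootstrap address and loop bound are illustrative, not from the report. Note that reading __consumer_offsets requires exclude_internal_topics=False:

```python
from kafka import KafkaConsumer

# __consumer_offsets is compacted, so fetched batches routinely contain
# offset gaps, triggering the wrongful-discard path described above.
consumer = KafkaConsumer(
    '__consumer_offsets',
    bootstrap_servers='localhost:9092',  # assumption: local test cluster
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    exclude_internal_topics=False,       # allow consuming internal topics
)

total = 0
while total < 13000:  # illustrative bound matching the ~13,000 records above
    batch = consumer.poll(timeout_ms=1000, max_records=50)
    if not batch:
        break
    # Before the fix, each poll() that crossed an offset gap threw away the
    # rest of the buffered fetch and issued a fresh FetchRequest.
    total += sum(len(msgs) for msgs in batch.values())

consumer.close()
```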

flavray · Jun 23 '17 09:06

Good catch! Can you make this PR upstream?

ecanzonieri · Jun 23 '17 18:06