aiokafka
seek_to_committed does not work when committed offset is 0
Hi,
The seek_to_committed function has a small bug: it does not work when the last committed offset of a TopicPartition is 0 (i.e., nothing was ever committed). This happens if you fetch messages with getmany() or getone() from a never-committed-to TopicPartition with auto commit disabled. Let's say the messages couldn't be processed and you want to call seek_to_committed, which should seek back to 0 in this case. The seek is skipped instead.
Snippet of code from the seek_to_committed method:
        if offset and offset > 0:
            self._fetcher.seek_to(tp, offset)
    return committed_offsets
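I believe the if statement above should be if offset is not None and offset >= 0: instead. Note that if offset and offset >= 0: would not be enough, because 0 is falsy in Python, so the branch would still be skipped for an offset of 0.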
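To illustrate why the truthiness test is the problem, here is a minimal, standalone sketch that compares three candidate conditions. The function names are hypothetical and exist only for this comparison; they are not part of aiokafka.

```python
from typing import Optional


def should_seek_original(offset: Optional[int]) -> bool:
    # Condition as quoted from seek_to_committed in this report.
    return bool(offset and offset > 0)


def should_seek_truthy_ge(offset: Optional[int]) -> bool:
    # Only changing > to >= does not help: 0 is falsy, so `offset and ...`
    # short-circuits before the comparison is evaluated.
    return bool(offset and offset >= 0)


def should_seek_explicit(offset: Optional[int]) -> bool:
    # Explicit None check: only "no offset at all" is rejected.
    return offset is not None and offset >= 0


if __name__ == "__main__":
    for offset in (None, 0, 5):
        print(
            offset,
            should_seek_original(offset),
            should_seek_truthy_ge(offset),
            should_seek_explicit(offset),
        )
```

Only the explicit None check lets an offset of 0 through while still rejecting a missing offset.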
To work around this, I had to do the following, which works as expected:
current_committed_offset = await consumer.committed(tp)
consumer.seek(tp, current_committed_offset)
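The workaround could be wrapped in a small helper along these lines. This is only a sketch: seek_to_last_committed is a hypothetical name, and the None guard is an assumption for the case where committed() reports no offset at all. The consumer argument is expected to behave like an aiokafka consumer, with an awaitable committed(tp) and a synchronous seek(tp, offset), as used in the two lines above.

```python
from typing import Any, Optional


async def seek_to_last_committed(consumer: Any, tp: Any) -> Optional[int]:
    """Seek `tp` back to its last committed offset, including offset 0.

    Hypothetical helper, not part of aiokafka.
    """
    committed = await consumer.committed(tp)
    # Compare against None explicitly so that an offset of 0 is still used.
    if committed is not None:
        consumer.seek(tp, committed)
    return committed
```

It would be called as await seek_to_last_committed(consumer, tp) from the coroutine that failed to process the batch.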
Thanks