kafka-go
kafka-go copied to clipboard
Do not commit offsets for past generations if partition not owned
This is a skeleton PR aiming for discussion related to https://github.com/segmentio/kafka-go/issues/1308
In that issue, I describe how rebalances can lead to a situation where the consumer receiving the partition does not read messages in spite of lag existing (because conn's offset is > than the broker's current offset and there's no new messages coming).
I was aiming to fix it by avoiding commits for past generations, to do so, I add the generation id to the message and when committing, I log an error instead of adding it to the stash if it belongs to a generation < than current.
This has the risk of losing valid commits when the new generation comes (as its associated generation < latest generation.id), however it works because a new generation ends up creating new connections which means uncommitted events are duplicated. It is not that I love it because it works incidentally but it does work as my test shows.
If you have a different / better approach I am happy to explore it if you do explain it a bit.