kafka-go
Reader without consumer group: read from the last offset
Describe the solution you would like
Currently, a consumer started without a consumer group always begins reading at the first offset: https://github.com/segmentio/kafka-go/blob/a558bb8629a8f5a920a8e1bb640064ad7efb7cd8/reader.go#L707
Would it be possible to support the StartOffset parameter instead?
The goal here is to start a consumer without a consumer group that reads only from the latest offset.
There is a workaround that may fit your needs. Here is how we read from an arbitrary offset:
- Get the first and last offsets on a topic/partition:
...
conn, err = dialer.DialLeader(ctx, "tcp", broker, k.Topic, k.Partition) // we use kafka.DefaultDialer
if err != nil {
    // should retry, or try another broker
}
first, last, err = conn.ReadOffsets()
...
- Use the above offsets to set your desired starting offset k.Offset, between first and last (inclusive):
k.client = kafka.NewReader(config)
_ = k.client.SetOffset(k.Offset)
...
m, err := k.client.ReadMessage(k.context)
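
The two steps of the workaround can be sketched as one helper. This is a minimal sketch, not kafka-go code: the interfaces `offsetReader` and `offsetSetter`, the helpers `clampOffset` and `seekWithin`, and the `fakePartition` type are all hypothetical names introduced for illustration. The interfaces only mirror the shape of the `ReadOffsets` and `SetOffset` calls used above, so the example runs without a broker.

```go
package main

import (
	"errors"
	"fmt"
)

// offsetReader is a hypothetical interface for this sketch. It mirrors the
// shape of the conn.ReadOffsets() call used in the workaround.
type offsetReader interface {
	ReadOffsets() (first, last int64, err error)
}

// offsetSetter is likewise hypothetical. It mirrors the shape of the
// reader's SetOffset call used in the workaround.
type offsetSetter interface {
	SetOffset(offset int64) error
}

// clampOffset limits want to the inclusive range [first, last].
func clampOffset(want, first, last int64) int64 {
	if want < first {
		return first
	}
	if want > last {
		return last
	}
	return want
}

// seekWithin reads the partition's first and last offsets, clamps the
// requested offset into that range, and applies it to the reader.
// It returns the offset that was actually set.
func seekWithin(src offsetReader, dst offsetSetter, want int64) (int64, error) {
	first, last, err := src.ReadOffsets()
	if err != nil {
		return 0, fmt.Errorf("read offsets: %w", err)
	}
	if first > last {
		return 0, errors.New("invalid offset range: first > last")
	}
	off := clampOffset(want, first, last)
	if err := dst.SetOffset(off); err != nil {
		return 0, fmt.Errorf("set offset: %w", err)
	}
	return off, nil
}

// fakePartition stands in for both the connection and the reader so the
// sketch can run without a Kafka broker.
type fakePartition struct {
	first, last, current int64
}

func (p *fakePartition) ReadOffsets() (int64, int64, error) { return p.first, p.last, nil }

func (p *fakePartition) SetOffset(o int64) error { p.current = o; return nil }

func main() {
	p := &fakePartition{first: 10, last: 42}
	// Asking for an offset past the end falls back to the last offset.
	off, err := seekWithin(p, p, 1000)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(off) // prints 42
}
```

In real use, the connection returned by `DialLeader` and the reader returned by `kafka.NewReader` would take the place of `fakePartition`, assuming their method signatures match the two interfaces.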
Yeah, this seems reasonable and was probably a mistake in the initial implementation. Can you submit a PR with the change?
I'll try to find an implementation that avoids breaking the existing behavior.