kafka-go

Reader without consumer group read at last offset

Open zekth opened this issue 9 months ago • 3 comments

Describe the solution you would like

Currently, a Reader created without a consumer group always starts at the first offset: https://github.com/segmentio/kafka-go/blob/a558bb8629a8f5a920a8e1bb640064ad7efb7cd8/reader.go#L707

Would it be possible to support the StartOffset parameter instead?

The goal is to spin up a consumer without a consumer group that reads only from the latest offset.

zekth, Feb 14 '25 16:02

There is a workaround that may fit your needs. Here is how we read from an arbitrary offset:

  1. Get the first and last offsets of a topic/partition:
        ...
        conn, err = dialer.DialLeader(ctx, "tcp", broker, k.Topic, k.Partition) // we use kafka.DefaultDialer
        if err != nil {
            // retry, or try another broker
        }
        first, last, err = conn.ReadOffsets()
        ...
  2. Use those offsets to set your desired starting offset k.Offset between first and last (inclusive):
        k.client = kafka.NewReader(config)
        _ = k.client.SetOffset(k.Offset)
        ...
        m, err := k.client.ReadMessage(k.context)
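
The "between first and last (inclusive)" step can be sketched as a small helper. This is only an illustration, not part of kafka-go: `offsetSetter`, `clampOffset`, `seekWithin`, and `fakeReader` are hypothetical names introduced here, and the sketch assumes the real reader exposes `SetOffset(int64) error`, as the snippet above uses it. A fake reader stands in for the real one so the example runs without a broker.

```go
package main

import (
	"errors"
	"fmt"
)

// offsetSetter is a hypothetical interface introduced for this sketch.
// Assumption: the kafka-go reader's SetOffset(int64) error method, as used
// in the workaround above, would satisfy it.
type offsetSetter interface {
	SetOffset(offset int64) error
}

// clampOffset limits desired to the inclusive range [first, last].
func clampOffset(first, last, desired int64) (int64, error) {
	if first > last {
		return 0, errors.New("invalid offset range: first > last")
	}
	if desired < first {
		return first, nil
	}
	if desired > last {
		return last, nil
	}
	return desired, nil
}

// seekWithin clamps desired to [first, last], applies it to r, and returns
// the offset that was actually set.
func seekWithin(r offsetSetter, first, last, desired int64) (int64, error) {
	offset, err := clampOffset(first, last, desired)
	if err != nil {
		return 0, err
	}
	if err := r.SetOffset(offset); err != nil {
		return 0, fmt.Errorf("set offset %d: %w", offset, err)
	}
	return offset, nil
}

// fakeReader stands in for a real reader so the sketch runs without a broker.
type fakeReader struct{ offset int64 }

func (f *fakeReader) SetOffset(offset int64) error {
	f.offset = offset
	return nil
}

func main() {
	r := &fakeReader{}
	// Pretend conn.ReadOffsets() returned first=10 and last=42.
	got, err := seekWithin(r, 10, 42, 100)
	fmt.Println(got, r.offset, err)
}
```

In real use, `first` and `last` would come from `conn.ReadOffsets()` and `r` would be the reader returned by `kafka.NewReader(config)`. Unlike the snippet above, this version returns the `SetOffset` error instead of discarding it. The helper treats every negative value as out of range, so it is not suited to passing sentinel offsets such as `kafka.LastOffset` through.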

ns-gzhang, Feb 21 '25 01:02

Yeah, this seems reasonable and was probably a mistake in the initial implementation. Can you submit a PR with the change?

petedannemann, Mar 21 '25 20:03

I'll try to find an implementation that avoids breaking the existing behavior.

zekth, Mar 21 '25 22:03