
Expose the calculated lag to Readers configured with a GroupID

Open wekb opened this issue 6 years ago • 6 comments

We have a use case where seeing the lag for a given topic/partition is almost critical. kafka-go already tracks this internally, and it would be good to be able to read this value in the context of a Consumer Group. This change works fine here, but I'd rather not continue using a fork.

Worst-case, if there are fatal flaws with exposing this, or perhaps non-fatal caveats, they could be added to the README to good effect.

Thanks!

wekb avatar Sep 04 '19 22:09 wekb

Thanks for the pull request @wekb!

We do not report this value when the reader is configured to join a consumer group because the value was inaccurate. In a consumer group the reader may be handling multiple partitions, so a single lag value is not representative: if only 1 out of N partitions owned by a reader is lagging, one number is hard to make sense of.

Is there something special about your setup that makes it possible to use this value for lag? (for example, do you have as many consumers as partitions?)
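
To make the concern above concrete, here is a minimal sketch in plain Go. It assumes the single value simply reflects whichever message was read last; the `msg` type and `track` function are hypothetical names for illustration and are not part of kafka-go.

```go
package main

import "fmt"

// msg is a hypothetical stand-in for a consumed message: the partition it
// came from and that partition's lag at the time it was read.
type msg struct {
	partition int
	lag       int64
}

// track replays msgs and returns the single "last seen" lag alongside the
// most recent lag recorded for each partition.
func track(msgs []msg) (last int64, perPartition map[int]int64) {
	perPartition = make(map[int]int64)
	for _, m := range msgs {
		last = m.lag
		perPartition[m.partition] = m.lag
	}
	return last, perPartition
}

func main() {
	// Partition 2 is far behind, but the last message read came from partition 1.
	msgs := []msg{
		{partition: 2, lag: 1500},
		{partition: 0, lag: 0},
		{partition: 1, lag: 0},
	}
	last, per := track(msgs)
	fmt.Println("single value:", last)
	fmt.Println("per partition:", per)
}
```

In this made-up sequence the single value is 0 even though partition 2 is 1500 messages behind, which is the kind of reading that is hard to interpret.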

achille-roussel avatar Sep 09 '19 01:09 achille-roussel

Our use case in this scenario is guaranteed to have one partition: we need strict ordering, and the overall throughput requirements are modest. In a hypothetical multiple-partition case, if the single lag value were always the highest lag among the partitions the reader owns, it would still be useful in our case. Otherwise yeah, at best it's misleading.
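
As a rough sketch of the "highest lag" idea, assuming per-partition lags were already available as a map (which the reader does not expose today), the reduction could look like this. `maxLag` is a hypothetical helper, not a kafka-go function.

```go
package main

import "fmt"

// maxLag returns the partition with the largest lag and that lag.
// lags maps partition ID to lag; ok is false when lags is empty.
// Ties are resolved in favour of the lowest partition ID so the result
// does not depend on map iteration order.
func maxLag(lags map[int]int64) (partition int, lag int64, ok bool) {
	for p, l := range lags {
		if !ok || l > lag || (l == lag && p < partition) {
			partition, lag, ok = p, l, true
		}
	}
	return partition, lag, ok
}

func main() {
	lags := map[int]int64{0: 0, 1: 0, 2: 1500}
	p, l, ok := maxLag(lags)
	fmt.Println(p, l, ok) // prints "2 1500 true"
}
```

With a single partition the maximum is just that partition's lag, so the same helper would cover both cases.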

We also noticed the recent ConsumerGroups work, but I haven't had a chance yet to see if it exposes the lag (or at least the most recent offset from the topic, which I'd be happy to calculate separately).

Thanks for the feedback!

wekb avatar Sep 09 '19 18:09 wekb

Does ConsumerGroups provide more granular lag reporting, or would this change be useful with a documented caveat?

wekb avatar Oct 16 '19 19:10 wekb

The ConsumerGroup API doesn't support this yet, but we are considering adding a helper function to compute the lag of a reader or consumer group, for example by reading the __consumer_offsets topic.

Does that sound like it would address your use case?
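
For illustration only, the arithmetic such a helper would perform might look like the sketch below. It assumes the group's committed offsets and each partition's latest offset have already been fetched by some other means and are passed in as plain maps; `groupLag` and its parameters are hypothetical and not an existing kafka-go API.

```go
package main

import "fmt"

// groupLag computes per-partition lag as latest offset minus committed offset.
// Both maps are keyed by partition ID. Partitions missing from latest are
// skipped, and negative differences are clamped to zero.
func groupLag(latest, committed map[int]int64) map[int]int64 {
	lags := make(map[int]int64, len(committed))
	for p, c := range committed {
		end, ok := latest[p]
		if !ok {
			continue // no end offset known for this partition
		}
		lag := end - c
		if lag < 0 {
			lag = 0 // committed offset is ahead of a stale end-offset snapshot
		}
		lags[p] = lag
	}
	return lags
}

func main() {
	latest := map[int]int64{0: 100, 1: 250}
	committed := map[int]int64{0: 100, 1: 200}
	fmt.Println(groupLag(latest, committed))
}
```

Returning a map keeps the per-partition detail, and a caller with a single partition can read the one entry directly.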

achille-roussel avatar Feb 14 '20 22:02 achille-roussel

Yes. The forked version of kafka-go that we use has this PR applied and it's been working well. Obviously being able to grab the topic lag on a per-partition basis is useful, but for single-partition use cases like ours, lag for the single partition is enough.

wekb avatar Feb 14 '20 23:02 wekb

@wekb How does it report lag for you? For me it always reports 0. Hmm... I think lag reporting is not accurate, even for a single partition. That's okay for me, because I can retry multiple times, but it's worth noting.

// HTTP handler
func serve(w http.ResponseWriter, r *http.Request) {
	// Channel to keep track of offset lag
	lagChannel := make(chan int64, 1)
	// Request context
	ctx := r.Context()
	// Read the topic in a background goroutine
	go func(ctx context.Context, r *kafka.Reader) {
		for {
			m, err := r.ReadMessage(ctx)
			if err != nil {
				fmt.Printf("err reading message: %v\n", err)
				break
			}
			fmt.Printf("topic: %q partition: %v offset: %v lag: %d\n", m.Topic, m.Partition, m.Offset, r.Lag())
			go metricsCreation(m.Value)
			fmt.Println(r.Lag())
			lagChannel <- r.Lag()
		}
	}(ctx, kafkaReader)
	for {
		select {
		case message := <-promMetricsChannel:
			fmt.Fprintf(w, "%s\n", message)
		case <-ctx.Done():
			fmt.Printf("done")
			return
		case lag := <-lagChannel:
			// Stop once the reader reports that it has caught up
			if lag == 0 {
				return
			}
		}
	}
}

rjshrjndrn avatar Feb 28 '20 03:02 rjshrjndrn