kafka-go
Expose the calculated lag to Readers configured with a GroupID
We have a use case where seeing the lag for a given topic/partition is almost critical. kafka-go already tracks this internally, and it would be good to be able to read this value in the context of a Consumer Group. This change works fine here, but I'd rather not continue using a fork.
Worst-case, if there are fatal flaws with exposing this, or perhaps non-fatal caveats, they could be added to the README to good effect.
Thanks!
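(For context, a minimal sketch of how lag is already readable when a Reader is pinned to an explicit partition with no GroupID; the broker address and topic below are placeholders.)

    package main

    import (
    	"context"
    	"fmt"

    	"github.com/segmentio/kafka-go"
    )

    func main() {
    	// Reader bound to a single, explicit partition (no GroupID), where
    	// Lag() and ReadLag() are exposed by kafka-go today.
    	r := kafka.NewReader(kafka.ReaderConfig{
    		Brokers:   []string{"localhost:9092"}, // placeholder broker
    		Topic:     "events",                   // placeholder topic
    		Partition: 0,
    	})
    	defer r.Close()

    	ctx := context.Background()
    	m, err := r.ReadMessage(ctx)
    	if err != nil {
    		panic(err)
    	}

    	// Lag() reports the lag observed as of the last message read; ReadLag()
    	// queries the broker for the partition's last offset to compute it fresh.
    	fmt.Println("offset:", m.Offset, "lag:", r.Lag())
    	if lag, err := r.ReadLag(ctx); err == nil {
    		fmt.Println("fresh lag:", lag)
    	}
    }

With a GroupID set, this value isn't reported today, which is what this request is about.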
Thanks for the pull request @wekb!
The reason we did not report this value when the reader is configured to join a consumer group is that the value was inaccurate: with a consumer group the reader may be handling multiple partitions, so a single lag value is not representative and is hard to make sense of when only 1 of the N partitions owned by a reader is lagging.
Is there something special about your setup that makes it possible to use this value for lag? (for example, do you have as many consumers as partitions?)
Our use case in this scenario is guaranteed to have one partition: we need strict ordering, and the overall throughput requirements are modest. In a hypothetical multiple-partition case, if the single lag value were always the highest lag across the partitions owned by the reader, it would still be useful to us. Otherwise, yeah, at best it's misleading.
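(To illustrate that idea, a minimal sketch of reducing per-partition lags to a single "worst case" value, assuming such per-partition values were exposed; the map shape here is hypothetical.)

    // maxLag reduces hypothetical per-partition lags to the single value
    // that would still be meaningful to report: the worst (highest) lag.
    func maxLag(lagByPartition map[int]int64) int64 {
    	var max int64
    	for _, lag := range lagByPartition {
    		if lag > max {
    			max = lag
    		}
    	}
    	return max
    }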
We also noticed the recent ConsumerGroups work, but I haven't had a chance yet to see if it exposes the lag (or at least the most recent offset from the topic, which I'd be happy to calculate separately).
Thanks for the feedback!
Does ConsumerGroups provide more granular lag reporting, or would this change be useful with a documented caveat?
The ConsumerGroup API doesn't support this yet, but we are thinking that we could potentially add a helper function to compute the lag of a reader or consumer group, for example by reading the __consumer_offsets topic.
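(A rough sketch of what such a helper could look like, using the protocol-level kafka.Client API (OffsetFetch / ListOffsets) that later kafka-go releases expose rather than reading __consumer_offsets directly; the groupLag name, topic, and partition list are hypothetical.)

    package main

    import (
    	"context"

    	"github.com/segmentio/kafka-go"
    )

    // groupLag computes the per-partition lag of a consumer group on one topic:
    // the difference between the partition's last offset and the offset the
    // group has committed. Names and connection details are illustrative only.
    func groupLag(ctx context.Context, client *kafka.Client, groupID, topic string, partitions []int) (map[int]int64, error) {
    	// Offsets the group has committed, fetched from the group coordinator.
    	committed, err := client.OffsetFetch(ctx, &kafka.OffsetFetchRequest{
    		GroupID: groupID,
    		Topics:  map[string][]int{topic: partitions},
    	})
    	if err != nil {
    		return nil, err
    	}

    	// Last (newest) offset of each partition.
    	reqs := make([]kafka.OffsetRequest, 0, len(partitions))
    	for _, p := range partitions {
    		reqs = append(reqs, kafka.LastOffsetOf(p))
    	}
    	last, err := client.ListOffsets(ctx, &kafka.ListOffsetsRequest{
    		Topics: map[string][]kafka.OffsetRequest{topic: reqs},
    	})
    	if err != nil {
    		return nil, err
    	}

    	lastOffsets := make(map[int]int64, len(partitions))
    	for _, po := range last.Topics[topic] {
    		lastOffsets[po.Partition] = po.LastOffset
    	}

    	lag := make(map[int]int64, len(partitions))
    	for _, cp := range committed.Topics[topic] {
    		lag[cp.Partition] = lastOffsets[cp.Partition] - cp.CommittedOffset
    	}
    	return lag, nil
    }

It could be called with a protocol client, e.g. client := &kafka.Client{Addr: kafka.TCP("localhost:9092")}, and the result summed or maxed across partitions depending on which single value is wanted.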
Does that sound like it would address your use case?
Yes. The forked version of kafka-go that we use has this PR applied and it's been working well. Obviously being able to grab the topic lag on a per-partition basis is useful, but for single-partition use cases like ours, lag for the single partition is enough.
@wekb How does it report lag for you? For me it always reports 0.
Hmm... I think the lag reporting is not accurate, even for a single partition. For me it's okay, because I can retry multiple times, but noting it here. This is the handler I'm using:
// HTTP handler that streams metrics derived from Kafka messages and exits once
// the reader has caught up (lag == 0). kafkaReader, promMetricsChannel and
// metricsCreation are assumed to be defined and initialized elsewhere in the
// original program; stubs are included here only so the snippet compiles.
package main

import (
	"context"
	"fmt"
	"net/http"

	"github.com/segmentio/kafka-go"
)

var (
	kafkaReader        *kafka.Reader // configured with Brokers, Topic and GroupID elsewhere
	promMetricsChannel chan string   // carries metrics rendered by metricsCreation
)

// metricsCreation parses a message payload and publishes metrics to promMetricsChannel.
func metricsCreation(value []byte) {}

func serve(w http.ResponseWriter, r *http.Request) {
	// Channel used to report the reader's lag after each message.
	lagChannel := make(chan int64, 1)
	// The request context cancels the consumer goroutine when the handler returns.
	ctx := r.Context()

	// Consume the topic in the background.
	go func(ctx context.Context, reader *kafka.Reader) {
		for {
			m, err := reader.ReadMessage(ctx)
			if err != nil {
				fmt.Printf("err reading message: %v\n", err)
				break
			}
			fmt.Printf("topic: %q partition: %v offset: %v lag: %d\n", m.Topic, m.Partition, m.Offset, reader.Lag())
			go metricsCreation(m.Value)
			// Report the observed lag, but give up if the request is already
			// done so a blocked send cannot leak the goroutine.
			select {
			case lagChannel <- reader.Lag():
			case <-ctx.Done():
				return
			}
		}
	}(ctx, kafkaReader)

	for {
		select {
		case message := <-promMetricsChannel:
			fmt.Fprintf(w, "%s\n", message)
		case <-ctx.Done():
			fmt.Println("done")
			return
		case lag := <-lagChannel:
			// Stop once the reader reports it has caught up.
			if lag == 0 {
				return
			}
		}
	}
}