kafka-go
Let users integrate with metric lib
Integrating the DurationStats and SummaryStats with Prometheus is difficult through the https://github.com/segmentio/stats library. Have you considered letting the user handle the integration manually? Something like:
```go
type Observable interface {
	Observe(float64)
}

type ObservableFunc func(value float64)

func (o ObservableFunc) Observe(value float64) {
	o(value)
}

type MetricObservers struct {
	Dials         Observable
	Fetches       Observable
	Messages      Observable
	Bytes         Observable
	Rebalances    Observable
	Timeouts      Observable
	Errors        Observable
	DialTime      Observable
	ReadTime      Observable
	WaitTime      Observable
	FetchSize     Observable
	FetchBytes    Observable
	Offset        Observable
	Lag           Observable
	MinBytes      Observable
	MaxBytes      Observable
	MaxWait       Observable
	QueueLength   Observable
	QueueCapacity Observable
	ClientID      Observable
	Topic         Observable
	Partition     Observable
	DeprecatedFetchesWithTypo Observable
}

func Init() MetricObservers {
	dials := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "kafka_reader_dial_count",
		Help: "Count of dials",
		ConstLabels: prometheus.Labels{
			"client_id": "1234",
			"topic":     "foo",
			"partition": "0",
		},
	})
	return MetricObservers{
		Dials: ObservableFunc(dials.Set),
	}
}
```
The segmentio/stats library includes a prometheus integration, so are you describing a limitation in that integration you are trying to work around? (in which case, a PR to segmentio/stats might be the next step)
Or are you trying to propose a more generic mechanism for kafka-go to export metrics? The metrics are available as plain structs, so you should be able to integrate with other systems as well.
This proposed Observable interface seems to add a lot of complexity that would make the system less interoperable overall, but maybe I'm just having trouble grokking such a hypothetical.
> The segmentio/stats library includes a prometheus integration, so are you describing a limitation in that integration you are trying to work around? (in which case, a PR to segmentio/stats might be the next step)
Partly. The stats integration takes over the metrics handler: there's no easy (non-hacky) way to use both `promhttp.Handler()` from the official library and segmentio/stats's `prometheus.DefaultHandler` at the same time.
> Or are you trying to propose a more generic mechanism for kafka-go to export metrics? The metrics are available as plain structs, so you should be able to integrate with other systems as well.
Yes, I am calling the Stats() method of the Reader to forward the values to Prometheus. This works okay for the counters and gauges (although the gauge values are reset every time they are read). I was hoping to build histograms for the duration/size metrics, but the struct only gives me the avg/min/max. Using interfaces would let me decide how I analyse and process the observations.
> This proposed Observable interface seems to add a lot of complexity that would make the system less interoperable overall, but maybe I'm just having trouble grokking such a hypothetical.
It's a big change indeed. I've used the go-kit metrics abstraction before, which works this way, but segmentio/stats takes quite a different approach.
Here's what I struggle with:
- I want to be able to measure time/size as histograms.
- I need to be able to continue using vanilla prometheus reporter (http handler).
- The ~counters~ gauges are reset every time I look at them.
For context, here's what I ended up with:
```go
type ReaderCollector struct {
	Reader *kafka.Reader
}

var (
	labels = []string{"client_id", "topic", "partition"}

	dialsDescription      = prometheus.NewCounterVec(prometheus.CounterOpts{Name: "kafka_reader_dial", Help: "Count of dials"}, labels)
	fetchesDescription    = prometheus.NewCounterVec(prometheus.CounterOpts{Name: "kafka_reader_fetch", Help: "Count of fetches"}, labels)
	messagesDescription   = prometheus.NewCounterVec(prometheus.CounterOpts{Name: "kafka_reader_message", Help: "Count of messages read"}, labels)
	bytesDescription      = prometheus.NewCounterVec(prometheus.CounterOpts{Name: "kafka_reader_message_bytes", Help: "Bytes read by the reader"}, labels)
	rebalancedDescription = prometheus.NewCounterVec(prometheus.CounterOpts{Name: "kafka_reader_rebalanced", Help: "Count of reader rebalances"}, labels)
	timeoutsDescription   = prometheus.NewCounterVec(prometheus.CounterOpts{Name: "kafka_reader_timeout", Help: "Count of timeouts"}, labels)
	errorsDescription     = prometheus.NewCounterVec(prometheus.CounterOpts{Name: "kafka_reader_error", Help: "Count of errors"}, labels)

	// Ideally these should be histograms.
	// TODO: reevaluate https://github.com/segmentio/kafka-go/issues/837
	dialTimeDescriptionAvg   = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_dial_seconds_avg"}, labels)
	readTimeDescriptionAvg   = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_read_seconds_avg"}, labels)
	waitTimeDescriptionAvg   = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_wait_seconds_avg"}, labels)
	fetchSizeDescriptionAvg  = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_fetch_size_avg"}, labels)
	fetchBytesDescriptionAvg = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_fetch_bytes_avg"}, labels)

	// Gauges
	offsetDescription        = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_offset", Help: "Reader offset"}, labels)
	lagDescription           = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_lag", Help: "Lag of the reader"}, labels)
	minBytesDescription      = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_config_fetch_bytes_min", Help: "MinBytes config value"}, labels)
	maxBytesDescription      = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_config_fetch_bytes_max", Help: "MaxBytes config value"}, labels)
	maxWaitDescription       = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_fetch_wait_max", Help: "TODO"}, labels)
	queueLengthDescription   = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_queue_length", Help: "Length of the internal message queue"}, labels)
	queueCapacityDescription = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_queue_capacity", Help: "Capacity of the internal message queue"}, labels)
)

// Describe sends no descriptors, which makes this an "unchecked" collector.
func (r *ReaderCollector) Describe(_ chan<- *prometheus.Desc) {
}

func counter(counter *prometheus.CounterVec, value float64, id string, topic string, partition string) prometheus.Counter {
	counterWithLabel := counter.WithLabelValues(id, topic, partition)
	counterWithLabel.Add(value)
	return counterWithLabel
}

func gauge(gauge *prometheus.GaugeVec, value float64, id string, topic string, partition string) prometheus.Metric {
	gaugeWithLabel := gauge.WithLabelValues(id, topic, partition)
	if value != 0 {
		gaugeWithLabel.Set(value)
	}
	return gaugeWithLabel
}

func (r *ReaderCollector) Collect(metrics chan<- prometheus.Metric) {
	stats := r.Reader.Stats()
	clientId, topic, partition := stats.ClientID, stats.Topic, stats.Partition
	// Stats() returns deltas for these, so Add accumulates them correctly.
	metrics <- counter(dialsDescription, float64(stats.Dials), clientId, topic, partition)
	metrics <- counter(fetchesDescription, float64(stats.Fetches), clientId, topic, partition)
	metrics <- counter(messagesDescription, float64(stats.Messages), clientId, topic, partition)
	metrics <- counter(bytesDescription, float64(stats.Bytes), clientId, topic, partition)
	metrics <- counter(rebalancedDescription, float64(stats.Rebalances), clientId, topic, partition)
	metrics <- counter(timeoutsDescription, float64(stats.Timeouts), clientId, topic, partition)
	metrics <- counter(errorsDescription, float64(stats.Errors), clientId, topic, partition)
	metrics <- gauge(dialTimeDescriptionAvg, stats.DialTime.Avg.Seconds(), clientId, topic, partition)
	metrics <- gauge(readTimeDescriptionAvg, stats.ReadTime.Avg.Seconds(), clientId, topic, partition)
	metrics <- gauge(waitTimeDescriptionAvg, stats.WaitTime.Avg.Seconds(), clientId, topic, partition)
	metrics <- gauge(fetchSizeDescriptionAvg, float64(stats.FetchSize.Avg), clientId, topic, partition)
	metrics <- gauge(fetchBytesDescriptionAvg, float64(stats.FetchBytes.Avg), clientId, topic, partition)
	metrics <- gauge(offsetDescription, float64(stats.Offset), clientId, topic, partition)
	metrics <- gauge(lagDescription, float64(stats.Lag), clientId, topic, partition)
	metrics <- gauge(minBytesDescription, float64(stats.MinBytes), clientId, topic, partition)
	metrics <- gauge(maxBytesDescription, float64(stats.MaxBytes), clientId, topic, partition)
	// MaxWait is a time.Duration; export seconds rather than raw nanoseconds.
	metrics <- gauge(maxWaitDescription, stats.MaxWait.Seconds(), clientId, topic, partition)
	metrics <- gauge(queueLengthDescription, float64(stats.QueueLength), clientId, topic, partition)
	metrics <- gauge(queueCapacityDescription, float64(stats.QueueCapacity), clientId, topic, partition)
}
```