kafka-go icon indicating copy to clipboard operation
kafka-go copied to clipboard

Let users integrate with metric lib

Open hadrienk opened this issue 3 years ago • 4 comments

Integrating the DurationStats and SummaryStats with prometheus is kind of difficult using the https://github.com/segmentio/stats library. Have you considered letting the user handle the integration manually? Something such as:


// Observable receives a single metric observation so the application
// can decide how to record it (counter, gauge, histogram, ...).
type Observable interface {
	Observe(float64)
}

// ObservableFunc adapts a plain function to the Observable interface,
// in the style of http.HandlerFunc.
type ObservableFunc func(value float64)

// Observe calls o with the observed value.
func (o ObservableFunc) Observe(value float64) {
	o(value)
}

// MetricObservers holds one Observable per reader statistic, letting the
// application choose how each value is recorded instead of kafka-go
// aggregating it internally.
type MetricObservers struct {
	// Event counts.
	Dials                     Observable
	Fetches                   Observable
	Messages                  Observable
	Bytes                     Observable
	Rebalances                Observable
	Timeouts                  Observable
	Errors                    Observable
	// Durations and sizes — the motivating case: raw observations could
	// feed histograms instead of the avg/min/max summaries in Stats().
	DialTime                  Observable
	ReadTime                  Observable
	WaitTime                  Observable
	FetchSize                 Observable
	FetchBytes                Observable
	// Point-in-time gauges.
	Offset                    Observable
	Lag                       Observable
	MinBytes                  Observable
	MaxBytes                  Observable
	MaxWait                   Observable
	QueueLength               Observable
	QueueCapacity             Observable
	// NOTE(review): ClientID, Topic and Partition are strings in
	// ReaderStats, so an Observe(float64) hook doesn't obviously fit —
	// presumably these would carry label information; confirm intent.
	ClientID                  Observable
	Topic                     Observable
	Partition                 Observable
	DeprecatedFetchesWithTypo Observable
}

// Init shows how an application would wire a prometheus metric into a
// MetricObservers value. Only the Dials observer is populated here, and
// the label values are hard-coded for illustration.
func Init() MetricObservers {
	opts := prometheus.GaugeOpts{
		Name: "kafka_reader_dial_count",
		Help: "Count of dials",
		ConstLabels: prometheus.Labels{
			"client_id": "1234",
			"topic":     "foo",
			"partition": "0",
		},
	}
	dialGauge := prometheus.NewGauge(opts)

	return MetricObservers{
		Dials: ObservableFunc(dialGauge.Set),
	}
}

hadrienk avatar Feb 01 '22 13:02 hadrienk

The segmentio/stats library includes a prometheus integration, so are you describing a limitation in that integration you are trying to work around? (in which case, a PR to segmentio/stats might be the next step)

Or are you trying to propose a more generic mechanism for kafka-go to export metrics? The metrics are available as plain structs, so you should be able to integrate with other systems as well.

This proposed Observable interface seems to add a lot of complexity that would make the system less interoperable overall, but maybe I'm just having trouble grokking such a hypothetical.

dominicbarnes avatar Feb 04 '22 07:02 dominicbarnes

The segmentio/stats library includes a prometheus integration, so are you describing a limitation in that integration you are trying to work around? (in which case, a PR to segmentio/stats might be the next step)

Partly, the stats integration takes over the metrics handler. There's no easy way (or non-hacky way) to use both the promhttp.Handler() (from the official lib) and the prometheus.DefaultHandler.

Or are you trying to propose a more generic mechanism for kafka-go to export metrics? The metrics are available as plain structs, so you should be able to integrate with other systems as well.

I am calling the Stats() method of the Reader to forward the values to prometheus yes. This works okay for the counters and gauges (although the gauge values are reset every time they are read). I was hoping to build histograms for the duration/size metrics but the struct only gives me the avg/min/max. Using interfaces would let me decide how I analyse and process the observations.

This proposed Observable interface seems to add a lot of complexity that would make the system less interoperable overall, but maybe I'm just having trouble grokking such a hypothetical.

It's a big change indeed. I've used the go-kit abstraction before and it uses that kind of abstraction but the segmentio/stats takes quite a different approach.

Here's what I struggle with:

  1. I want to be able to measure time/size as histograms.
  2. I need to be able to continue using vanilla prometheus reporter (http handler).
  3. The ~counters~ gauges are reset every time I look at them

hadrienk avatar Feb 04 '22 17:02 hadrienk

For context, here's what I ended up with:

// ReaderCollector implements the prometheus.Collector interface by
// snapshotting Reader.Stats() on every scrape (see Collect below).
type ReaderCollector struct {
	Reader *kafka.Reader
}

var (
	// Every metric is labeled by the reader's identity.
	labels = []string{"client_id", "topic", "partition"}

	// Monotonic event counts.
	dialsDescription      = prometheus.NewCounterVec(prometheus.CounterOpts{Name: "kafka_reader_dial", Help: "Count the dials"}, labels)
	fetchesDescription    = prometheus.NewCounterVec(prometheus.CounterOpts{Name: "kafka_reader_fetch", Help: "Count the fetches"}, labels)
	messagesDescription   = prometheus.NewCounterVec(prometheus.CounterOpts{Name: "kafka_reader_message", Help: "Amount of read messages"}, labels)
	bytesDescription      = prometheus.NewCounterVec(prometheus.CounterOpts{Name: "kafka_reader_message_bytes", Help: "Bytes read by the reader"}, labels)
	rebalancedDescription = prometheus.NewCounterVec(prometheus.CounterOpts{Name: "kafka_reader_rebalanced", Help: "How many times the reader got rebalanced"}, labels)
	timeoutsDescription   = prometheus.NewCounterVec(prometheus.CounterOpts{Name: "kafka_reader_timeout", Help: "How many timeouts"}, labels)
	errorsDescription     = prometheus.NewCounterVec(prometheus.CounterOpts{Name: "kafka_reader_error", Help: "How many errors"}, labels)

	// Averages exposed as gauges. Ideally these should be histograms,
	// but Reader.Stats() only exposes avg/min/max summaries.
	// TODO: reevaluate https://github.com/segmentio/kafka-go/issues/837
	dialTimeDescriptionAvg   = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_dial_seconds_avg", Help: "Average dial duration in seconds"}, labels)
	readTimeDescriptionAvg   = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_read_seconds_avg", Help: "Average read duration in seconds"}, labels)
	waitTimeDescriptionAvg   = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_wait_seconds_avg", Help: "Average wait duration in seconds"}, labels)
	fetchSizeDescriptionAvg  = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_fetch_size_avg", Help: "Average fetch size"}, labels)
	fetchBytesDescriptionAvg = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_fetch_bytes_avg", Help: "Average fetch bytes"}, labels)

	// Gauges
	offsetDescription        = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_offset", Help: "Reader offset"}, labels)
	lagDescription           = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_lag", Help: "Lag of the reader"}, labels)
	minBytesDescription      = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_config_fetch_bytes_min", Help: "Min byte config value"}, labels)
	maxBytesDescription      = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_config_fetch_bytes_max", Help: "Max byte config value"}, labels)
	maxWaitDescription       = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_fetch_wait_max", Help: "Max wait config value"}, labels)
	queueLengthDescription   = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_queue_length", Help: "Length of the internal memory queue"}, labels)
	queueCapacityDescription = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "kafka_reader_queue_capacity", Help: "Capacity of the internal memory queue"}, labels)
)

// Describe sends nothing, which makes this an "unchecked" collector:
// the registry will not pre-validate the descriptors emitted by Collect.
func (r *ReaderCollector) Describe(_ chan<- *prometheus.Desc) {

}

// counter resolves the child of vec for the given label values, adds
// value to it, and returns it so the caller can send it on the metrics
// channel. (The parameter was renamed from "counter", which shadowed
// the function's own name.)
func counter(vec *prometheus.CounterVec, value float64, id string, topic string, partition string) prometheus.Counter {
	counterWithLabel := vec.WithLabelValues(id, topic, partition)
	counterWithLabel.Add(value)
	return counterWithLabel
}

// gauge resolves the child of vec for the given label values, sets it
// to value, and returns it. Zero values are skipped so the previous
// reading is kept — per the discussion above, Stats() resets its values
// on every read, so zero usually means "no update since last scrape".
// NOTE(review): this also means a gauge can never return to a true 0
// (e.g. lag dropping to zero) — confirm that trade-off is intended.
// (The parameter was renamed from "gauge", which shadowed the
// function's own name.)
func gauge(vec *prometheus.GaugeVec, value float64, id string, topic string, partition string) prometheus.Metric {
	gaugeWithLabel := vec.WithLabelValues(id, topic, partition)
	if value != 0 {
		gaugeWithLabel.Set(value)
	}
	return gaugeWithLabel
}

// Collect snapshots the wrapped reader's stats and emits each one as a
// labeled prometheus metric on the channel.
func (r *ReaderCollector) Collect(metrics chan<- prometheus.Metric) {
	stats := r.Reader.Stats()
	id, topic, partition := stats.ClientID, stats.Topic, stats.Partition

	emitCounter := func(vec *prometheus.CounterVec, value float64) {
		metrics <- counter(vec, value, id, topic, partition)
	}
	emitGauge := func(vec *prometheus.GaugeVec, value float64) {
		metrics <- gauge(vec, value, id, topic, partition)
	}

	emitCounter(dialsDescription, float64(stats.Dials))
	emitCounter(fetchesDescription, float64(stats.Fetches))
	emitCounter(messagesDescription, float64(stats.Messages))
	emitCounter(bytesDescription, float64(stats.Bytes))
	emitCounter(rebalancedDescription, float64(stats.Rebalances))
	emitCounter(timeoutsDescription, float64(stats.Timeouts))
	emitCounter(errorsDescription, float64(stats.Errors))

	emitGauge(dialTimeDescriptionAvg, stats.DialTime.Avg.Seconds())
	emitGauge(readTimeDescriptionAvg, stats.ReadTime.Avg.Seconds())
	emitGauge(waitTimeDescriptionAvg, stats.WaitTime.Avg.Seconds())
	emitGauge(fetchSizeDescriptionAvg, float64(stats.FetchSize.Avg))
	emitGauge(fetchBytesDescriptionAvg, float64(stats.FetchBytes.Avg))
	emitGauge(offsetDescription, float64(stats.Offset))
	emitGauge(lagDescription, float64(stats.Lag))
	emitGauge(minBytesDescription, float64(stats.MinBytes))
	emitGauge(maxBytesDescription, float64(stats.MaxBytes))
	emitGauge(maxWaitDescription, float64(stats.MaxWait))
	emitGauge(queueLengthDescription, float64(stats.QueueLength))
	emitGauge(queueCapacityDescription, float64(stats.QueueCapacity))
}

hadrienk avatar Feb 04 '22 17:02 hadrienk