Invalid publisher name in metrics

Open fetinin opened this issue 1 year ago • 0 comments

When a publisher is used inside a middleware of a message.Router and the handler is registered with router.AddNoPublisherHandler, the publisher_name label in the metrics is incorrect.

Expected

Run the code at the bottom of this message. It should produce a metric where publisher_name="main.NoOpPublisher".

publish_time_seconds_bucket{handler_name="metrics-example",publisher_name="main.NoOpPublisher",success="true",le="0.01"} 20

Actual

Instead, it produces metrics with publisher_name="message.disabledPublisher".

publish_time_seconds_bucket{handler_name="metrics-example",publisher_name="message.disabledPublisher",success="true",le="0.01"} 20

Cause

The publisher name is set correctly by metricsBuilder. When a handler is registered via AddNoPublisherHandler(), message.disabledPublisher is used, and its name is written to the context. The PublisherPrometheusMetricsDecorator tries to obtain the name from the context first and only uses its own name if none is present, so it ends up reporting message.disabledPublisher, which is incorrect.

Solution

The obvious fix is to change the priority: use the decorated publisher's name first and fall back to the name from the context only if it is empty. However, it should never be empty, so it's puzzling why the name needs to be read from the context at all. Therefore, it seems that the fix is not that simple.

Can I simply push a fix that uses the name from m.publisherName, or would that be incorrect, and should the problem be approached differently?

Code to reproduce

package main

import (
	"context"
	"errors"
	"os"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/components/metrics"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
	"github.com/prometheus/client_golang/prometheus"
	promDTO "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/expfmt"
)

func main() {
	// Keep the cancel func so the context's resources are released on exit.
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()
	logger := watermill.NewStdLogger(true, false)
	router, _ := message.NewRouter(message.RouterConfig{}, logger)

	registry := prometheus.NewRegistry()

	metricsBuilder := metrics.NewPrometheusMetricsBuilder(registry, "", "")
	metricsBuilder.AddPrometheusRouterMetrics(router)

	pub := NoOpPublisher{}
	pubWithMetrics, _ := metricsBuilder.DecoratePublisher(pub)

	poisonMiddleware, _ := middleware.PoisonQueueWithFilter(pubWithMetrics, "topic", allErrToPoison)
	router.AddMiddleware(
		poisonMiddleware,
	)
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
	router.AddNoPublisherHandler(
		"metrics-example",
		"sub_topic",
		pubSub,
		emptyHandler,
	)
	go router.Run(ctx)
	<-router.Running()

	for range 20 {
		pubSub.Publish("sub_topic", message.NewMessage(watermill.NewUUID(), nil))
	}
	<-ctx.Done()
	printMetrics(registry.Gather())
}

func emptyHandler(*message.Message) error {
	return errors.New("some error")
}

func allErrToPoison(error) bool {
	return true
}

type NoOpPublisher struct{}

func (m NoOpPublisher) Publish(topic string, messages ...*message.Message) error {
	return nil
}

func (m NoOpPublisher) Close() error {
	return nil
}

func printMetrics(metrics []*promDTO.MetricFamily, err error) {
	if err != nil {
		panic(err)
	}
	for _, metric := range metrics {
		_, _ = expfmt.MetricFamilyToText(os.Stdout, metric)
	}
}

fetinin avatar Apr 25 '24 07:04 fetinin