watermill
Invalid publisher name in metrics
When any publisher is used inside a middleware of a message.Router and the handler is registered with router.AddNoPublisherHandler, the publisher_name label in the metrics is incorrect.
Expected
Running the code at the bottom of this message should produce a metric with publisher_name="main.NoOpPublisher":
publish_time_seconds_bucket{handler_name="metrics-example",publisher_name="main.NoOpPublisher",success="true",le="0.01"} 20
Actual
Instead, it produces metrics with publisher_name="message.disabledPublisher":
publish_time_seconds_bucket{handler_name="metrics-example",publisher_name="message.disabledPublisher",success="true",le="0.01"} 20
Cause
The publisher name is correctly set by metricsBuilder. However, when a handler is registered via AddNoPublisherHandler(), message.disabledPublisher is used as the handler's publisher, and its name is written to the message's context. PublisherPrometheusMetricsDecorator tries to obtain the name from the context first and falls back to its own name only if none is present. As a result, message.disabledPublisher is reported, which is incorrect.
Solution
The obvious fix is to change the priority: use the decorated publisher's name and fall back to the name from the context only if it is empty. However, the decorated publisher's name should never be empty, so it's puzzling why the name needs to be read from the context at all. Therefore, it seems the fix is not that simple.
Can I simply push a fix that uses m.publisherName, or would that be incorrect, meaning the problem should be approached differently?
Code to reproduce
package main

import (
	"context"
	"errors"
	"os"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/components/metrics"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
	"github.com/prometheus/client_golang/prometheus"
	promDTO "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/expfmt"
)

func main() {
	// Run the router for one second, then dump the gathered metrics.
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	logger := watermill.NewStdLogger(true, false)
	router, _ := message.NewRouter(message.RouterConfig{}, logger)

	registry := prometheus.NewRegistry()
	metricsBuilder := metrics.NewPrometheusMetricsBuilder(registry, "", "")
	metricsBuilder.AddPrometheusRouterMetrics(router)

	// The publisher used by the middleware is decorated with metrics,
	// so its label should be "main.NoOpPublisher".
	pub := NoOpPublisher{}
	pubWithMetrics, _ := metricsBuilder.DecoratePublisher(pub)
	poisonMiddleware, _ := middleware.PoisonQueueWithFilter(pubWithMetrics, "topic", allErrToPoison)
	router.AddMiddleware(
		poisonMiddleware,
	)

	// A handler without a publisher: the router uses message.disabledPublisher for it.
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
	router.AddNoPublisherHandler(
		"metrics-example",
		"sub_topic",
		pubSub,
		emptyHandler,
	)

	go router.Run(ctx)
	<-router.Running()

	// Every message fails in the handler and is sent to the poison queue.
	for range 20 {
		pubSub.Publish("sub_topic", message.NewMessage(watermill.NewUUID(), nil))
	}

	<-ctx.Done()
	printMetrics(registry.Gather())
}

// emptyHandler always fails, so every message goes through the poison queue publisher.
func emptyHandler(*message.Message) error {
	return errors.New("some error")
}

// allErrToPoison sends every error to the poison queue.
func allErrToPoison(error) bool {
	return true
}

// NoOpPublisher discards all messages.
type NoOpPublisher struct{}

func (m NoOpPublisher) Publish(topic string, messages ...*message.Message) error {
	return nil
}

func (m NoOpPublisher) Close() error {
	return nil
}

// printMetrics writes the gathered metric families in the Prometheus text format.
func printMetrics(metrics []*promDTO.MetricFamily, err error) {
	if err != nil {
		panic(err)
	}
	for _, metric := range metrics {
		_, _ = expfmt.MetricFamilyToText(os.Stdout, metric)
	}
}