Feature: Prometheus Middleware to collect metrics
To improve observability in FastStream, Prometheus middleware support is needed.
Suggested metrics:
- number of messages received;
- ack quantity;
- nack quantity;
- reject quantity;
- number of unprocessed messages (unhandled errors);
- number of messages sent.
I'm guessing the usage would look something like this:
```python
from prometheus_client import make_asgi_app, CollectorRegistry

from faststream.asgi import AsgiFastStream
from faststream.rabbit import RabbitBroker
from faststream.rabbit.prometheus import RabbitPrometheusMiddleware

registry = CollectorRegistry()

broker = RabbitBroker(middlewares=[RabbitPrometheusMiddleware(registry=registry)])

app = AsgiFastStream(
    broker,
    asyncapi_path="/docs",
    asgi_routes=[
        ("/metrics", make_asgi_app(registry=registry)),
    ],
)
```
Solution pseudocode:
```python
from types import TracebackType
from typing import TYPE_CHECKING, Any, Optional, Type

from prometheus_client import CollectorRegistry, Counter

from faststream import BaseMiddleware

if TYPE_CHECKING:
    from faststream.broker.message import StreamMessage


class BasePrometheusMiddleware:
    """Middleware factory: creates the counters once and shares them
    with every middleware instance it builds."""

    def __init__(self, registry: CollectorRegistry) -> None:
        self._registry = registry
        # registry=... registers each counter in the given registry
        # (not the global default one), so no separate register() call is needed.
        self._received_messages_counter = Counter(
            name="received_messages",
            documentation="Received messages",
            labelnames=["broker"],
            registry=registry,
        )
        self._processed_messages_with_error_counter = Counter(
            name="processed_messages_with_error",
            documentation="Processed messages with error",
            labelnames=["broker"],
            registry=registry,
        )
        self._processed_messages_without_error_counter = Counter(
            name="processed_messages_without_error",
            documentation="Processed messages without error",
            labelnames=["broker"],
            registry=registry,
        )

    @staticmethod
    def get_broker_name() -> str:
        raise NotImplementedError

    def __call__(self, msg: Optional[Any]) -> BaseMiddleware:
        # FastStream calls this factory to build a middleware instance
        # (msg may be None, e.g. when publishing).
        return PrometheusMiddleware(
            msg=msg,
            broker_name=self.get_broker_name(),
            received_messages_counter=self._received_messages_counter,
            processed_messages_with_error_counter=self._processed_messages_with_error_counter,
            processed_messages_without_error_counter=self._processed_messages_without_error_counter,
        )


class RabbitPrometheusMiddleware(BasePrometheusMiddleware):
    # Broker-specific subclasses only supply the "broker" label value.
    @staticmethod
    def get_broker_name() -> str:
        return "rabbit"


class PrometheusMiddleware(BaseMiddleware):
    def __init__(
        self,
        msg: Optional[Any] = None,
        *,
        broker_name: str,
        received_messages_counter: Counter,
        processed_messages_with_error_counter: Counter,
        processed_messages_without_error_counter: Counter,
    ) -> None:
        self._broker_name = broker_name
        self._received_messages_counter = received_messages_counter
        self._processed_messages_with_error_counter = processed_messages_with_error_counter
        self._processed_messages_without_error_counter = processed_messages_without_error_counter
        super().__init__(msg)

    async def on_consume(
        self,
        msg: "StreamMessage[Any]",
    ) -> "StreamMessage[Any]":
        # Count every consumed message before the handler runs.
        self._received_messages_counter.labels(broker=self._broker_name).inc()
        return await super().on_consume(msg)

    async def after_processed(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_val: Optional[BaseException] = None,
        exc_tb: Optional[TracebackType] = None,
    ) -> Optional[bool]:
        # exc_val is None when the handler finished without raising.
        if exc_val is None:
            self._processed_messages_without_error_counter.labels(broker=self._broker_name).inc()
        else:
            self._processed_messages_with_error_counter.labels(broker=self._broker_name).inc()
        return await super().after_processed(exc_type, exc_val, exc_tb)
```
This example produces metrics like this:
```text
# HELP received_messages_total Received messages
# TYPE received_messages_total counter
received_messages_total{broker="rabbit"} 314560.0
# HELP received_messages_created Received messages
# TYPE received_messages_created gauge
received_messages_created{broker="rabbit"} 1.723374866369715e+09
# HELP processed_messages_with_error_total Processed messages with error
# TYPE processed_messages_with_error_total counter
processed_messages_with_error_total{broker="rabbit"} 155.0
# HELP processed_messages_with_error_created Processed messages with error
# TYPE processed_messages_with_error_created gauge
processed_messages_with_error_created{broker="rabbit"} 1.723374866552031e+09
# HELP processed_messages_without_error_total Processed messages without error
# TYPE processed_messages_without_error_total counter
processed_messages_without_error_total{broker="rabbit"} 314559.0
# HELP processed_messages_without_error_created Processed messages without error
# TYPE processed_messages_without_error_created gauge
processed_messages_without_error_created{broker="rabbit"} 1.723374866372083e+09
```
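To check this output locally without starting the ASGI app, the registry can be rendered directly with `generate_latest` from `prometheus_client`. A minimal sketch, where the counter is a stand-in for one the proposed middleware would register: the client appends `_total` to counter samples on its own, and by default it also emits the `_created` series seen above.

```python
from prometheus_client import CollectorRegistry, Counter, generate_latest

registry = CollectorRegistry()

# Stand-in for a counter the proposed middleware would register.
received = Counter(
    name="received_messages",
    documentation="Received messages",
    labelnames=["broker"],
    registry=registry,
)
received.labels(broker="rabbit").inc()

# Render the registry in the Prometheus text format, as the /metrics route would.
exposition = generate_latest(registry).decode("utf-8")
print(exposition)
```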
It would also be a good idea to add a “handler” label to the metrics, but I haven't yet figured out how to get meta information about the handler. Is there a better way to do this, for example by getting it from the msg object?
I would be glad to hear any comments and suggestions on this implementation.
I would be happy to implement this feature.
And what is the best metric type here: counter or histogram?
I think it should be a counter.
It'd be cool if there were also an option for publishing the metrics to a Prometheus Pushgateway. This would benefit short-lived and ephemeral jobs. It's already supported by the Prometheus Python client.
https://prometheus.github.io/client_python/exporting/pushgateway/
The main idea is that you pass a CollectorRegistry() object to the FastStream middlewares, which write metrics into it. You can then still use this registry object in any way the Prometheus Python client allows. So I don't think FastStream requires any additional options to support Pushgateway.
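Following that idea, a short-lived job could push the same registry to a Pushgateway with the client's own `push_to_gateway`. A minimal sketch: the gateway address `localhost:9091`, the job name `faststream-batch`, and the `finish_job()` helper are placeholders, and the counter stands in for the ones the proposed middleware would register.

```python
from typing import Any

from prometheus_client import CollectorRegistry, Counter, push_to_gateway

# The registry that would also be passed to the (proposed) middleware.
registry = CollectorRegistry()

# Stand-in for a counter the middleware would create in that registry.
received_messages = Counter(
    "received_messages", "Received messages", ["broker"], registry=registry
)


def finish_job(gateway: str = "localhost:9091", **kwargs: Any) -> None:
    """Push everything in `registry` once the job is done (hypothetical helper).

    Extra keyword arguments (e.g. grouping_key, timeout) go to push_to_gateway.
    """
    push_to_gateway(gateway, job="faststream-batch", registry=registry, **kwargs)
```

`push_to_gateway` sends an HTTP PUT, which replaces all metrics previously pushed for that job's group; `pushadd_to_gateway` uses POST instead and only replaces metrics with the same names.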
@Lancetnik Yeah, that would work 💪
I did some research, and here are the metrics I plan to implement:
- number of received messages (Counter);
- size of received messages (Histogram);
- duration of message processing (Histogram);
- number of messages in processing (Gauge);
- number of processed messages (Counter);
- number of errors during message processing (Counter);
- number of sent messages (Counter);
- duration of message sending (Histogram);
- size of sent messages (Histogram);
- number of errors during message sending (Counter).
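A rough sketch of how the receive-side metric types in this list could be recorded with `prometheus_client`: a `Counter` for the number of received messages, `Histogram`s for message size and processing duration, and a `Gauge` for messages currently in processing. The metric names, the size buckets, and the `handle()` wrapper are hypothetical; in a middleware the wrapped call would be the subscriber handler itself.

```python
from typing import Callable

from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram

registry = CollectorRegistry()
LABELS = ["broker", "handler"]

# Hypothetical metric names for the receive side of the proposal.
received_messages = Counter(
    "received_messages", "Number of received messages", LABELS, registry=registry
)
received_messages_size = Histogram(
    "received_messages_size_bytes",
    "Size of received messages",
    LABELS,
    registry=registry,
    # Byte-oriented buckets; the defaults are tuned for durations in seconds.
    buckets=(128, 1024, 8192, 65536, 524288),
)
messages_in_process = Gauge(
    "messages_in_process",
    "Number of messages currently being processed",
    LABELS,
    registry=registry,
)
processing_duration = Histogram(
    "message_processing_duration_seconds",
    "Duration of message processing",
    LABELS,
    registry=registry,
)


def handle(body: bytes, handler: str, process: Callable[[bytes], None]) -> None:
    labels = {"broker": "rabbit", "handler": handler}
    received_messages.labels(**labels).inc()
    received_messages_size.labels(**labels).observe(len(body))
    # track_inprogress() increments on entry and decrements on exit, even on error;
    # time() observes the elapsed seconds when its block exits.
    with messages_in_process.labels(**labels).track_inprogress():
        with processing_duration.labels(**labels).time():
            process(body)
```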
Each metric will have a broker label, plus a handler label (for receiving metrics) or a destination label (for sending metrics).
Message processing metrics will also have a status label.
Status values for processing received messages:
- acked
- nacked
- rejected
- skipped
- error
Status values for processing sent messages:
- success
- error
The error-count metric will have an error_type label whose values are type(exc).__name__.
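A sketch of how the status and error_type labels could be applied on the receive side. How the middleware would actually learn whether a message was acked, nacked, rejected, or skipped is not shown; here the wrapped callable simply returns the status string, which is a hypothetical convention. The metric names and the `record()` helper are hypothetical as well.

```python
from typing import Callable

from prometheus_client import CollectorRegistry, Counter

registry = CollectorRegistry()

# Allowed non-error values for the status label of received messages.
RECEIVED_STATUSES = {"acked", "nacked", "rejected", "skipped"}

# Hypothetical metric names.
processed_messages = Counter(
    "processed_messages",
    "Number of processed messages",
    ["broker", "handler", "status"],
    registry=registry,
)
processing_errors = Counter(
    "processing_errors",
    "Number of errors during message processing",
    ["broker", "handler", "error_type"],
    registry=registry,
)


def record(handler: str, run: Callable[[], str]) -> str:
    """Run the (hypothetical) handler callable and count its outcome."""
    labels = {"broker": "rabbit", "handler": handler}
    try:
        status = run()
    except Exception as exc:
        # Unhandled exception: status "error" plus a per-exception-type breakdown.
        processed_messages.labels(**labels, status="error").inc()
        processing_errors.labels(**labels, error_type=type(exc).__name__).inc()
        raise
    if status not in RECEIVED_STATUSES:
        # Rejecting unknown values keeps the label's cardinality bounded.
        raise ValueError(f"unexpected status: {status!r}")
    processed_messages.labels(**labels, status=status).inc()
    return status
```

The send side could follow the same pattern, with a destination label in place of handler and success/error as the only status values.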