awesome-devops
awesome-devops copied to clipboard
update?
When will this be updated?
Same issue for me. It throws a ValueError saying the interceptor must be a ServerInterceptor.
+1
I could also really do with this
I am not really familiar with grpc.aio. Welcome any MR to get it started.
Hi guys! I resolved this problem in my project, and I want to share it with you. But first I need to fix the formatting of this repository. I created an issue, #34. Help me, please @lchenn
@pyanchesko same issue for me could you share your solution? thanks
UPD: I did it. My solution:
"""Interceptor a client call with prometheus"""
import logging
from timeit import default_timer
from typing import Awaitable, Callable
import grpc # type: ignore
from prometheus_client.registry import REGISTRY
from py_grpc_prometheus import grpc_utils # type: ignore
from py_grpc_prometheus import server_metrics # type: ignore
_LOGGER = logging.getLogger(__name__)
# We were forced to write this class because
# https://github.com/lchenn/py-grpc-prometheus/issues/13
# This file is an almost complete copy of py_grpc_prometheus.PromServerInterceptor
# For information:
# https://stackoverflow.com/questions/64192211/how-to-convert-grpc-serverinterceptor-to-grcp-aio-serverinterceptor
class PromAioServerInterceptor(grpc.aio.ServerInterceptor):
    """Prometheus metrics interceptor for grpc.aio servers.

    An asyncio port of ``py_grpc_prometheus.PromServerInterceptor``: the
    synchronous interceptor cannot be registered on a ``grpc.aio.server``
    (it fails with "Interceptor must be ServerInterceptor").
    See https://github.com/lchenn/py-grpc-prometheus/issues/13 and
    https://stackoverflow.com/questions/64192211/how-to-convert-grpc-serverinterceptor-to-grcp-aio-serverinterceptor
    """

    def __init__(
        self,
        enable_handling_time_histogram=False,
        legacy=False,
        skip_exceptions=False,
        log_exceptions=True,
        registry=REGISTRY
    ) -> None:
        """
        :param enable_handling_time_histogram: also observe per-RPC latency in
            a histogram (only consulted when ``legacy`` is False).
        :param legacy: use the legacy metric/label names of py-grpc-prometheus.
        :param skip_exceptions: swallow unexpected exceptions raised inside the
            metrics wrapper instead of failing the RPC.
        :param log_exceptions: log swallowed exceptions (only relevant together
            with ``skip_exceptions``).
        :param registry: Prometheus registry the metrics are registered with.
        """
        self._enable_handling_time_histogram = enable_handling_time_histogram
        self._legacy = legacy
        self._grpc_server_handled_total_counter = server_metrics.get_grpc_server_handled_counter(
            self._legacy,
            registry
        )
        self._metrics = server_metrics.init_metrics(registry)
        self._skip_exceptions = skip_exceptions
        self._log_exceptions = log_exceptions
        # Maps integer status values back to grpc.StatusCode members.
        # This is a constraint of current grpc.StatusCode design
        # https://groups.google.com/g/grpc-io/c/EdIXjMEaOyw/m/d3DeqmrJAAAJ
        self._code_to_status_mapping = {x.value[0]: x for x in grpc.StatusCode}

    async def intercept_service(
        self,
        continuation: Callable[[grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler]],
        handler_call_details: grpc.HandlerCallDetails
    ) -> grpc.RpcMethodHandler:
        """
        Intercepts the server function calls.

        This implementation referred to:
        https://github.com/census-instrumentation/opencensus-python/blob/master/opencensus/
        trace/ext/grpc/server_interceptor.py
        and
        https://grpc.io/grpc/python/grpc.html#service-side-interceptor
        """
        grpc_service_name, grpc_method_name, _ = grpc_utils.split_method_call(handler_call_details)

        def metrics_wrapper(behavior, request_streaming, response_streaming):
            async def new_behavior(request_or_iterator, servicer_context):
                response_or_iterator = None
                try:
                    start = default_timer()
                    grpc_type = grpc_utils.get_method_type(request_streaming, response_streaming)
                    try:
                        if request_streaming:
                            # Count each message received on a client stream.
                            request_or_iterator = grpc_utils.wrap_iterator_inc_counter(
                                request_or_iterator,
                                self._metrics["grpc_server_stream_msg_received"],
                                grpc_type,
                                grpc_service_name,
                                grpc_method_name
                            )
                        else:
                            self._metrics["grpc_server_started_counter"].labels(
                                grpc_type=grpc_type,
                                grpc_service=grpc_service_name,
                                grpc_method=grpc_method_name
                            ).inc()
                        # Invoke the original rpc behavior.
                        response_or_iterator = await behavior(request_or_iterator, servicer_context)
                        if response_streaming:
                            # NOTE(review): wrap_iterator_inc_counter is the
                            # synchronous helper; grpc.aio response streams are
                            # async iterators — confirm streaming RPCs work in
                            # your deployment before relying on this path.
                            sent_metric = self._metrics["grpc_server_stream_msg_sent"]
                            response_or_iterator = grpc_utils.wrap_iterator_inc_counter(
                                response_or_iterator,
                                sent_metric,
                                grpc_type,
                                grpc_service_name,
                                grpc_method_name
                            )
                        else:
                            self.increase_grpc_server_handled_total_counter(
                                grpc_type,
                                grpc_service_name,
                                grpc_method_name,
                                self._compute_status_code(servicer_context).name
                            )
                        return response_or_iterator
                    except grpc.RpcError as e:
                        self.increase_grpc_server_handled_total_counter(
                            grpc_type,
                            grpc_service_name,
                            grpc_method_name,
                            self._compute_error_code(e).name
                        )
                        raise e
                    finally:
                        # Latency is only observed for unary responses: for a
                        # streaming response the behavior returns before the
                        # stream is drained, so elapsed time would be wrong.
                        if not response_streaming:
                            if self._legacy:
                                self._metrics["legacy_grpc_server_handled_latency_seconds"].labels(
                                    grpc_type=grpc_type,
                                    grpc_service=grpc_service_name,
                                    grpc_method=grpc_method_name
                                ).observe(max(default_timer() - start, 0))
                            elif self._enable_handling_time_histogram:
                                self._metrics["grpc_server_handled_histogram"].labels(
                                    grpc_type=grpc_type,
                                    grpc_service=grpc_service_name,
                                    grpc_method=grpc_method_name
                                ).observe(max(default_timer() - start, 0))
                except Exception as e:  # pylint: disable=broad-except
                    # Allow user to skip the exceptions in order to maintain
                    # the basic functionality in the server.
                    # The logging function in exception can be toggled with
                    # log_exceptions in order to suppress the noise in logging.
                    if self._skip_exceptions:
                        if self._log_exceptions:
                            _LOGGER.error(e)
                        if response_or_iterator is None:
                            return response_or_iterator
                        # BUG FIX: ``behavior`` is a coroutine function here;
                        # the original code returned the bare coroutine
                        # (missing ``await``), which would hand grpc a
                        # coroutine object instead of the actual response.
                        return await behavior(request_or_iterator, servicer_context)
                    raise e

            return new_behavior

        handler = await continuation(handler_call_details)
        return self._wrap_rpc_behavior(handler, metrics_wrapper)

    def _compute_status_code(self, servicer_context):
        """Resolve the final status of the RPC from the servicer context."""
        if servicer_context.cancelled():
            return grpc.StatusCode.CANCELLED
        if servicer_context.code() is None:
            return grpc.StatusCode.OK
        return self._code_to_status_mapping[servicer_context.code()]

    def _compute_error_code(self, grpc_exception):
        """Map a caught RpcError to a status code (UNKNOWN if not an aio Call)."""
        if isinstance(grpc_exception, grpc.aio.Call):
            return grpc_exception.code()
        return grpc.StatusCode.UNKNOWN

    def increase_grpc_server_handled_total_counter(
            self, grpc_type, grpc_service_name, grpc_method_name, grpc_code):
        """Count one completed RPC; legacy mode names the status label ``code``."""
        if self._legacy:
            self._grpc_server_handled_total_counter.labels(
                grpc_type=grpc_type,
                grpc_service=grpc_service_name,
                grpc_method=grpc_method_name,
                code=grpc_code
            ).inc()
        else:
            self._grpc_server_handled_total_counter.labels(
                grpc_type=grpc_type,
                grpc_service=grpc_service_name,
                grpc_method=grpc_method_name,
                grpc_code=grpc_code
            ).inc()

    def _wrap_rpc_behavior(self, handler, fn):
        """Returns a new rpc handler that wraps the given handler's behavior."""
        if handler is None:
            return None
        if handler.request_streaming and handler.response_streaming:
            behavior_fn = handler.stream_stream
            handler_factory = grpc.stream_stream_rpc_method_handler
        elif handler.request_streaming and not handler.response_streaming:
            behavior_fn = handler.stream_unary
            handler_factory = grpc.stream_unary_rpc_method_handler
        elif not handler.request_streaming and handler.response_streaming:
            behavior_fn = handler.unary_stream
            handler_factory = grpc.unary_stream_rpc_method_handler
        else:
            behavior_fn = handler.unary_unary
            handler_factory = grpc.unary_unary_rpc_method_handler
        return handler_factory(
            fn(behavior_fn, handler.request_streaming, handler.response_streaming),
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer
        )
@lchenn FYI
Hi @pyanchesko and @gggrafff ,
How did you adjust the Client Interceptor to work with grpc.aio?
I'd be glad if you could share your solutions :-)
@nettashafir
Using of ServerInterceptor:
from prometheus_aio_server_interceptor import PromAioServerInterceptor
server = grpc.aio.server(interceptors=(PromAioServerInterceptor(enable_handling_time_histogram=True),))
actual_port = server.add_insecure_port('[::]:1234')
...
I don't use ClientInterceptor. Describe in more detail what problems do you have with this?
Thanks @gggrafff , How do I collect the metrics that way? I added your PromAioServerInterceptor, and my server run nicely. Yet, I try to add also:
prometheus_client.start_http_server(50052)
But I cannot send any GET requests for https://localhost:50052 - I get a ConnectionError
@nettashafir I have written an example in more details:
import asyncio
import grpc
from grpc_health.v1 import health, health_pb2, health_pb2_grpc
import prometheus_client
from prometheus_aio_server_interceptor import PromAioServerInterceptor
async def serve(
    server: grpc.aio.Server,
) -> None:
    """Start *server* and block until it is terminated."""
    await server.start()
    await server.wait_for_termination()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Register the Prometheus interceptor when constructing the aio server.
server = grpc.aio.server(interceptors=(PromAioServerInterceptor(enable_handling_time_histogram=True),))
grpc_port = server.add_insecure_port('[::]:1234')
print('grpc port: ', grpc_port)

# Standard gRPC health-check service so the server answers health probes.
health_servicer = health.HealthServicer()
health_servicer.set("instance_id", health_pb2.HealthCheckResponse.SERVING)
health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)

# Prometheus metrics are exposed over plain HTTP on a separate port
# (scrape/browse http://localhost:<metrics_port>, not the gRPC port).
metrics_port = 4321
prometheus_client.start_http_server(metrics_port)
print('metrics port: ', metrics_port)
print(f'Your metrics: http://localhost:{metrics_port}')

# Run until the gRPC server terminates.
loop.run_until_complete(serve(server))
It works for me
I've noticed that the current implementation doesn't work so well for stream responses, since in that case the RPC handler returns an iterator.
I edited @gggrafff patch (including some design changes) and now it's working nicely for me:
"""Interceptor a client call with prometheus"""
import logging
from timeit import default_timer
from typing import Awaitable, Callable
import grpc # type: ignore
from prometheus_client.registry import REGISTRY
from py_grpc_prometheus import grpc_utils # type: ignore
from py_grpc_prometheus import server_metrics # type: ignore
logger = logging.getLogger()
LEGACY = True
SKIP_EXCEPTION = False
ENABLE_HANDLING_TIME_HISTOGRAM = True
# This is a constraint of current grpc.StatusCode design: https://groups.google.com/g/grpc-io/c/EdIXjMEaOyw/m/d3DeqmrJAAAJ
_code_to_status_mapping = {x.value[0]: x for x in grpc.StatusCode}
metrics = server_metrics.init_metrics(REGISTRY)
grpc_server_handled_total_counter = server_metrics.get_grpc_server_handled_counter(
LEGACY,
REGISTRY,
)
# ----------------------------------- Helpers
async def _wrap_async_iterator_inc_counter(iterator, counter, grpc_type, grpc_service_name, grpc_method_name):
"""Wraps an iterator and collect metrics."""
async for item in iterator:
counter.labels(
grpc_type=grpc_type,
grpc_service=grpc_service_name,
grpc_method=grpc_method_name).inc()
yield item
def _compute_error_code(grpc_exception):
    """Map an exception to a grpc.StatusCode (UNKNOWN when it is not an aio Call)."""
    if not isinstance(grpc_exception, grpc.aio.Call):
        return grpc.StatusCode.UNKNOWN
    return grpc_exception.code()
def _compute_status_code(servicer_context):
    """Translate the context's raw code into a grpc.StatusCode, defaulting to OK."""
    raw_code = servicer_context.code()
    if raw_code is None:
        return grpc.StatusCode.OK
    return _code_to_status_mapping[raw_code]
def _increase_grpc_server_started_counter(grpc_type, grpc_service_name, grpc_method_name):
    """Count one more started RPC for the given method."""
    started_counter = metrics["grpc_server_started_counter"]
    started_counter.labels(
        grpc_type=grpc_type,
        grpc_service=grpc_service_name,
        grpc_method=grpc_method_name,
    ).inc()
def _increase_grpc_server_handled_total_counter(grpc_type, grpc_service_name, grpc_method_name, grpc_code):
    """Count one completed RPC; in legacy mode the status label is named ``code``."""
    status_label = "code" if LEGACY else "grpc_code"
    label_values = {
        "grpc_type": grpc_type,
        "grpc_service": grpc_service_name,
        "grpc_method": grpc_method_name,
        status_label: grpc_code,
    }
    grpc_server_handled_total_counter.labels(**label_values).inc()
def _increase_grpc_server_handled_latency(grpc_type, grpc_service_name, grpc_method_name, start):
    """Record seconds elapsed since *start* on the configured latency metric.

    Legacy mode uses the legacy summary metric; otherwise the histogram is used,
    and only when ENABLE_HANDLING_TIME_HISTOGRAM is set.
    """
    if LEGACY:
        observer = metrics["legacy_grpc_server_handled_latency_seconds"]
    elif ENABLE_HANDLING_TIME_HISTOGRAM:
        observer = metrics["grpc_server_handled_histogram"]
    else:
        return
    elapsed = max(default_timer() - start, 0)
    observer.labels(
        grpc_type=grpc_type,
        grpc_service=grpc_service_name,
        grpc_method=grpc_method_name,
    ).observe(elapsed)
def _wrap_rpc_behavior(handler, new_behavior_factory, grpc_service_name, grpc_method_name):
"""Returns a new rpc handler that wraps the given function"""
if handler is None:
return None
if handler.request_streaming and handler.response_streaming:
orig_behavior = handler.stream_stream
handler_factory = grpc.stream_stream_rpc_method_handler
elif handler.request_streaming and not handler.response_streaming:
orig_behavior = handler.stream_unary
handler_factory = grpc.stream_unary_rpc_method_handler
elif not handler.request_streaming and handler.response_streaming:
orig_behavior = handler.unary_stream
handler_factory = grpc.unary_stream_rpc_method_handler
else:
orig_behavior = handler.unary_unary
handler_factory = grpc.unary_unary_rpc_method_handler
return handler_factory(
behavior=new_behavior_factory(orig_behavior,
handler.request_streaming,
handler.response_streaming,
grpc_service_name,
grpc_method_name),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)
# ----------------------------------- metrics wrapper
def metrics_wrapper(behavior, request_streaming, response_streaming, grpc_service_name, grpc_method_name):
    """Return a metrics-recording replacement for *behavior*.

    Returns a coroutine wrapper for unary responses and an async-generator
    wrapper for streaming responses, since grpc.aio treats the two handler
    kinds differently.
    """

    async def new_unary_behavior(request_or_iterator, servicer_context):
        """Invoke *behavior* once, recording started/handled/latency metrics."""
        try:
            start = default_timer()
            grpc_type = grpc_utils.get_method_type(request_streaming, response_streaming)
            try:
                _increase_grpc_server_started_counter(grpc_type, grpc_service_name, grpc_method_name)
                # Invoke the original rpc behavior.
                response = await behavior(request_or_iterator, servicer_context)
                _increase_grpc_server_handled_total_counter(
                    grpc_type,
                    grpc_service_name,
                    grpc_method_name,
                    _compute_status_code(servicer_context).name
                )
                return response
            except grpc.RpcError as exc:
                _increase_grpc_server_handled_total_counter(
                    grpc_type,
                    grpc_service_name,
                    grpc_method_name,
                    _compute_error_code(exc).name
                )
                raise exc
            finally:
                _increase_grpc_server_handled_latency(grpc_type, grpc_service_name, grpc_method_name, start)
        except Exception as exc:  # pylint: disable=broad-except
            # Allow the user to skip exceptions from the metrics code in order
            # to maintain the basic functionality of the server.
            logger.error(exc)
            if SKIP_EXCEPTION:
                # Fall back to invoking the raw behavior without metrics.
                response = await behavior(request_or_iterator, servicer_context)
                return response
            raise exc

    async def new_stream_behavior(request_or_iterator, servicer_context):
        """Stream *behavior*'s responses, counting each sent message."""
        try:
            start = default_timer()
            grpc_type = grpc_utils.get_method_type(request_streaming, response_streaming)
            try:
                _increase_grpc_server_started_counter(grpc_type, grpc_service_name, grpc_method_name)
                iterator = _wrap_async_iterator_inc_counter(
                    behavior(request_or_iterator, servicer_context),
                    metrics["grpc_server_stream_msg_sent"],
                    grpc_type,
                    grpc_service_name,
                    grpc_method_name
                )
                async for obj in iterator:
                    yield obj
                _increase_grpc_server_handled_total_counter(
                    grpc_type,
                    grpc_service_name,
                    grpc_method_name,
                    _compute_status_code(servicer_context).name
                )
            except grpc.RpcError as exc:
                _increase_grpc_server_handled_total_counter(
                    grpc_type,
                    grpc_service_name,
                    grpc_method_name,
                    _compute_error_code(exc).name
                )
                raise exc
            finally:
                _increase_grpc_server_handled_latency(grpc_type, grpc_service_name, grpc_method_name, start)
        except Exception as exc:  # pylint: disable=broad-except
            logger.error(exc)
            if SKIP_EXCEPTION:
                # Fall back to streaming the raw behavior without metrics.
                async for obj in behavior(request_or_iterator, servicer_context):
                    yield obj
                # BUG FIX: mirror the unary path — when SKIP_EXCEPTION is set
                # the exception must not be re-raised after the fallback
                # stream has been served (the original raised unconditionally).
                return
            raise exc

    if response_streaming:
        return new_stream_behavior
    return new_unary_behavior
# ----------------------------------- Interceptor
class PromAioServerInterceptor(grpc.aio.ServerInterceptor):
    # Prometheus metrics interceptor for grpc.aio servers; all metric logic
    # lives in the module-level helpers wired in by _wrap_rpc_behavior.

    def __init__(self):
        logger.info("Initializing metric interceptor")

    async def intercept_service(
        self,
        continuation: Callable[[grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler]],
        handler_call_details: grpc.HandlerCallDetails
    ) -> grpc.RpcMethodHandler:
        """
        Intercepts the server function calls.

        Wraps the continuation's handler with metrics_wrapper, which covers
        both unary and streaming response behaviors.
        """
        grpc_service_name, grpc_method_name, _ = grpc_utils.split_method_call(handler_call_details)
        handler = await continuation(handler_call_details)
        # Returns the handler unchanged (None) when there is nothing to wrap.
        handler = _wrap_rpc_behavior(handler, metrics_wrapper, grpc_service_name, grpc_method_name)
        return handler