
Support request and response body capture in aiohttp client instrumentation

Open amardeep opened this issue 5 months ago • 10 comments

Description

The httpx instrumentation supports capturing request and response bodies. This is missing from the aiohttp client instrumentation.

Google's genai library switches to aiohttp if it finds the aiohttp Python package installed, for example because another dependency such as litellm pulls it in. Because logfire.instrument_aiohttp_client() doesn't support request/response body capture, its utility is limited. Please add feature parity with the httpx instrumentation for aiohttp too.

amardeep avatar Jul 06 '25 04:07 amardeep

Looking at the implementation of AioHttpClientInstrumentor, it doesn't seem like OTel supports these captures as of now.

adtyavrdhn avatar Jul 10 '25 16:07 adtyavrdhn

The OTel HTTPXClientInstrumentor doesn't really support it either. We jump through hoops like this:


    def capture_body_if_text(self, attr_name: str = 'http.response.body.text'):
        def hook(span: LogfireSpan):
            try:
                # response.text uses errors='replace' under the hood.
                # We rely on decoding errors to guess when the response is not text.
                text = self.response.content.decode(self.response.encoding or 'utf-8')
            except (UnicodeDecodeError, LookupError):
                return
            self.capture_text_as_json(span, attr_name=attr_name, text=text)

        self.on_response_read(hook)

    @cached_property
    def response(self) -> httpx.Response:
        frame = inspect.currentframe().f_back.f_back  # type: ignore
        while frame:  # pragma: no branch
            response = frame.f_locals.get('response')
            frame = frame.f_back
            if isinstance(response, httpx.Response):
                return response
        raise RuntimeError('Could not find the response object')  # pragma: no cover

    def on_response_read(self, hook: Callable[[LogfireSpan], None]):
        if self.is_async:

            async def aread(original_aread: Callable[[], Awaitable[bytes]]) -> bytes:
                with self.attach_original_span_context(), self.logfire_instance.span('Reading response body') as span:
                    content = await original_aread()
                    hook(span)
                return content

            self.wrap_response_aread(aread)
        else:

            def read(original_read: Callable[[], bytes]) -> bytes:
                with self.attach_original_span_context(), self.logfire_instance.span('Reading response body') as span:
                    content = original_read()
                    hook(span)
                return content

            self.wrap_response_read(read)

    def wrap_response_read(self, hook: Callable[[Callable[[], bytes]], bytes]):
        response = self.response
        original_read = response.read

        @functools.wraps(original_read)
        def read() -> bytes:
            try:
                # Only log the body the first time it's read
                return response.content
            except httpx.ResponseNotRead:
                return hook(original_read)

        response.read = read

    def wrap_response_aread(self, hook: Callable[[Callable[[], Awaitable[bytes]]], Awaitable[bytes]]):
        response = self.response
        original_aread = response.aread

        @functools.wraps(original_aread)
        async def aread() -> bytes:
            try:
                # Only log the body the first time it's read
                return response.content
            except httpx.ResponseNotRead:
                return await hook(original_aread)

        response.aread = aread

    @contextlib.contextmanager
    def attach_original_span_context(self):
        with use_span(NonRecordingSpan(self.span.get_span_context())):
            yield

    def capture_text_as_json(self, span: LogfireSpan, *, text: str, attr_name: str):
        span.set_attribute(attr_name, {})  # Set the JSON schema
        # Set the attribute to the raw text so that the backend can parse it
        span._span.set_attribute(attr_name, text)  # type: ignore
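
The text-detection step in capture_body_if_text can be looked at in isolation. Below is a minimal standalone sketch of that heuristic using only the standard library; the function name decode_if_text is hypothetical and not part of logfire or httpx.

```python
from typing import Optional


def decode_if_text(content: bytes, encoding: Optional[str] = None) -> Optional[str]:
    """Return the decoded body, or None if it does not look like text.

    Hypothetical helper for illustration; not a logfire or httpx API.
    """
    try:
        # bytes.decode defaults to errors='strict', so invalid byte sequences
        # raise instead of being replaced. That failure is the signal used
        # to guess that the body is binary rather than text.
        return content.decode(encoding or 'utf-8')
    except (UnicodeDecodeError, LookupError):
        # LookupError covers an unknown or misspelled encoding name.
        return None
```

For example, decode_if_text(b'{"a": 1}') returns the string unchanged, while a body starting with the byte 0xff is rejected as non-text under UTF-8.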

alexmojaki avatar Jul 10 '25 16:07 alexmojaki

This is very cool, so we could try something similar for aiohttp as well?

adtyavrdhn avatar Jul 10 '25 19:07 adtyavrdhn

Yes, it might be possible, though it's hard to check quickly.

alexmojaki avatar Jul 10 '25 20:07 alexmojaki

Could client tracing be used for this?

https://docs.aiohttp.org/en/stable/client_advanced.html#aiohttp-client-tracing

amardeep avatar Jul 11 '25 02:07 amardeep

I looked into it and I think this is feasible. I tried a rough implementation based on the approach in httpx.

TraceRequestEndParams has the response, which can be stringified and displayed. The request body seems tricky because signals are sent per chunk (chunk: bytes) in TraceRequestStartParams | TraceRequestChunkSentParams (I haven't figured this one out yet). Headers for requests and responses can be extracted from TraceRequestStartParams and TraceRequestEndParams.
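
A rough sketch of what wiring up those signals might look like, assuming aiohttp's documented TraceConfig signals (on_request_start, on_request_chunk_sent, on_request_end) and the default SimpleNamespace trace context. The handler names, the attributes stored on ctx, build_trace_config and simulate are all hypothetical, and this is not what logfire does. Whether every request payload type actually triggers the chunk signal is not verified here.

```python
import asyncio
from types import SimpleNamespace


async def on_request_start(session, ctx, params):
    # params: aiohttp.TraceRequestStartParams (method, url, headers).
    ctx.request_headers = dict(params.headers)
    ctx.request_chunks = []


async def on_request_chunk_sent(session, ctx, params):
    # params: aiohttp.TraceRequestChunkSentParams; params.chunk is bytes.
    ctx.request_chunks.append(params.chunk)


async def on_request_end(session, ctx, params):
    # params: aiohttp.TraceRequestEndParams; params.response is the ClientResponse.
    ctx.request_body = b''.join(ctx.request_chunks)
    ctx.response_headers = dict(params.response.headers)


def build_trace_config():
    # Imported lazily so the handlers above can be exercised without aiohttp.
    import aiohttp

    trace_config = aiohttp.TraceConfig()
    trace_config.on_request_start.append(on_request_start)
    trace_config.on_request_chunk_sent.append(on_request_chunk_sent)
    trace_config.on_request_end.append(on_request_end)
    return trace_config


def simulate() -> SimpleNamespace:
    """Drive the handlers with stand-in params objects (no network, no aiohttp)."""
    ctx = SimpleNamespace()

    async def run() -> None:
        start = SimpleNamespace(headers={'content-type': 'application/json'})
        await on_request_start(None, ctx, start)
        await on_request_chunk_sent(None, ctx, SimpleNamespace(chunk=b'{"q": '))
        await on_request_chunk_sent(None, ctx, SimpleNamespace(chunk=b'1}'))
        response = SimpleNamespace(headers={'content-length': '2'})
        await on_request_end(None, ctx, SimpleNamespace(response=response))

    asyncio.run(run())
    return ctx
```

With aiohttp installed, the config would be passed as aiohttp.ClientSession(trace_configs=[build_trace_config()]). Joining the chunks in on_request_end keeps the per-chunk handler cheap, at the cost of holding the whole request body in memory.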

adtyavrdhn avatar Jul 15 '25 14:07 adtyavrdhn

Headers, request body, and response body are all separate features and can be separate PRs.

alexmojaki avatar Jul 15 '25 14:07 alexmojaki

I'll take a stab at it

adtyavrdhn avatar Jul 16 '25 03:07 adtyavrdhn

@alexmojaki is this still being worked on? I saw https://github.com/pydantic/logfire/pull/1131 got merged, but that only added a logfire wrapper. Features like request capture, response capture, etc. are still unimplemented. I primarily use google genai with aiohttp, so I would really appreciate the remaining features being implemented.

yiphei avatar Sep 05 '25 22:09 yiphei

Response body and headers are done; request body is not.

alexmojaki avatar Sep 25 '25 22:09 alexmojaki