
Distributed traces with Pydantic-AI and Streamable HTTP MCP servers not correlated properly

Open · jgpruitt opened this issue 3 months ago · 1 comment

Description

I am using Logfire with Pydantic-AI and Streamable HTTP MCP servers.

I have auto-instrumentation turned on for Pydantic-AI, MCP, and HTTPX.

logfire.instrument_pydantic_ai()
logfire.instrument_mcp()
logfire.instrument_httpx()

When my agent runs and invokes a tool via MCP, I see the MCP request nested under the "agent run" span.

[screenshot: trace view with the MCP request nested under the "agent run" span]

I also see the POST from the agent service to the MCP server with its internal spans.

[screenshot: POST to the MCP server with its internal spans]

However, I would expect the MCP server's spans to be nested under the MCP request: tools/call get_recent_conversations_in_channel span; currently the two are not correlated.

span: MCP request: tools/call get_recent_conversations_in_channel
  trace_id:       0199aaac2be84786393ff770859de3c3
  span_id:        fd0aa8c8a61cc2f4
  parent_span_id: 1c1d083ac51b2071

span: POST tiger-slack-mcp-server/mcp
  trace_id:       0199aaac2be84786393ff770859de3c3
  span_id:        6bd2de03f4094614
  parent_span_id: 29a46c34de05d09a

It appears that the httpx auto-instrumentation is using the span of the function in which agent.run(...) was called, rather than the span of the MCP tool call, as the parent_span_id.

Python, Logfire & OS Versions, related packages (not required)

logfire="4.10.0"
platform="macOS-15.6.1-arm64-arm-64bit-Mach-O"
python="3.13.2 (v3.13.2:4f8bb3947cf, Feb  4 2025, 11:51:10) [Clang 15.0.0 (clang-1500.3.9.4)]"
[related_packages]
requests="2.32.5"
pydantic="2.11.7"
openai="1.109.1"
protobuf="5.29.5"
rich="14.1.0"
executing="2.2.0"
opentelemetry-api="1.36.0"
opentelemetry-exporter-otlp-proto-common="1.36.0"
opentelemetry-exporter-otlp-proto-http="1.36.0"
opentelemetry-instrumentation="0.57b0"
opentelemetry-instrumentation-dbapi="0.57b0"
opentelemetry-instrumentation-httpx="0.57b0"
opentelemetry-instrumentation-psycopg="0.57b0"
opentelemetry-instrumentation-system-metrics="0.57b0"
opentelemetry-proto="1.36.0"
opentelemetry-sdk="1.36.0"
opentelemetry-semantic-conventions="0.57b0"
opentelemetry-util-http="0.57b0"

jgpruitt · Oct 03 '25 15:10

This is a problem with the Python MCP SDK.

First, for a full demo, copy the scripts from https://logfire.pydantic.dev/docs/integrations/llms/mcp/ and add instrumentation for httpx and starlette:

# server.py

from opentelemetry.instrumentation.starlette import StarletteInstrumentor

StarletteInstrumentor().instrument()

from mcp.server.fastmcp import FastMCP

import logfire

logfire.configure(service_name='server')
logfire.instrument_mcp()

app = FastMCP()


@app.tool()
def add(a: int, b: int) -> int:
    logfire.info(f'Calculating {a} + {b}')
    return a + b


app.run(transport='streamable-http')

# client.py

from pydantic_ai import Agent
from pydantic_ai.mcp import MCPServerStreamableHTTP

import logfire

logfire.configure(service_name='agent')
logfire.instrument_pydantic_ai()
logfire.instrument_mcp()
logfire.instrument_httpx(capture_all=True)

server = MCPServerStreamableHTTP('http://localhost:8000/mcp')
agent = Agent('openai:gpt-4o', toolsets=[server])
result = agent.run_sync('What is 7 plus 5?')
print(result.output)

The problem is the weird architecture of the MCP SDK using anyio memory streams instead of plain callback functions. There's no straightforward chain of function calls from calling a tool to sending an HTTP request. Instead, messages are placed on a stream, and then a separate task loops over that stream to get messages to send over HTTP: https://github.com/modelcontextprotocol/python-sdk/blob/7d335efd82b2bdc80158f6538b9532763b3c2e4e/src/mcp/client/streamable_http.py#L378

This means that the ContextVar that tracks the OpenTelemetry context (including the current span) is naturally lost. Here's another case of someone running into the same problem and explaining it in a bit more detail: https://github.com/modelcontextprotocol/python-sdk/issues/421#issuecomment-2785062368

Logfire puts traceparent in the request params _meta, which is the current unofficial solution the community is converging on in https://github.com/modelcontextprotocol/modelcontextprotocol/issues/246#issuecomment-2829508032

That's how it supports distributed tracing when logfire.instrument_mcp is used on both sides, and I suggest recommending to whatever SDK you're using that it do the same.

I've managed to patch anyio streams to carry context which mostly fixes this. Try copying the code below to the start of the client script. But it's hacky and I'm not sure how safe/reliable it is:

from __future__ import annotations

import contextvars
from collections.abc import AsyncIterable
from dataclasses import dataclass
from typing import Generic, TypeVar, Any

import anyio
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream, T_Item
from opentelemetry import context as otel_context
from opentelemetry.trace import INVALID_SPAN, Span, get_current_span, set_span_in_context

T = TypeVar('T')


@dataclass
class WithContext(Generic[T]):
    data: T
    span: Span
    token: contextvars.Token[otel_context.Context] | None = None

    @classmethod
    def create(cls, data: T):
        return cls(data, get_current_span())

    def attach(self):
        if self.span is INVALID_SPAN:
            return

        self.token = otel_context.attach(set_span_in_context(self.span))

    def detach(self):
        if self.token:
            otel_context.detach(self.token)


@dataclass
class InstrumentedSendStream(Generic[T]):
    stream: MemoryObjectSendStream[WithContext[T]]

    def send(self, item: T):
        return self.stream.send(WithContext[T].create(item))

    def send_nowait(self, item: T):
        return self.stream.send_nowait(WithContext[T].create(item))

    def __getattr__(self, item: str):
        return getattr(self.stream, item)

    async def __aenter__(self):
        return self

    async def __aexit__(self, *_: object):
        await self.aclose()

    def __enter__(self):
        return self

    def __exit__(self, *_: object):
        self.close()


@dataclass
class InstrumentedReceiveStream(AsyncIterable[T]):
    stream: MemoryObjectReceiveStream[WithContext[T]]
    current: WithContext[T] | None = None

    async def receive(self):
        return (await self.stream.receive()).data

    def receive_nowait(self):
        return self.stream.receive_nowait().data

    def __aiter__(self):
        return self

    async def __anext__(self) -> T:
        # Detach the previous item's context, then make the sender's span
        # current for the duration of processing this item.
        self._detach()
        self.current = await self.stream.__anext__()
        self.current.attach()
        return self.current.data

    async def __aenter__(self):
        return self

    async def __aexit__(self, *_: object):
        await self.aclose()

    def __enter__(self):
        return self

    def __exit__(self, *_: object):
        self.close()

    def close(self):
        self._detach()
        self.stream.close()

    async def aclose(self):
        self.close()

    def _detach(self):
        if self.current:
            self.current.detach()
            self.current = None

    def __getattr__(self, item: str):
        return getattr(self.stream, item)


def instrument_create_memory_object_stream():
    # Replace anyio's stream factory so every stream wraps items in
    # WithContext, carrying the sender's span to the receiving task.
    original = anyio.create_memory_object_stream

    class wrapper(original[WithContext[T_Item]]):
        def __new__(cls, *args: Any, **kwargs: Any):
            send, recv = super().__new__(cls, *args, **kwargs)
            return InstrumentedSendStream(send), InstrumentedReceiveStream(recv)

    anyio.create_memory_object_stream = wrapper


instrument_create_memory_object_stream()

alexmojaki · Oct 06 '25 14:10

Is anyio working as expected and the issue is in the MCP SDK, or do anyio streams not properly propagate Python's contextvars.Context?

aabmass · Dec 08 '25 21:12

> The problem is the weird architecture of the MCP SDK using anyio memory streams instead of plain callback functions. There's no straightforward chain of function calls from calling a tool to sending an HTTP request. Instead, messages are placed on a stream, and then a separate task loops over that stream to get messages to send over HTTP: https://github.com/modelcontextprotocol/python-sdk/blob/7d335efd82b2bdc80158f6538b9532763b3c2e4e/src/mcp/client/streamable_http.py#L378
>
> This means that the ContextVar that tracks the OpenTelemetry context (including the current span) is naturally lost. Here's another case of someone running into the same problem and explaining it in a bit more detail: https://github.com/modelcontextprotocol/python-sdk/issues/421#issuecomment-2785062368

anyio streams are very low-level, generic abstractions with no concept of "starting/stopping processing of an item retrieved from the stream", which means there's no natural way to propagate context. The hacky code above has to assume a simple usage pattern.
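To make the contrast concrete, here is a hypothetical sketch (names invented, not part of anyio or the MCP SDK) of what a stream *could* look like if it carried each item's contextvars context and exposed a per-item processing hook — the consumer's handler then runs inside the sender's context automatically:

```python
# Hypothetical: a queue that captures the sender's contextvars.Context per
# item and runs the consumer's handler inside it, so ContextVars (like the
# OTel current span) propagate across the stream boundary.
import contextvars
from collections import deque

var = contextvars.ContextVar("current_span", default="none")

class ContextCarryingQueue:
    def __init__(self):
        self._items = deque()

    def send(self, item):
        # Capture the sender's full context alongside the item.
        self._items.append((item, contextvars.copy_context()))

    def process_next(self, handler):
        item, ctx = self._items.popleft()
        # Run the handler inside the sender's context.
        return ctx.run(handler, item)

q = ContextCarryingQueue()
var.set("span-abc")
q.send("tools/call")
var.set("none")  # sender's context changes after sending

seen = q.process_next(lambda item: (item, var.get()))
```

Plain anyio memory streams expose only send/receive with no handler hook, which is why the patch above has to guess where an item's "processing" starts and ends.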

alexmojaki · Dec 09 '25 12:12