Distributed traces with Pydantic-AI and Streamable HTTP MCP servers not correlated properly
Description
I am using Logfire with Pydantic-AI and Streamable HTTP MCP servers.
I have auto-instrumentation turned on for Pydantic-AI, MCP, and HTTPX.
logfire.instrument_pydantic_ai()
logfire.instrument_mcp()
logfire.instrument_httpx()
When my agent runs and invokes a tool via MCP, I see the MCP request nested under the "agent run" span.
I also see the POST from the agent service to the MCP server with its internal spans.
However, I would expect the MCP server's spans to be nested under the "MCP request: tools/call get_recent_conversations_in_channel" span, but they are not currently correlated:
span: MCP request: tools/call get_recent_conversations_in_channel
    trace_id: 0199aaac2be84786393ff770859de3c3
    span_id: fd0aa8c8a61cc2f4
    parent_span_id: 1c1d083ac51b2071

span: POST tiger-slack-mcp-server/mcp
    trace_id: 0199aaac2be84786393ff770859de3c3
    span_id: 6bd2de03f4094614
    parent_span_id: 29a46c34de05d09a
It appears that the httpx auto-instrumentation is using the span of the function in which agent.run(...) was called as the parent_span_id, rather than the span of the MCP tool call.
Python, Logfire & OS Versions, related packages (not required)
logfire="4.10.0"
platform="macOS-15.6.1-arm64-arm-64bit-Mach-O"
python="3.13.2 (v3.13.2:4f8bb3947cf, Feb 4 2025, 11:51:10) [Clang 15.0.0 (clang-1500.3.9.4)]"
[related_packages]
requests="2.32.5"
pydantic="2.11.7"
openai="1.109.1"
protobuf="5.29.5"
rich="14.1.0"
executing="2.2.0"
opentelemetry-api="1.36.0"
opentelemetry-exporter-otlp-proto-common="1.36.0"
opentelemetry-exporter-otlp-proto-http="1.36.0"
opentelemetry-instrumentation="0.57b0"
opentelemetry-instrumentation-dbapi="0.57b0"
opentelemetry-instrumentation-httpx="0.57b0"
opentelemetry-instrumentation-psycopg="0.57b0"
opentelemetry-instrumentation-system-metrics="0.57b0"
opentelemetry-proto="1.36.0"
opentelemetry-sdk="1.36.0"
opentelemetry-semantic-conventions="0.57b0"
opentelemetry-util-http="0.57b0"
This is a problem with the Python MCP SDK.
First, for a full demo, I copied the scripts from https://logfire.pydantic.dev/docs/integrations/llms/mcp/ and added instrumentation for httpx and starlette:
# server.py
from opentelemetry.instrumentation.starlette import StarletteInstrumentor

StarletteInstrumentor().instrument()

from mcp.server.fastmcp import FastMCP
import logfire

logfire.configure(service_name='server')
logfire.instrument_mcp()

app = FastMCP()

@app.tool()
def add(a: int, b: int) -> int:
    logfire.info(f'Calculating {a} + {b}')
    return a + b

app.run(transport='streamable-http')
# client.py
from pydantic_ai import Agent
from pydantic_ai.mcp import MCPServerStreamableHTTP
import logfire
logfire.configure(service_name='agent')
logfire.instrument_pydantic_ai()
logfire.instrument_mcp()
logfire.instrument_httpx(capture_all=True)
server = MCPServerStreamableHTTP('http://localhost:8000/mcp')
agent = Agent('openai:gpt-4o', toolsets=[server])
result = agent.run_sync('What is 7 plus 5?')
print(result.output)
The problem is the MCP SDK's unusual architecture: it uses anyio memory streams instead of plain callback functions. There's no straightforward chain of function calls from calling a tool to sending an HTTP request. Instead, messages are placed on a stream, and a separate task loops over that stream to pick up messages and send them over HTTP: https://github.com/modelcontextprotocol/python-sdk/blob/7d335efd82b2bdc80158f6538b9532763b3c2e4e/src/mcp/client/streamable_http.py#L378
This means that the ContextVar which tracks the OpenTelemetry context (such as the current span) is naturally lost. Here's another case of someone running into the same problem and explaining it a bit more: https://github.com/modelcontextprotocol/python-sdk/issues/421#issuecomment-2785062368
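To make the failure mode concrete, here is a minimal sketch of the same pattern (simplified, not actual MCP SDK code): the request is sent to a memory stream while the tool-call span is current, but the writer task that performs the HTTP request was started earlier with its own copy of the context, so any HTTP span it creates is not parented to the tool-call span.

# sketch of the producer/writer-task pattern; not MCP SDK code
import anyio
from opentelemetry import trace

tracer = trace.get_tracer('sketch')

async def main():
    send, recv = anyio.create_memory_object_stream[dict](10)

    async def post_writer():
        # This task was started before any tool-call span existed, so a span
        # created here (or by httpx instrumentation) has no useful parent.
        async with recv:
            async for request in recv:
                with tracer.start_as_current_span('POST /mcp'):
                    ...  # the HTTP request would be sent here

    async with anyio.create_task_group() as tg:
        tg.start_soon(post_writer)
        with tracer.start_as_current_span('MCP request: tools/call add'):
            # The span is current here, but it does not travel with the item
            # through the memory stream to post_writer.
            async with send:
                await send.send({'method': 'tools/call'})

anyio.run(main)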
Logfire puts traceparent in the request params' _meta, which is the unofficial solution the community is currently converging on in https://github.com/modelcontextprotocol/modelcontextprotocol/issues/246#issuecomment-2829508032
That's how Logfire supports distributed tracing when logfire.instrument_mcp() is used on both sides, and I suggest recommending to whatever SDK you're using that they do this too.
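In rough terms (a sketch of the _meta approach only, not Logfire's actual implementation; the helper names are made up), the client injects the W3C traceparent into params._meta, and the server extracts it and uses it as the parent of its own span:

# _meta trace propagation sketch (hypothetical helpers, not Logfire's code)
from opentelemetry import trace
from opentelemetry.propagate import extract, inject

tracer = trace.get_tracer('mcp-sketch')

def add_traceparent(request_params: dict) -> dict:
    # Client side: write the current trace context (traceparent/tracestate)
    # into the JSON-RPC request's params._meta before sending it.
    meta = request_params.setdefault('_meta', {})
    inject(meta)  # e.g. meta['traceparent'] = '00-<trace_id>-<span_id>-01'
    return request_params

def handle_tool_call(request_params: dict):
    # Server side: extract the context from params._meta and use it as the
    # parent of the span that handles this request.
    parent = extract(request_params.get('_meta') or {})
    with tracer.start_as_current_span('tools/call', context=parent):
        ...  # run the tool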
I've managed to patch anyio streams to carry context, which mostly fixes this. Try copying the code below to the start of the client script. But it's hacky and I'm not sure how safe/reliable it is:
from __future__ import annotations

import contextvars
from collections.abc import AsyncIterable
from dataclasses import dataclass
from typing import Generic, TypeVar, Any

import anyio
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream, T_Item
from opentelemetry import context as otel_context
from opentelemetry.trace import INVALID_SPAN, Span, get_current_span, set_span_in_context

T = TypeVar('T')


# Pairs a stream item with the span that was current when it was sent.
@dataclass
class WithContext(Generic[T]):
    data: T
    span: Span
    token: contextvars.Token[otel_context.Context] | None = None

    @classmethod
    def create(cls, data: T):
        return cls(data, get_current_span())

    def attach(self):
        if self.span is INVALID_SPAN:
            return
        self.token = otel_context.attach(set_span_in_context(self.span))

    def detach(self):
        if self.token:
            otel_context.detach(self.token)


@dataclass
class InstrumentedSendStream(Generic[T]):
    stream: MemoryObjectSendStream[WithContext[T]]

    def send(self, item: T):
        return self.stream.send(WithContext[T].create(item))

    def send_nowait(self, item: T):
        return self.stream.send_nowait(WithContext[T].create(item))

    def __getattr__(self, item: str):
        return getattr(self.stream, item)

    async def __aenter__(self):
        return self

    async def __aexit__(self, *_: object):
        await self.aclose()

    def __enter__(self):
        return self

    def __exit__(self, *_: object):
        self.close()


@dataclass
class InstrumentedReceiveStream(AsyncIterable[T]):
    stream: MemoryObjectReceiveStream[WithContext[T]]
    current: WithContext[T] | None = None

    async def receive(self):
        return (await self.stream.receive()).data

    def receive_nowait(self):
        return self.stream.receive_nowait().data

    def __aiter__(self):
        return self

    async def __anext__(self) -> T:
        # Detach the previous item's context, then attach the context that was
        # current when the next item was sent.
        self._detach()
        self.current = await self.stream.__anext__()
        self.current.attach()
        return self.current.data

    async def __aenter__(self):
        return self

    async def __aexit__(self, *_: object):
        await self.aclose()

    def __enter__(self):
        return self

    def __exit__(self, *_: object):
        self.close()

    def close(self):
        self._detach()
        self.stream.close()

    async def aclose(self):
        self.close()

    def _detach(self):
        if self.current:
            self.current.detach()
            self.current = None

    def __getattr__(self, item: str):
        return getattr(self.stream, item)


def instrument_create_memory_object_stream():
    # Replace anyio.create_memory_object_stream with a version that returns
    # the wrapping streams above.
    original = anyio.create_memory_object_stream

    class wrapper(original[WithContext[T_Item]]):
        def __new__(cls, *args: Any, **kwargs: Any):
            send, recv = super().__new__(cls, *args, **kwargs)
            return InstrumentedSendStream(send), InstrumentedReceiveStream(recv)

    anyio.create_memory_object_stream = wrapper


instrument_create_memory_object_stream()
Is anyio working as expected and the issue is in the MCP SDK, or do anyio streams not properly propagate Python's contextvars.Context?
anyio streams are very low-level, generic abstractions that have no concept of 'starting/stopping processing of an item retrieved from the stream', which means there's no natural way to propagate context per item. The hacky code above has to assume a simple usage pattern.
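For what it's worth, the per-item loss is easy to reproduce with anyio alone (a minimal sketch, nothing MCP-specific): the consumer task gets its copy of the context when it starts, so a ContextVar value set by the producer afterwards never reaches it.

# contextvars are copied when a task starts, not per item received
import contextvars
import anyio

current_op = contextvars.ContextVar('current_op', default='<unset>')

async def main():
    send, recv = anyio.create_memory_object_stream[str](10)

    async def consumer():
        async with recv:
            async for item in recv:
                # Prints '<unset>': the producer's ContextVar value does not
                # travel with the item through the memory stream.
                print(item, '->', current_op.get())

    async with anyio.create_task_group() as tg:
        tg.start_soon(consumer)
        current_op.set('tool-call')  # set after the consumer task started
        async with send:
            await send.send('message')

anyio.run(main)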