How to get HTTP request headers in MCP tool logic when running an MCP server in streamable HTTP mode?
I want to create an MCP server based on the streamablehttp_stateless running mode (example code: https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/servers/simple-streamablehttp-stateless/mcp_simple_streamablehttp_stateless/server.py), but I cannot find any method to get the HTTP request message inside the tool logic. How can I get the HTTP request headers inside a tool function?
+1 bump
I figured out this clutch fix for streamable HTTP. I just wanted to verify some header data against my DB; here is a code example for an Authorization bearer check. You can build on top of it to parse headers into meta (mcp.get_context().request_context.meta) or EventStore / store_event().
Since we can't pass middleware to FastMCP, I looked at mcp.server.auth.
Also take a look at this example: examples/servers/simple-auth/mcp_simple_auth
from mcp.server.fastmcp import FastMCP
from mcp.server.auth.settings import AuthSettings, ClientRegistrationOptions
from mcp.server.auth.provider import OAuthAuthorizationServerProvider, AccessToken
from mcp.server.auth.middleware.auth_context import get_access_token
from pydantic import AnyHttpUrl
# Assumption: the original does not show where JSONResponse comes from;
# Starlette's class is the likely candidate.
from starlette.responses import JSONResponse

# Create a simple auth provider that just checks for a bearer token
class SimpleBearerAuthProvider(OAuthAuthorizationServerProvider):
    async def load_access_token(self, token: str):
        # Any non-empty token is accepted here (testing only)
        if token:
            try:
                return AccessToken(
                    token=token,
                    client_id="test-client",  # For testing purposes
                    scopes=["api"],
                    expires_at=None
                )
            except Exception:
                return None
        return None

# Create auth settings
auth_settings = AuthSettings(
    issuer_url=AnyHttpUrl("http://localhost:8000"),
    client_registration_options=ClientRegistrationOptions(
        enabled=False,  # Disable client registration
        valid_scopes=["api"],
        default_scopes=["api"],
    ),
    required_scopes=["api"],
)

# Create auth provider instance
auth_provider = SimpleBearerAuthProvider()

mcp = FastMCP(
    name="MCP Server",
    stateless_http=True,
    auth=auth_settings,
    auth_server_provider=auth_provider,
)

@mcp.tool()
async def test_tool() -> JSONResponse:
    # Bearer token taken from the Authorization header of the current request
    access_token = get_access_token().token
    params = {
        "api_key": access_token,
    }
    # etc...
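The provider above accepts any non-empty token. For the "verify against my DB" part, a minimal sketch might look like the following. Everything here is hypothetical and stdlib-only: `TokenRecord`, `TOKEN_DB` and `lookup_token` are illustrative names, and the dict is just a stand-in for a real database table keyed by a hash of the token.

```python
import hashlib
import time
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class TokenRecord:
    client_id: str
    scopes: List[str]
    expires_at: Optional[int]  # Unix timestamp, or None for "never expires"


def _digest(token: str) -> str:
    # Store and look up a hash so raw tokens never sit in the database.
    return hashlib.sha256(token.encode("utf-8")).hexdigest()


# Hypothetical stand-in for a database table.
TOKEN_DB: Dict[str, TokenRecord] = {
    _digest("secret-token-123"): TokenRecord("test-client", ["api"], None),
    _digest("expired-token"): TokenRecord("old-client", ["api"], 1),
}


def lookup_token(token: str, now: Optional[float] = None) -> Optional[TokenRecord]:
    """Return the record for a known, unexpired token, otherwise None."""
    if not token:
        return None
    record = TOKEN_DB.get(_digest(token))
    if record is None:
        return None
    now = time.time() if now is None else now
    if record.expires_at is not None and record.expires_at <= now:
        return None
    return record
```

Inside `load_access_token` you could then call `lookup_token(token)` and build the `AccessToken` from the returned record, returning `None` when the lookup fails.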
Can we directly access the contents of the header, such as the Bearer token?
@viczommers How do I pass meta from the MCP client so that I can read it from mcp.get_context().request_context.meta?
I am a newbie to MCP and agents:
Does anyone know how we can access the bearer token using this method? My use case needs to pass the access_token for another API (Google Calendar) to get access to the user data. Tools need this to fetch the information. I am trying to pass the access token through a custom class:
class AuthenticatedMultiServerMCPClient(MultiServerMCPClient):
    async def _call_server(self, server, message, *args, **kwargs):
        token = current_token.get(None)
        if token and self._servers[server]["transport"] == "streamable_http":
            self._servers[server]["headers"] = {
                "Authorization": f"Bearer {token}"
            }
        return await super()._call_server(server, message, *args, **kwargs)
But when we call get_tools() there is no header passed, which breaks the code.
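I can't confirm that `_call_server` is a hook the client actually invokes, so treat that part as unverified. Independent of the client class, the `current_token.get(None)` call suggests a `contextvars.ContextVar`; assuming that, here is a small stdlib-only sketch of carrying a per-request token and turning it into headers. `build_auth_headers` and `handle_request` are made-up names for illustration.

```python
import asyncio
import contextvars
from typing import Dict, List, Optional

# Assumption: current_token is a ContextVar holding the caller's access token.
current_token: contextvars.ContextVar = contextvars.ContextVar(
    "current_token", default=None
)


def build_auth_headers() -> Dict[str, str]:
    """Build the Authorization header from the token in the current context."""
    token = current_token.get()
    return {"Authorization": f"Bearer {token}"} if token else {}


async def handle_request(token: Optional[str]) -> Dict[str, str]:
    # Each asyncio task runs in its own copy of the context, so setting the
    # token here does not leak into other concurrently running requests.
    current_token.set(token)
    await asyncio.sleep(0)  # yield to other tasks, as real I/O would
    return build_auth_headers()


async def main() -> List[Dict[str, str]]:
    return await asyncio.gather(
        handle_request("alice-token"),
        handle_request(None),
        handle_request("bob-token"),
    )


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The point of the sketch is that the headers need to be computed at the moment the connection is opened, for every call including tool listing, rather than patched into shared client state from one code path.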
You can pull the header from the MCP-supplied context object if you're running a streamable-http server:
from fastmcp import FastMCP, Context

mcp = FastMCP()

@mcp.tool(description="prints a header, concatenates it with an agent's input, and returns the result")
async def header_puller(context: Context, agent_supplied_arg: str) -> str:
    example_header = context.request_context.request.headers.get("example-header")
    print(example_header)
    result = f"header: {example_header}, agent input: {agent_supplied_arg}"
    return result

app = mcp.streamable_http_app()
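One caveat with the attribute chain above: it presumably only works when an HTTP request is attached to the context. Below is a defensive, duck-typed sketch that returns a default instead of raising when any link is missing. `get_request_header` is a hypothetical helper, not part of either package, and the `SimpleNamespace` objects are stubs standing in for the real context.

```python
from types import SimpleNamespace
from typing import Any, Optional


def get_request_header(
    context: Any, name: str, default: Optional[str] = None
) -> Optional[str]:
    """Read one header via context.request_context.request.headers, if present."""
    request_context = getattr(context, "request_context", None)
    request = getattr(request_context, "request", None)
    headers = getattr(request, "headers", None)
    if headers is None:
        # No HTTP request attached to this context.
        return default
    return headers.get(name, default)


# Stub contexts for illustration only; a real tool would receive a Context.
http_context = SimpleNamespace(
    request_context=SimpleNamespace(
        request=SimpleNamespace(headers={"example-header": "abc"})
    )
)
no_request_context = SimpleNamespace(request_context=SimpleNamespace(request=None))

print(get_request_header(http_context, "example-header"))
print(get_request_header(no_request_context, "example-header", "missing"))
```

In a tool you would call `get_request_header(context, "example-header")` in place of the direct attribute access.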
Closing as @josiahdc's suggestion should solve the original issue. If it doesn't please feel free to re-open the issue.
But this feature is in the fastmcp package. Can't we have it in the Python MCP SDK itself?
In FastMCP there is a clearer way to fetch the headers:
from fastmcp import FastMCP
from fastmcp.server.dependencies import get_http_headers

mcp = FastMCP(name="Headers Demo")

@mcp.tool
async def safe_header_info() -> dict:
    """Safely get header information without raising errors."""
    # Get headers (returns empty dict if no request context)
    headers = get_http_headers()

    # Get authorization header
    auth_header = headers.get("authorization", "")
    is_bearer = auth_header.startswith("Bearer ")

    return {
        "user_agent": headers.get("user-agent", "Unknown"),
        "content_type": headers.get("content-type", "Unknown"),
        "has_auth": bool(auth_header),
        "auth_type": "Bearer" if is_bearer else "Other" if auth_header else "None",
        "headers_count": len(headers)
    }
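To answer the earlier question about getting at the Bearer token itself: once you have a headers mapping, the token is just the part after the scheme. A minimal sketch, assuming the header keys are lower-cased as in the snippet above; `extract_bearer_token` is a hypothetical helper name. The scheme is matched case-insensitively, since HTTP auth schemes are not case-sensitive.

```python
from typing import Mapping, Optional


def extract_bearer_token(headers: Mapping[str, str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' header, or None."""
    auth_header = headers.get("authorization", "")
    scheme, _, credentials = auth_header.strip().partition(" ")
    if scheme.lower() != "bearer":
        return None
    token = credentials.strip()
    return token or None
```

Inside a tool this could be used as `token = extract_bearer_token(get_http_headers())`, then passed on to the downstream API call.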