Issue when sending langchain astream_events via EventSourceResponse
When sending the output of LangChain's `astream_events` through `EventSourceResponse`, as in the following example:
```python
@chat_router.post("/astream_events")
async def astream_events(
    request: Request,
    body: ChatInvokeInputModel,
    graph: Annotated[CompiledGraph, Depends(get_compiled_graph)],
) -> EventSourceResponse:
    """Return SSE for chat."""
    session: SessionHeaderModel = request.state.session
    config = {"configurable": {"thread_id": body.thread_id, "user": session.email}}
    return EventSourceResponse(
        graph.astream_events(
            {"messages": [HumanMessage(content=body.message)]},
            RunnableConfig(**config),
        )
    )
```
I get the following error:
```
  File "/usr/local/lib/python3.11/site-packages/sse_starlette/event.py", line 68, in ensure_bytes
    return ServerSentEvent(**data).encode()
           ^^^^^^^^^^^^^^^^^^^^^^^
TypeError: ServerSentEvent.__init__() got an unexpected keyword argument 'name'
```
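The reason is that the events yielded by a LangChain runnable contain not only `event` and `data` but also other keys such as `name`, `tags`, and `run_id`. When such an event is passed to `ServerSentEvent` via `**data`, every key becomes a keyword argument, and the first one the constructor does not recognize (`name`) raises the `TypeError` above.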
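A minimal, self-contained sketch of the failure mode. `MiniEvent` is a hypothetical stand-in for `ServerSentEvent` (not sse-starlette's actual class), limited to the standard SSE fields, and the event dict is an abbreviated, made-up example shaped like an `astream_events` event:

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class MiniEvent:
    """Hypothetical stand-in for ServerSentEvent: standard SSE fields only."""

    data: Optional[Any] = None
    event: Optional[str] = None
    id: Optional[str] = None
    retry: Optional[int] = None


# Abbreviated, illustrative event shaped like LangChain's astream_events output.
langchain_event = {
    "event": "on_chat_model_stream",
    "name": "ChatModel",
    "run_id": "hypothetical-run-id",
    "tags": [],
    "metadata": {},
    "data": {"chunk": "Hello"},
}


def try_unpack(event: dict) -> str:
    """Unpack the dict the same way `ServerSentEvent(**data)` does."""
    try:
        MiniEvent(**event)
    except TypeError as exc:
        # The first unknown key in the dict is the one reported.
        return str(exc)
    return "ok"


print(try_unpack(langchain_event))
print(try_unpack({"event": "on_chat_model_stream", "data": "Hello"}))
```

Only the dict restricted to known fields can be unpacked; any extra key fails before encoding even starts.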
The proposal to solve this (allowing us to use EventSourceResponse) can come in two flavors:
- In the `ServerSentEvent` class (https://github.com/sysid/sse-starlette/blob/f2e4d091c5d3a5216207256109f9b231e8421bd9/sse_starlette/event.py#L14-L22), we allow `**kwargs` so that any additional kwargs are accepted but not used.
- As part of `EventSourceResponse` initialization (https://github.com/sysid/sse-starlette/blob/f2e4d091c5d3a5216207256109f9b231e8421bd9/sse_starlette/sse.py#L73-L87), we accept a custom `ensure_bytes` function, allowing the user to specify their own "encoding" function.
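To make the second flavor more concrete, here is a sketch of what a user-supplied encoding function might look like. It is hypothetical: `encode_event` is not an sse-starlette API, and `EventSourceResponse` at the linked commit does not accept such a parameter. The function keeps only the standard SSE fields, JSON-encodes non-string `data`, and splits multi-line data into several `data:` lines, since the SSE format treats CR, LF, and CRLF as line terminators:

```python
import json
import re
from typing import Any, Mapping

# SSE line terminators: CRLF, lone CR, or lone LF.
LINE_SEP = re.compile(r"\r\n|\r|\n")
# Non-data fields defined by the SSE format.
SSE_FIELDS = ("event", "id", "retry")


def encode_event(event: Mapping[str, Any], sep: str = "\r\n") -> bytes:
    """Hypothetical custom encoder: serialize a mapping as one SSE message.

    Unknown keys (e.g. LangChain's name, run_id, tags) are silently ignored.
    """
    lines = []
    for field in SSE_FIELDS:
        value = event.get(field)
        if value is not None:
            lines.append(f"{field}: {value}")
    data = event.get("data")
    if data is not None:
        text = data if isinstance(data, str) else json.dumps(data, default=str)
        # Each line of the payload needs its own "data:" prefix.
        for chunk in LINE_SEP.split(text):
            lines.append(f"data: {chunk}")
    # An empty line terminates the message.
    return (sep.join(lines) + sep + sep).encode("utf-8")


print(encode_event({"event": "on_chat_model_stream", "name": "x", "data": {"chunk": "Hi"}}))
```

Note that this encoder drops unknown keys silently, which is exactly the trade-off debated further down in this thread.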
Reference:
- https://python.langchain.com/api_reference/core/runnables/langchain_core.runnables.base.Runnable.html#langchain_core.runnables.base.Runnable.astream_events
I don't think this should be handled by sse-starlette, but there are a number of ways you could implement it yourself. My reasoning is that clients simply ignore unknown fields, but most server implementations don't allow an event to be instantiated with additional or invalid fields in the first place.
I took a quick look at how commonly used SSE server implementations in other languages handle this, and I have yet to find one that filters or ignores additional fields; all of them throw an error. Given that this behavior seems consistent across the board, I think accepting extra `**kwargs` here would make bugs harder to track down, because invalid fields would fail silently. Someone passing invalid keys is more likely making a mistake than doing so deliberately and expecting the library to fix it for them.
Here are two ways you could implement it. First, by subclassing `EventSourceResponse` (other arguments omitted for brevity):
```python
from typing import Any, AsyncIterable, Mapping

from sse_starlette.sse import EventSourceResponse


class FilteredEventSourceResponse(EventSourceResponse):
    """Custom EventSourceResponse that filters out unsupported event fields."""

    def __init__(self, generator: AsyncIterable[Mapping[str, Any]], *args, **kwargs):
        # Keep only the standard SSE fields.
        allowed_keys = {"event", "data", "id", "retry"}

        async def filtered_generator():
            async for event in generator:
                # Drop unsupported keys (e.g. name, run_id, tags) and None values.
                yield {
                    k: v
                    for k, v in event.items()
                    if k in allowed_keys and v is not None
                }

        super().__init__(
            filtered_generator(),
            *args,
            **kwargs,
        )
```
Or with a function used inside your route:
```python
from typing import Annotated, Any, AsyncGenerator, AsyncIterable, Mapping

# Request, Depends, HumanMessage, RunnableConfig, CompiledGraph, EventSourceResponse
# and the app-specific names are imported as in the original example.


async def filtered_sse(
    event_source: AsyncIterable[Mapping[str, Any]],
) -> AsyncGenerator[Mapping[str, Any], None]:
    """Yield only the standard SSE fields of each event, skipping None values."""
    allowed_keys = {"event", "data", "id", "retry"}
    async for event in event_source:
        yield {k: v for k, v in event.items() if k in allowed_keys and v is not None}


@chat_router.post("/astream_events")
async def astream_events(
    request: Request,
    body: ChatInvokeInputModel,
    graph: Annotated[CompiledGraph, Depends(get_compiled_graph)],
) -> EventSourceResponse:
    """Return SSE for chat."""
    session: SessionHeaderModel = request.state.session
    config = {"configurable": {"thread_id": body.thread_id, "user": session.email}}
    events = graph.astream_events(
        {"messages": [HumanMessage(content=body.message)]},
        RunnableConfig(**config),
    )
    return EventSourceResponse(filtered_sse(events))
```
Closed due to lack of activity.