sse-starlette icon indicating copy to clipboard operation
sse-starlette copied to clipboard

Issue when sending langchain astream_events via EventSourceResponse

Open userlerueda opened this issue 10 months ago • 1 comments

When sending the output of langchain's astream_events via EventSource like in the following example:

@chat_router.post("/astream_events")
async def astream_events(
    request: Request,
    body: ChatInvokeInputModel,
    graph: Annotated[CompiledGraph, Depends(get_compiled_graph)],
) -> EventSourceResponse:
    """Return SSE for chat."""
    session: SessionHeaderModel = request.state.session
    config = {"configurable": {"thread_id": body.thread_id, "user": session.email}}
    return EventSourceResponse(
        graph.astream_events(
            {"messages": [HumanMessage(content=body.message)]},
            RunnableConfig(**config),
        )
    )

I get the following error:

   File "/usr/local/lib/python3.11/site-packages/sse_starlette/event.py", line 68, in ensure_bytes
     return ServerSentEvent(**data).encode()
            ^^^^^^^^^^^^^^^^^^^^^^^
 TypeError: ServerSentEvent.__init__() got an unexpected keyword argument 'name'

The reason is that the events generated by a LangChain runnable send not only event, and data, but also tags, run_id, etc. When this is passed to ServerSentEvent via **data, it shows an unexpected keyword argument (name).

The proposal to solve this (allowing us to use EventSourceResponse) can come in two flavors:

  1. In the class ServerSentEvent https://github.com/sysid/sse-starlette/blob/f2e4d091c5d3a5216207256109f9b231e8421bd9/sse_starlette/event.py#L14-L22 we allow **kwargs so that any additional kwargs are accepted but not used.
  2. As part of EventSourceResponse initialization https://github.com/sysid/sse-starlette/blob/f2e4d091c5d3a5216207256109f9b231e8421bd9/sse_starlette/sse.py#L73-L87, we accept a custom ensure_bytes function, allowing the user to specify their own "encoding" function.

Reference:

  • https://python.langchain.com/api_reference/core/runnables/langchain_core.runnables.base.Runnable.html#langchain_core.runnables.base.Runnable.astream_events

userlerueda avatar Feb 27 '25 20:02 userlerueda

I don't think this should be handled by sse-starlette, but there's a number of ways one could implement this for themselves. My reasoning is that on the client, additional fields are simply ignored, but most server implementations don't allow an event to be instantiated with additional/invalid fields to begin with.

I quickly took a look at how commonly-used servers in other languages implement this, and I have yet to find an implementation that filters or ignores additional fields; all throw an error. Given that this seems to be consistent behavior across the board, I think allowing extra **kwargs here is likely to cause debugging problems by silently failing. Someone passing invalid keys is more likely making a mistake than doing so deliberately and expecting the system to fix it for them.

Here's two ways you could implement it. Subclassing EventSourceResponse (args, etc. omitted for brevity):

class FilteredEventSourceResponse(EventSourceResponse):
    """Custom EventSourceResponse that filters out unsupported event fields."""

    def __init__(self, generator: AsyncIterable[Mapping[str, Any]], *args, **kwargs):
        async def filtered_generator():
            async for event in generator:
                allowed_keys = {"event", "data", "id", "retry"}
                filtered_event = {
                    k: v
                    for k, v in event.items()
                    if k in allowed_keys and v is not None
                }
                yield filtered_event

        super().__init__(
            filtered_generator(),
            *args,
            **kwargs,
        )

Or with a function used inside your route:

async def filtered_sse(
    event_source: AsyncIterable[Mapping[str, Any]],
) -> AsyncGenerator[Mapping[str, Any], None]:
    allowed_keys = {"event", "data", "id", "retry"}

    async for event in event_source:
        yield {k: v for k, v in event.items() if k in allowed_keys and v is not None}


@chat_router.post("/astream_events")
async def astream_events(
    request: Request,
    body: ChatInvokeInputModel,
    graph: Annotated[CompiledGraph, Depends(get_compiled_graph)],
) -> EventSourceResponse:
    """Return SSE for chat."""
    session: SessionHeaderModel = request.state.session
    config = {"configurable": {"thread_id": body.thread_id, "user": session.email}}

    events = graph.astream_events(
        {"messages": [HumanMessage(content=body.message)]},
        RunnableConfig(**config),
    )

    return EventSourceResponse(filtered_sse(events))

notdaniel avatar Mar 03 '25 05:03 notdaniel

Closed due to lack of activity.

sysid avatar Jul 27 '25 08:07 sysid