Support both callback and generator-based streaming in all Chat Generators
Motivation
Currently, all chat generator components (OpenAI, Anthropic, etc.) support streaming only through callbacks:

```python
def callback(chunk: StreamingChunk):
    print(chunk.content)

generator = OpenAIChatGenerator()
result = generator.run(messages, streaming_callback=callback)
```
This works well for simple use cases (notebooks etc), but becomes problematic when:

- Integrating with async frameworks like FastAPI that expect a generator for streaming responses:

  ```python
  @app.get("/stream")
  async def stream_endpoint():
      return StreamingResponse(generator_function())  # Needs a generator
  ```

- Building pipelines where downstream components expect to consume a stream of tokens:

  ```python
  pipeline.connect("chat", "stream_processor.stream")  # Currently not possible
  ```

- Implementing custom streaming logic that doesn't fit the callback pattern:

  ```python
  # Currently not possible:
  for chunk in result.stream:
      # Custom processing
      await process_chunk(chunk)
  ```
Proposed Solution
Add a second output socket `stream` to all chat generator components:

```python
@component.output_types(
    replies=List[ChatMessage],
    stream=Generator[ChatCompletionChunk, None, None]
)
```
This allows components to support both streaming patterns:

- Legacy callback-based:

  ```python
  result = generator.run(messages, streaming_callback=callback)
  # result = {"replies": [...], "stream": None}
  ```

- Generator-based (new):

  ```python
  result = generator.run(messages)
  # result = {"replies": [], "stream": <generator>}
  for chunk in result["stream"]:
      print(chunk.content)
  ```
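For concreteness, here is a minimal sketch of how a `run()` method could populate both outputs. Everything here is hypothetical and simplified: `fake_llm_stream` stands in for a real provider stream, and plain strings stand in for `ChatMessage`/chunk objects.

```python
from typing import Callable, Generator, Optional

def fake_llm_stream() -> Generator[str, None, None]:
    # Toy stand-in for an LLM client that yields chunks (hypothetical).
    yield from ["Hel", "lo", "!"]

def run(streaming_callback: Optional[Callable[[str], None]] = None) -> dict:
    """Sketch: callback mode consumes the stream eagerly and returns
    assembled replies; generator mode hands the stream to the caller."""
    if streaming_callback is not None:
        parts = []
        for chunk in fake_llm_stream():
            streaming_callback(chunk)
            parts.append(chunk)
        return {"replies": ["".join(parts)], "stream": None}
    # Generator mode: replies stay empty until the caller consumes the stream.
    return {"replies": [], "stream": fake_llm_stream()}
```

Note that in generator mode the component cannot fill `replies` at `run()` time, which is why the proposal above shows `"replies": []` until the stream is drained.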
Implementation Details
Two possible approaches:
Option 1: Use Component Socket Detection
Add a helper method to detect if the `stream` socket is connected:

```python
def has_output_receivers(self, instance, socket_name: str) -> bool:
    if not hasattr(instance, "__haystack_output__"):
        return False
    socket = instance.__haystack_output__.get(socket_name)
    return bool(socket.receivers) if socket else False
```
Components would enable streaming based on either the callback or the socket connection:

```python
stream_has_receivers = component.has_output_receivers(self, "stream")
is_streaming = streaming_callback is not None or stream_has_receivers
```
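A toy harness makes the detection logic testable outside Haystack. `FakeComponent` and `_Socket` are stand-ins for the real component internals, and the dict-shaped `__haystack_output__` registry is an assumption made purely for illustration:

```python
class _Socket:
    """Toy stand-in for a Haystack output socket (assumption)."""
    def __init__(self, receivers):
        self.receivers = receivers

def has_output_receivers(instance, socket_name: str) -> bool:
    # Same logic as the helper above, run against a dict-like
    # __haystack_output__ registry (an assumption about internals).
    if not hasattr(instance, "__haystack_output__"):
        return False
    socket = instance.__haystack_output__.get(socket_name)
    return bool(socket.receivers) if socket else False

class FakeComponent:
    """Minimal component stand-in; no Haystack decorators."""

c = FakeComponent()
no_registry = has_output_receivers(c, "stream")        # False: no registry at all
c.__haystack_output__ = {"stream": _Socket(receivers=[])}
no_receivers = has_output_receivers(c, "stream")       # False: socket exists, unconnected
c.__haystack_output__ = {"stream": _Socket(receivers=["stream_processor.stream"])}
connected = has_output_receivers(c, "stream")          # True: socket has a receiver
```

This also shows the edge cases the helper must cover: a component with no output registry, and a `stream` socket that exists but has no receivers.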
Pros:
- Clean integration with the pipeline system
- No API changes needed
- Automatic streaming when the socket is connected

Cons:
- Less explicit control over the streaming mode
- Implicit, socket-driven behavior can feel mysterious to users
Option 2: Use Sentinel Value
Add a sentinel to signal generator-based streaming:

```python
class GENERATE_STREAM:
    """Sentinel object used to signal generator-based streaming"""
    pass

GENERATE = GENERATE_STREAM()

# Usage:
result = generator.run(messages, streaming_callback=GENERATE)
```
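A sketch of how `run()` could dispatch on the sentinel. The sentinel is repeated here for self-containment; `resolve_mode` and the mode names are illustrative, not part of any real API:

```python
class GENERATE_STREAM:
    """Sentinel type signalling generator-based streaming."""

GENERATE = GENERATE_STREAM()

def resolve_mode(streaming_callback) -> str:
    # Identity check (`is`) comes first, so the sentinel is never
    # mistaken for an ordinary callback.
    if streaming_callback is GENERATE:
        return "generator"
    if streaming_callback is not None:
        return "callback"
    return "off"
```

Using an identity check against a module-level singleton is the usual Python sentinel pattern; it keeps `None` free to mean "no streaming at all".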
Pros:
- Explicit control over streaming mode
- Clear API intention
Cons:
- More complex API
Questions to Resolve
- Should we support both streaming modes simultaneously?
- How should we handle errors in streaming mode?
- Should we standardize the chunk format across different LLM providers?
This article by @aryaminus covers the workarounds and mental gymnastics users have to go through to enable this functionality.
@vblagoje yeah, I saw it. In general I agree with the proposed approach; my points:
> Should we support both streaming modes simultaneously?

I think we first need to investigate the primary use case for `streaming_callback` (apart from the article above). We may decide to support both modes initially, then gently deprecate one.

> How should we handle errors in streaming mode?

For the SSE use case (i.e. open-webui), network timeout errors (the most common ones) should be handled on the client side. If one instead simply consumes a generator, errors should be handled while consuming it, with a classic try/except block.
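The consuming-side handling described above can be sketched like this; `flaky_stream` is a toy generator standing in for a provider stream that fails mid-way:

```python
def flaky_stream():
    # Toy generator standing in for a provider stream that hits a
    # network error part-way through (hypothetical failure mode).
    yield "Hel"
    yield "lo"
    raise TimeoutError("connection dropped")

received = []
try:
    for chunk in flaky_stream():
        received.append(chunk)
except TimeoutError:
    # The error surfaces exactly where the stream is consumed,
    # so the consumer decides how to recover or report it.
    received.append("[aborted]")
```

The key property is that generator-based errors propagate at consumption time, not at `run()` time, so the caller owns the recovery policy.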
> Should we standardize the chunk format across different LLM providers?

I agree on this!
@vblagoje felt like dropping my thoughts:

- Should we support both streaming modes simultaneously?
  Without overcomplicating things, generator-based might be the better fit, since it allows adding flavors; if we want something simple and pragmatic, then callback-based.
- How should we handle errors in streaming mode?
  Propagate errors through the generator for generator-based; pass errors to the callback, or log them explicitly, for callback-based.
- Should we standardize the chunk format across LLM providers?
  Not by default.

My preference leans towards Option 1 (Component Socket Detection) for seamless integration, but I suggest adding an optional parameter (e.g., `streaming_mode="callback" | "generator"`) for explicit control.
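One way the suggested `streaming_mode` override could compose with socket detection; `resolve_streaming_mode`, its precedence order, and the `"none"` fallback are all assumptions for illustration, not a proposed final API:

```python
def resolve_streaming_mode(streaming_mode, streaming_callback, stream_has_receivers):
    """An explicit streaming_mode wins; otherwise fall back to callback
    presence, then to socket detection (the ordering is an assumption)."""
    if streaming_mode is not None:
        if streaming_mode not in ("callback", "generator"):
            raise ValueError(f"unknown streaming_mode: {streaming_mode!r}")
        return streaming_mode
    if streaming_callback is not None:
        return "callback"
    if stream_has_receivers:
        return "generator"
    return "none"
```

This keeps Option 1's seamless default while giving users an escape hatch when the implicit detection guesses wrong.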
Thanks @aryaminus - very good suggestion on `streaming_mode="callback" | "generator"` for explicit control. We are internally discussing how to proceed on this. More feedback from the community is always appreciated!
@anakin87 can we get your input here as well (with preference on how to move forward) so we can start moving in this direction - with your guidance and lead @mpangrazzi of course.
Would it be possible to also have a non-streaming mode, `streaming_mode=None`? @vblagoje
Is there any update on this? To return a StreamingResponse from a FastAPI endpoint, our current workaround uses the generator of the AsyncOpenAI client directly. Ideally, we'd love to have this as part of pipelines.
Another option I stumbled upon would be to provide a custom FastAPI/Starlette streaming response that passes a callback to the client code instead of requiring a Generator:
E.g. pass the send of
https://github.com/encode/starlette/blob/4acf1d1ca3e8aa767567cb4e6e12f093f066553b/starlette/responses.py#L256
to Haystack's StreamingCallback and call it directly from there.
E.g. for AsyncPipelines (normal Pipelines could be supported as well with the help of asyncer lib):
```python
from collections.abc import Awaitable, Callable
from typing import Annotated, Any

from fastapi import Depends
from starlette.responses import Response, Send, Scope, Receive

from haystack import AsyncPipeline
from haystack.dataclasses import StreamingChunk


class CallbackStreamingResponse(Response):
    def __init__(
        self,
        callable: Callable[[Send], Awaitable[None]],
        ...,
    ) -> None:
        self.callable = callable
        ...

    async def stream_response(self, send: Send) -> None:
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self.raw_headers,
            }
        )
        await self.callable(send)
        await send({"type": "http.response.body", "body": b"", "more_body": False})

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """same implementation as the original StreamingResponse.__call__ method"""
        ...


class PipelineCallable:
    def __init__(self, pipeline: AsyncPipeline, data: dict[str, Any]) -> None:
        self.pipeline = pipeline
        self.data = data

    async def __call__(self, send: Send) -> None:
        async def streaming_callback(chunk: StreamingChunk) -> None:
            await send({"type": "http.response.body", "body": chunk.encode(), "more_body": True})

        self.data["streaming_callback"] = streaming_callback
        await self.pipeline.run_async(data=self.data)


def pipeline() -> AsyncPipeline:
    ...


@app.post("/stream")
async def stream(
    data: dict[str, Any],
    pipeline: Annotated[AsyncPipeline, Depends(pipeline)],
) -> CallbackStreamingResponse:
    return CallbackStreamingResponse(
        PipelineCallable(
            pipeline=pipeline,
            data=data,
        ),
    )
```
I completely support this as well, and would much appreciate having it, to avoid so many workarounds for something that feels standard.