
Support both callback and generator-based streaming in all Chat Generators

Open vblagoje opened this issue 10 months ago • 8 comments

Motivation

Currently, all chat generator components (OpenAI, Anthropic, etc.) support streaming only through callbacks:

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import StreamingChunk

def callback(chunk: StreamingChunk):
    print(chunk.content)

generator = OpenAIChatGenerator()
result = generator.run(messages, streaming_callback=callback)

This works well for simple use cases (notebooks, etc.), but becomes problematic when:

  1. Integrating with async frameworks like FastAPI that expect a generator for streaming responses:

    @app.get("/stream")
    async def stream_endpoint():
        return StreamingResponse(generator_function())  # Needs a generator
    
  2. Building pipelines where downstream components expect to consume a stream of tokens:

    pipeline.connect("chat", "stream_processor.stream")  # Currently not possible
    
  3. Implementing custom streaming logic that doesn't fit the callback pattern:

    # Currently not possible:
    for chunk in result.stream:
        # Custom processing
        await process_chunk(chunk)
    
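For context, the typical workaround today is to pump callback chunks through a queue consumed by a generator on a background thread. A minimal sketch of that bridge (`fake_generator_run` is a hypothetical stand-in for a real chat generator, not Haystack code):

```python
import queue
import threading
from typing import Iterator

_SENTINEL = object()  # marks the end of the stream

def fake_generator_run(messages, streaming_callback):
    # Stand-in for a chat generator that emits chunks only via the callback.
    for token in ["Hello", ", ", "world"]:
        streaming_callback(token)

def stream_as_generator(messages) -> Iterator[str]:
    q: "queue.Queue[object]" = queue.Queue()

    def callback(chunk):
        q.put(chunk)

    def worker():
        try:
            fake_generator_run(messages, streaming_callback=callback)
        finally:
            q.put(_SENTINEL)  # always unblock the consumer, even on error

    threading.Thread(target=worker, daemon=True).start()
    while (item := q.get()) is not _SENTINEL:
        yield item

chunks = list(stream_as_generator([]))
print("".join(chunks))  # -> Hello, world
```

This is exactly the kind of boilerplate a native stream output would make unnecessary.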

Proposed Solution

Add a second output socket, stream, to all chat generator components:

@component.output_types(
    replies=List[ChatMessage],
    stream=Generator[ChatCompletionChunk, None, None]
)

This allows components to support both streaming patterns:

  1. Legacy callback-based:

    result = generator.run(messages, streaming_callback=callback)
    # result = {"replies": [...], "stream": None}
    
  2. Generator-based (new):

    result = generator.run(messages)
    # result = {"replies": [], "stream": <generator>}
    for chunk in result["stream"]:
        print(chunk.content)
    

Implementation Details

Two possible approaches:

Option 1: Use Component Socket Detection

Add helper method to detect if stream socket is connected:

def has_output_receivers(self, instance, socket_name: str) -> bool:
    if not hasattr(instance, "__haystack_output__"):
        return False
    socket = instance.__haystack_output__.get(socket_name)
    return bool(socket.receivers) if socket else False

Components would enable streaming based on either callback or socket connection:

stream_has_receivers = component.has_output_receivers(self, "stream")
is_streaming = streaming_callback is not None or stream_has_receivers
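A toy sketch of how a component might branch on this state (ToyChatGenerator is illustrative only; a real component would read socket state through the helper above rather than a plain attribute):

```python
from typing import Callable, Iterator, Optional

class ToyChatGenerator:
    def __init__(self):
        # In the real design this would come from has_output_receivers();
        # here it is a plain flag set "by the pipeline" on connect.
        self.stream_has_receivers = False

    def _chunks(self) -> Iterator[str]:
        yield from ["a", "b", "c"]

    def run(self, messages, streaming_callback: Optional[Callable] = None) -> dict:
        if streaming_callback is not None:
            # Legacy callback mode: consume chunks eagerly.
            for c in self._chunks():
                streaming_callback(c)
            return {"replies": ["abc"], "stream": None}
        if self.stream_has_receivers:
            # Generator mode: defer generation to whoever consumes the socket.
            return {"replies": [], "stream": self._chunks()}
        return {"replies": ["abc"], "stream": None}

gen = ToyChatGenerator()
gen.stream_has_receivers = True
out = gen.run([])
print(list(out["stream"]))  # -> ['a', 'b', 'c']
```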

Pros:

  • Clean integration with pipeline system
  • No API changes needed
  • Automatic streaming when socket connected

Cons:

  • Less explicit control over streaming mode
  • Implicit behavior when a socket is connected can feel mysterious

Option 2: Use Sentinel Value

Add sentinel to signal generator-based streaming:

class GENERATE_STREAM:
    """Sentinel object used to signal generator-based streaming"""
    pass

GENERATE = GENERATE_STREAM()

# Usage:
result = generator.run(messages, streaming_callback=GENERATE)
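A self-contained sketch of how run() could branch on the sentinel (the sentinel is restated here for completeness, and the toy run() is not actual Haystack code):

```python
class GENERATE_STREAM:
    """Sentinel type: pass its instance to request generator-based streaming."""

GENERATE = GENERATE_STREAM()

def run(messages, streaming_callback=None):
    def chunks():
        yield from ["x", "y"]
    if streaming_callback is GENERATE:
        # Generator mode: hand the lazy stream to the caller.
        return {"replies": [], "stream": chunks()}
    if streaming_callback is not None:
        # Legacy callback mode: drive the stream eagerly.
        for c in chunks():
            streaming_callback(c)
    return {"replies": ["xy"], "stream": None}

result = run([], streaming_callback=GENERATE)
print(list(result["stream"]))  # -> ['x', 'y']
```

An identity check (is GENERATE) keeps the sentinel unambiguous, since no real callback could ever equal it by accident.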

Pros:

  • Explicit control over streaming mode
  • Clear API intention

Cons:

  • More complex API

Questions to Resolve

  1. Should we support both streaming modes simultaneously?
  2. How should we handle errors in streaming mode?
  3. Should we standardize the chunk format across different LLM providers? (Which would be nice, no?)

vblagoje avatar Jan 17 '25 11:01 vblagoje

This article by @aryaminus covers the workarounds and mental gymnastics users have to go through to enable this functionality.

vblagoje avatar Jan 20 '25 08:01 vblagoje

@vblagoje yeah I saw it. In general I agree with proposed approach, my points:

Should we support both streaming modes simultaneously?

I think we need first to investigate which is the primary use case for streaming_callback (apart from the article above). We may decide to support both modes initially, then gently deprecate one.

How should we handle errors in streaming mode?

In the SSE use case (i.e. open-webui), network timeout errors (the most common ones) should be handled on the client side. If one instead simply consumes a generator, errors should be handled while consuming it, with a classic try / except block.
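A minimal illustration of that try / except pattern, with a toy generator standing in for a chat stream that fails mid-response:

```python
def flaky_stream():
    # Toy stand-in for a chunk stream whose upstream connection drops.
    yield "partial "
    yield "answer"
    raise TimeoutError("upstream connection dropped")

received = []
try:
    for chunk in flaky_stream():
        received.append(chunk)
except TimeoutError as e:
    # The error surfaces exactly where the stream is consumed.
    received.append(f"[error: {e}]")

print("".join(received))  # -> partial answer[error: upstream connection dropped]
```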

Should we standardize chunk format across different LLM providers which would be nice, no?

I agree on this!

mpangrazzi avatar Jan 20 '25 09:01 mpangrazzi

@vblagoje felt like dropping my thoughts:

  1. Should we support both streaming modes simultaneously?
    without overcomplicating things, generator-based seems preferable as it allows adding flavors later, unless we want something simple and pragmatic, in which case callback-based.

  2. How should we handle errors in streaming mode?
    propagate errors through the generator for generator-based / pass errors to the callback or log them explicitly for callback-based.

  3. Should we standardize chunk format across LLM providers?
    not by default.


my leaning is towards Option 1 (Component Socket Detection) for seamless integration, but I suggest adding an optional parameter (e.g., streaming_mode="callback" | "generator") for explicit control.

aryaminus avatar Jan 20 '25 14:01 aryaminus

Thanks @aryaminus - very good suggestion on streaming_mode="callback" | "generator" for explicit control. We are internally discussing how to proceed on this. More feedback from the community is always appreciated!

vblagoje avatar Jan 27 '25 09:01 vblagoje

@anakin87 can we get your input here as well (with preference on how to move forward) so we can start moving in this direction - with your guidance and lead @mpangrazzi of course.

vblagoje avatar Feb 19 '25 10:02 vblagoje

Would it be possible to also have a non-streaming mode streaming_mode=None? @vblagoje

deep-rloebbert avatar Feb 19 '25 14:02 deep-rloebbert

Is there any update on this? To return a StreamingResponse from a FastAPI endpoint, our current workaround directly uses the generator of AsyncOpenAI client. Ideally, we’d love to have this as part of pipelines.

jantrienes avatar Apr 15 '25 06:04 jantrienes

Another option I stumbled upon would be to provide a custom fastAPI/starlette streaming response that passes a callback to the client code instead of requiring a Generator:

E.g. pass the send of https://github.com/encode/starlette/blob/4acf1d1ca3e8aa767567cb4e6e12f093f066553b/starlette/responses.py#L256 to Haystack's StreamingCallback and call it directly from there.

E.g. for AsyncPipelines (normal Pipelines could be supported as well with the help of asyncer lib):

from collections.abc import Awaitable, Callable
from typing import Annotated, Any
from fastapi import Depends
from starlette.responses import Response, Send, Scope, Receive
from haystack import AsyncPipeline
from haystack.dataclasses import StreamingChunk


class CallbackStreamingResponse(Response):
    def __init__(
        self,
        callable: Callable[[Send], Awaitable[None]],
        ...,
    ) -> None:
        self.callable = callable
        ...

    async def stream_response(self, send: Send) -> None:
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self.raw_headers,
            }
        )
        await self.callable(send)
        await send({"type": "http.response.body", "body": b"", "more_body": False})

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """same implementation as the original StreamingResponse.__call__ method"""
        ...
    
    
class PipelineCallable:
    def __init__(self, pipeline: AsyncPipeline, data: dict[str, Any]) -> None:
        self.pipeline = pipeline
        self.data = data

    async def __call__(self, send: Send) -> None:
        async def streaming_callback(chunk: StreamingChunk) -> None:
            await send({"type": "http.response.body", "body": chunk.content.encode(), "more_body": True})
        self.data["streaming_callback"] = streaming_callback
        await self.pipeline.run_async(data=self.data)
        
        
def pipeline() -> AsyncPipeline:
    ...
        
        
@app.post("/stream")
async def stream(
    data: dict[str, Any],
    pipeline: Annotated[AsyncPipeline, Depends(pipeline)],
) -> CallbackStreamingResponse:
    return CallbackStreamingResponse(
        PipelineCallable(
            pipeline=pipeline,
            data=data,
        ),
    )

tstadel avatar May 06 '25 09:05 tstadel

I completely support this as well and would much appreciate having it, so we don't have to do so many workarounds for something that feels standard.

tripflex avatar Oct 09 '25 21:10 tripflex