
Consider providing a class-based streaming callback to be passed at runtime


Is your feature request related to a problem? Please describe. While working on the parent issue https://github.com/deepset-ai/haystack/issues/9347, it has become evident that it is useful for the streaming callback to carry state. For example, in the parent issue we see how tracking the _openai_tool_call_index allows us to properly close the Markdown formatting of an OpenAI tool call.

Here is the example code:

    def _render_openai_tool_call(self, chunk_received: StreamingChunk) -> StreamingChunk:
        tool_calls = chunk_received.meta.get("tool_calls") or []
        for tool_call in tool_calls:
            if not tool_call.function:
                continue
            # multiple tool calls (distinguished by index) can be concatenated without finish_reason in between
            if self._openai_tool_call_index < tool_call.index:
                chunk_received.content += self.TOOL_END
            self._openai_tool_call_index = tool_call.index
            if tool_name := tool_call.function.name:
                chunk_received.content += self.TOOL_START.format(tool_name=tool_name)
            if arguments := tool_call.function.arguments:
                chunk_received.content += arguments
        if chunk_received.meta.get("finish_reason") == "tool_calls":
            chunk_received.content += self.TOOL_END
        return chunk_received

From working with the OpenAI SDK and its ChatCompletionChunks, I have not found a good way to detect that a ToolCall has finished other than by using state. finish_reason alone is not sufficient: it would only work if a single tool call is requested. If the LLM requests two tool calls in one response, finish_reason arrives only after the second tool call has finished, so the end of the first call can only be detected by watching the tool-call index change.
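To illustrate the point above, here is a minimal, self-contained sketch. FakeChunk is a hypothetical stand-in for Haystack's StreamingChunk (real chunks carry OpenAI ChatCompletionChunk metadata under chunk.meta), and boundaries is an illustrative helper, not part of any API:

```python
# Hedged sketch: FakeChunk is a simplified stand-in for StreamingChunk,
# used only to show why per-stream state is needed to find tool-call ends.
from dataclasses import dataclass, field

@dataclass
class FakeChunk:
    content: str = ""
    meta: dict = field(default_factory=dict)

def boundaries(chunks):
    """Return the chunk positions where a tool call ends, using only state."""
    ends = []
    current_index = 0
    for pos, chunk in enumerate(chunks):
        for call in chunk.meta.get("tool_calls", []):
            if call["index"] > current_index:
                ends.append(pos)  # a new index means the previous call is done
                current_index = call["index"]
        if chunk.meta.get("finish_reason") == "tool_calls":
            ends.append(pos)
    return ends

# Two tool calls: finish_reason appears only after the second one, so the
# first call's end is detectable only via the index change.
stream = [
    FakeChunk(meta={"tool_calls": [{"index": 0}]}),
    FakeChunk(meta={"tool_calls": [{"index": 1}]}),
    FakeChunk(meta={"finish_reason": "tool_calls"}),
]
print(boundaries(stream))  # → [1, 2]
```

A stateless callback seeing only the middle chunk has no way to know that tool call 0 just ended, which is exactly why the index must be remembered between invocations.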

Describe the solution you'd like I'd like to provide, as a utility, a class-based streaming callback like the one in the parent issue. Something like:

class StreamingCallback:
    def __init__(self):
        self._index = 0

    def __call__(self, chunk: StreamingChunk):
        ...

which would then be passed only at runtime, like:

result2 = agent.run(
    [ChatMessage.from_user("What's the weather in Berlin and Paris?")], streaming_callback=StreamingCallback()
)

This should only be passed at runtime because we need a new instance of StreamingCallback every time a component or pipeline is run; otherwise the internal state (self._index) would be shared and conflict between different runs.
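The state-sharing conflict described above can be demonstrated with a small sketch. CountingCallback is a hypothetical stateful callback, not a Haystack class; the two loops stand in for two separate component or pipeline runs:

```python
# Hedged sketch showing why a single stateful callback instance must not be
# shared across runs: its internal counter carries over from the first run.
class CountingCallback:
    """Hypothetical stateful callback that counts received chunks."""
    def __init__(self):
        self._count = 0

    def __call__(self, chunk):
        self._count += 1

shared = CountingCallback()
for chunk in ["a", "b", "c"]:   # first "run"
    shared(chunk)
for chunk in ["d", "e"]:        # second "run" reuses the same instance
    shared(chunk)
print(shared._count)  # → 5, not 2: state leaked across runs

fresh = CountingCallback()      # a new instance per run keeps runs independent
for chunk in ["d", "e"]:
    fresh(chunk)
print(fresh._count)  # → 2
```

Passing a fresh instance at each run call sidesteps the leak, which is why the callback should not be fixed at init time.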

Describe alternatives you've considered Another alternative is to allow a streaming_callback_factory as an init parameter: a function that, when called, returns a streaming_callback. The component would then call this factory every time it is run, ensuring that state is independent between runs.
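The factory alternative could be sketched as follows. FakeComponent, streaming_callback_factory, and StatefulCallback are all illustrative names under the assumptions of this issue, not existing Haystack API:

```python
# Hedged sketch of the streaming_callback_factory alternative: the component
# stores a zero-argument factory at init time and calls it on every run, so
# each run gets a fresh callback with independent state.
from typing import Callable

class FakeComponent:
    """Hypothetical component accepting a callback factory at init time."""
    def __init__(self, streaming_callback_factory: Callable[[], Callable]):
        self._factory = streaming_callback_factory

    def run(self, chunks):
        callback = self._factory()  # fresh, independent state per run
        for chunk in chunks:
            callback(chunk)
        return callback

class StatefulCallback:
    """Hypothetical stateful callback counting the chunks it has seen."""
    def __init__(self):
        self.seen = 0

    def __call__(self, chunk):
        self.seen += 1

# The class itself works as the factory: calling it yields a new instance.
component = FakeComponent(streaming_callback_factory=StatefulCallback)
first = component.run(["a", "b"])
second = component.run(["c"])
print(first.seen, second.seen)  # → 2 1
```

Note that a class whose __init__ takes no arguments already satisfies the factory signature, so no extra wrapper function is needed in the common case.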

Additional context This topic is exploratory, so it should be investigated before proceeding with any changes.

sjrl · Jun 02 '25

For some additional context: it is true that we need to create a new streaming_callback instance for every Pipeline.run in a deployment setting.

E.g. here is how we currently do it in Hayhooks: https://github.com/deepset-ai/hayhooks/blob/665b6a496a2652c07f27c827e2961ac29a9d0d29/src/hayhooks/server/pipelines/utils.py#L99

It could still be useful to provide this as a utility in Haystack, but it looks like we will need to develop a custom one in Hayhooks anyway to make it work properly with OpenWebUI.

sjrl · Jun 06 '25