Consider providing a class-based streaming callback to be passed at runtime
**Is your feature request related to a problem? Please describe.**
While working on the parent issue https://github.com/deepset-ai/haystack/issues/9347, it has become evident that it is useful for the streaming callback function to have state. For example, in the parent issue's example we see how keeping track of `_openai_tool_call_index` lets us properly close the markdown formatting of an OpenAI tool call.
Here is the example code:

```python
def _render_openai_tool_call(self, chunk_received: StreamingChunk) -> StreamingChunk:
    tool_calls = chunk_received.meta.get("tool_calls") or []
    for tool_call in tool_calls:
        if not tool_call.function:
            continue
        # multiple tool calls (distinguished by index) can be concatenated without finish_reason in between
        if self._openai_tool_call_index < tool_call.index:
            chunk_received.content += self.TOOL_END
            self._openai_tool_call_index = tool_call.index
        if tool_name := tool_call.function.name:
            chunk_received.content += self.TOOL_START.format(tool_name=tool_name)
        if arguments := tool_call.function.arguments:
            chunk_received.content += arguments
    if chunk_received.meta.get("finish_reason") == "tool_calls":
        chunk_received.content += self.TOOL_END
    return chunk_received
```
From working with the OpenAI SDK and its `ChatCompletionChunk`s, I have not found a good way to tell that a tool call is finished other than by using state. `finish_reason` alone is not enough: it would suffice only if a single tool call were requested. If there are two tool calls in a single LLM response, the `finish_reason` arrives only after the second tool call has finished.
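To illustrate, here is a simplified sketch of the chunk metadata sequence for a two-tool-call response (abbreviated to plain dicts; real chunks carry OpenAI SDK delta objects with more fields):

```python
# Simplified sketch of the chunk metadata for a response with two tool calls.
# Real chunks carry OpenAI SDK objects, not plain dicts; shapes are abbreviated.
chunk_metas = [
    {"tool_calls": [{"index": 0, "function": {"name": "get_weather", "arguments": ""}}]},
    {"tool_calls": [{"index": 0, "function": {"arguments": '{"city": "Berlin"}'}}]},
    # The jump from index 0 to index 1 is the only signal that tool call 0 is complete.
    {"tool_calls": [{"index": 1, "function": {"name": "get_weather", "arguments": ""}}]},
    {"tool_calls": [{"index": 1, "function": {"arguments": '{"city": "Paris"}'}}]},
    # finish_reason arrives once, only after the last tool call.
    {"finish_reason": "tool_calls"},
]
```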
**Describe the solution you'd like**
I'd like to provide, as a utility, a class-based streaming callback like the one in the parent issue. So something like:
```python
class StreamingCallback:
    def __init__(self):
        self._index = 0

    def __call__(self, chunk: StreamingChunk):
        ...
```
which should then only be passed at runtime, like:
```python
result2 = agent.run(
    [ChatMessage.from_user("What's the weather in Berlin and Paris?")], streaming_callback=StreamingCallback()
)
```
This should only be passed at runtime because we need a new instance of `StreamingCallback` every time a component or pipeline is run; otherwise the internal state (`self._index`) would be shared and conflict between different runs.
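For concreteness, here is a minimal runnable sketch of such a class; the body is purely illustrative (`StreamingChunk` is Haystack's existing dataclass, and the printing behavior is an assumption, not a proposed API):

```python
from haystack.dataclasses import StreamingChunk


class StreamingCallback:
    """Minimal sketch of a stateful streaming callback; the body is illustrative."""

    def __init__(self):
        self._index = 0  # per-run state, hence the need for a fresh instance per run

    def __call__(self, chunk: StreamingChunk) -> None:
        self._index += 1
        print(chunk.content, end="", flush=True)


# Reusing one instance across runs would leak state between them:
shared = StreamingCallback()
# agent.run(..., streaming_callback=shared)  # first run mutates shared._index
# agent.run(..., streaming_callback=shared)  # second run starts from stale state
```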
**Describe alternatives you've considered**
Another alternative we could consider is allowing a `streaming_callback_factory` as an init parameter: a function that, when called, returns a `streaming_callback` function. We could then call this factory every time the component is run to ensure that state is independent between runs, as in the sketch below.
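A rough sketch of how a component could consume such a factory (`streaming_callback_factory` and `SomeStreamingComponent` are hypothetical names, not existing Haystack API):

```python
from typing import Callable, Optional

from haystack.dataclasses import StreamingChunk

StreamingCallbackT = Callable[[StreamingChunk], None]


class SomeStreamingComponent:
    """Hypothetical component that accepts a callback factory at init time."""

    def __init__(self, streaming_callback_factory: Optional[Callable[[], StreamingCallbackT]] = None):
        self._streaming_callback_factory = streaming_callback_factory

    def run(self, messages):
        # Build a fresh callback per run so per-run state (e.g. self._index) stays isolated.
        callback = self._streaming_callback_factory() if self._streaming_callback_factory else None
        ...  # pass `callback` down to the underlying generator
```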
**Additional context**
This topic is exploratory, so it should be investigated before proceeding with any changes.
For some additional context: it's true that we need to create a new instance of a `streaming_callback` for every `Pipeline.run` in a deployment setting.
E.g. here is how we are currently doing it in Hayhooks: https://github.com/deepset-ai/hayhooks/blob/665b6a496a2652c07f27c827e2961ac29a9d0d29/src/hayhooks/server/pipelines/utils.py#L99
It could still be useful to provide this as a utility in Haystack, but it looks like we will need to develop a custom one anyway in Hayhooks to have it work properly with OpenWebUI.