Does it support streaming responses?
Hi, thank you for writing this excellent framework.
- Does it support streaming output from intermediate or final nodes?
- Can an intermediate node be interrupted, then wait for the front-end user's next instruction before continuing execution?
Hey check out: https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-llm-streaming
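That cookbook covers the streaming side. For the interruption part of the question, one simple pattern (just a sketch, not the cookbook code; stream_llm below is a hypothetical streaming helper) is to check an interrupt flag between chunks:
import threading
from pocketflow import Node  # assuming the standard pocketflow import

class InterruptibleStreamNode(Node):
    def prep(self, shared):
        interrupt = threading.Event()
        # Background listener: pressing ENTER sets the interrupt flag
        threading.Thread(target=lambda: (input(), interrupt.set()), daemon=True).start()
        return shared["prompt"], interrupt

    def exec(self, prep_res):
        prompt, interrupt = prep_res
        text = ""
        for chunk in stream_llm(prompt):  # stream_llm: your own streaming LLM helper
            if interrupt.is_set():
                break                     # user asked to stop; remaining chunks are dropped
            print(chunk, end="", flush=True)
            text += chunk
        return text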
Would it be better to use a callback mechanism to handle stream chunk output from an intermediate node in a workflow? I have extended StreamNode and StreamFlow as examples and used them with Gradio to display the workflow's execution progress.
class StreamNode(Node):
    def exec_stream(self, prep_res) -> Generator[Any, None, None]:
        yield self.exec(prep_res)

    def _exec(self, prep_res):
        results = []
        for result in self.exec_stream(prep_res):
            results.append(result)
        return results[-1] if results else None

    def run_stream(self, shared, callback: Optional[Callable[[Any, str], None]] = None):
        """Invoke the callback to process each stream chunk."""
        if self.successors:
            warnings.warn("StreamNode won't run successors with stream output. Use StreamFlow.")
        p = self.prep(shared)
        result = None
        for result in self.exec_stream(p):
            if callback:
                callback(result, self.__class__.__name__)
        return self.post(shared, p, result)
class StreamFlow(Flow):
    def run_stream(self, shared, callback: Optional[Callable[[Any, str], None]] = None):
        pr = self.prep(shared)
        self._orch_stream(shared, callback=callback)
        return self.post(shared, pr, None)

    def _orch_stream(self, shared, params=None, callback=None):
        curr, p = copy.copy(self.start), (params or {**self.params})
        while curr:
            node_name = curr.__class__.__name__
            print(node_name, end="", flush=True)  # show which node is running
            curr.set_params(p)
            if isinstance(curr, StreamNode):
                # Handle a stream node: invoke the callback on each stream chunk
                pr, c = curr.prep(shared), None
                for result in curr.exec_stream(pr):
                    if callback:
                        callback(result, node_name)
                    c = result
                c = curr.post(shared, pr, c)
            else:
                # Handle a normal node
                c = curr._run(shared)
            curr = copy.copy(self.get_next_node(curr, c))
An example is shown below:
class LLMStreamNode(StreamNode):
    ...
    def exec_stream(self, prep_res):
        response = completion(..., stream=True)
        for chunk in response:
            if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'content'):
                content = chunk.choices[0].delta.content
                if content:
                    yield content

def handle_stream(chunk, node_name):
    print(chunk, end='', flush=True)

llm = LLMStreamNode()
llm.run_stream(shared, callback=handle_stream)
@litaojin Callbacks are very interesting! I'll think more about them.
I tried implementing orch in Flow with a callback before, since asyncio is hard to use. However, I ran into an issue where the call stack grew without bound, which was especially problematic for large batch nodes. I also tried using yield before, but it didn't work well.
I'd love to take a closer look at your code. Could you provide a runnable Python script demonstrating how it works, with example inputs? Also, I'm curious how a stream node can compose with other nodes. Do you have any suggestions for handling interruptions?
Thank you!
First of all, I really appreciate this wonderful framework; it has helped a lot with my current work. The current code is just the result of a few hours after I found the framework, so I started by implementing the idea with the simplest synchronous calls. I'll explore the asynchronous possibilities later.
The following example should run as a simple demo. Please replace shared['api'] with your own LLM provider's settings.
from utils import Node, Flow
from typing import Generator, Callable, Any, Optional, Dict, List, Union
import copy
import warnings
from litellm import completion  # Use LiteLLM for the LLM call

# StreamNode and StreamFlow are base classes extended to support streaming output via a callback
class StreamNode(Node):
    def exec_stream(self, prep_res) -> Generator[Any, None, None]:
        yield self.exec(prep_res)

    def _exec(self, prep_res):
        results = []
        for result in self.exec_stream(prep_res):
            results.append(result)
        return results[-1] if results else None

    def run_stream(self, shared, callback: Optional[Callable[[Any, str], None]] = None):
        """Invoke the callback to process each stream chunk."""
        if self.successors:
            warnings.warn("StreamNode won't run successors with stream output. Use StreamFlow.")
        p = self.prep(shared)
        result = None
        for result in self.exec_stream(p):
            if callback:
                callback(result, self.__class__.__name__)
        return self.post(shared, p, result)
class StreamFlow(Flow):
    def run_stream(self, shared, callback: Optional[Callable[[Any, str], None]] = None):
        pr = self.prep(shared)
        self._orch_stream(shared, callback=callback)
        return self.post(shared, pr, None)

    def _orch_stream(self, shared, params=None, callback=None):
        curr, p = copy.copy(self.start), (params or {**self.params})
        while curr:
            node_name = curr.__class__.__name__
            curr.set_params(p)
            if isinstance(curr, StreamNode):
                # Handle a stream node: invoke the callback on each stream chunk
                pr, c = curr.prep(shared), None
                for result in curr.exec_stream(pr):
                    if callback:
                        callback(result, node_name)
                    c = result
                c = curr.post(shared, pr, c)
            else:
                # Handle a normal node
                c = curr._run(shared)
            curr = copy.copy(self.get_next_node(curr, c))
# This is a demo showing StreamNode and StreamFlow
class UserMessageNode(Node):
def prep(self, shared):
return shared["sys_prompt"], shared["message"], shared["history"], shared["api"]
def exec(self, prep_res):
sys_prompt, message, history, api = prep_res
messages = [
{"role": "system", "content": sys_prompt}
]
for h in history:
messages.append({"role": h["role"], "content": h["content"]})
messages.append({"role": "user", "content": message})
return messages
def post(self, shared, prep_res, exec_res):
shared["messages"] = exec_res
return "default"
class LLMStreamNode(StreamNode):
def prep(self, shared):
return shared["messages"], shared["api"]
def exec_stream(self, prep_res):
messages, api = prep_res
response = completion(
model=api["model"],
messages=messages,
api_base=api['url'],
timeout=300,
api_key=api['key'],
temperature=0.6,
stream=True
)
for chunk in response:
if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'content'):
content = chunk.choices[0].delta.content
if content:
yield content
def post(self, shared, prep_res, exec_res):
return "default"
def handle_stream(chunk, node_name):
    print(chunk, end='', flush=True)
message = "Hello, tell me what you can do?"
shared = {
"sys_prompt": "You are a helpful AI bot!",
"message": message,
"history": [],
# "api": {'url': '<url_for_your_llm_provider>', 'key': '<api_key>', 'model': '<model_name>'}
"api": api_qwen # Please replace with your AI provider
}
user_msg = UserMessageNode()
llm = LLMStreamNode()
# Step-by-step execution to show StreamNode with a callback
user_msg.run(shared)
llm.run_stream(shared, callback=handle_stream)
# Use a flow to demo StreamFlow
user_msg >> llm
llm_flow = StreamFlow(user_msg)
llm_flow.run_stream(shared, callback=handle_stream)
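The callback is push-based while a Gradio chat handler is pull-based, so one way to bridge them is with a background thread and a queue, sketched below (reusing llm_flow and shared from above; feeding the new message into shared is omitted for brevity):
import queue
import threading
import gradio as gr

def chat_fn(message, history):
    q = queue.Queue()

    def on_chunk(chunk, node_name):
        q.put(chunk)                      # push each streamed chunk into the queue

    def worker():
        llm_flow.run_stream(shared, callback=on_chunk)
        q.put(None)                       # sentinel: the flow has finished

    threading.Thread(target=worker, daemon=True).start()
    partial = ""
    while (chunk := q.get()) is not None:
        partial += chunk
        yield partial                     # Gradio streams progressively longer strings

gr.ChatInterface(chat_fn).launch()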
Got it! Thank you for the example! If I understand correctly, you want to achieve streaming processing with a push-based execution model. Having a single callback function for all nodes is a bit awkward, since each node in the flow must share the same function. It would be much better to let each downstream node handle streaming data processing. It’s a great idea, and I’d love to integrate it if there are more use cases.
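A rough sketch of that direction, purely hypothetical and not part of the framework: the streaming node could publish its generator through the shared store, and each downstream node decides for itself how to consume the chunks (stream_llm is again a hypothetical streaming helper).
class ProducerNode(Node):
    def exec(self, prep_res):
        # Expose the stream itself instead of routing chunks through a global callback
        return stream_llm(prep_res)       # stream_llm: hypothetical streaming helper

    def post(self, shared, prep_res, exec_res):
        shared["stream"] = exec_res       # hand the generator to whoever runs next
        return "default"

class ConsumerNode(Node):
    def prep(self, shared):
        return shared["stream"]

    def exec(self, stream):
        text = ""
        for chunk in stream:
            text += chunk                 # each downstream node chooses its own handling
        return text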
You're right, my current use case is only obtaining LLM streaming output, and only from this one LLM node. How to implement this elegantly within the framework indeed requires more holistic consideration.
@zachary62 I had another idea for supporting streaming. Most use cases for streaming output look like:
for chunk in flow.run_stream(shared):
    ...
run_stream should work as a generator. That means nodes on the workflow chain either need streaming output to expose intermediate results (and derive from StreamNode), or don't need it and remain regular nodes. A StreamFlow can then process these two kinds of nodes differently:
class StreamNode(BaseNode):
    def exec_stream(self, prep_res) -> Generator:
        yield chunk  # yield stream chunks here

class StreamFlow(Flow):
    def run_stream(self, shared):
        yield from self._orch_stream(shared)

    def _orch_stream(self, shared):
        curr = copy.copy(self.start)
        while curr:
            if isinstance(curr, StreamNode):
                pr = curr.prep(shared)
                yield from curr.exec_stream(pr)
                c = curr.post(shared, pr, None)
            else:
                c = curr._run(shared)
            curr = copy.copy(self.get_next_node(curr, c))
This approach has one aspect that's incompatible with the original framework's design: because run_stream is a generator, StreamFlow cannot directly return the action that decides the next step, so we'd need some other way to pass that result back. If StreamFlow is the top-level workflow, this isn't a problem.
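One possible workaround, just a sketch rather than a framework feature: keep streaming through the generator, but remember the final action on the flow object so a caller can still branch on it once iteration finishes.
class StreamFlow(Flow):
    def run_stream(self, shared):
        self.last_action = None
        curr, c = copy.copy(self.start), None
        while curr:
            if isinstance(curr, StreamNode):
                pr = curr.prep(shared)
                yield from curr.exec_stream(pr)
                c = curr.post(shared, pr, None)
            else:
                c = curr._run(shared)
            curr = copy.copy(self.get_next_node(curr, c))
        self.last_action = c              # available once the generator is exhausted

# usage
for chunk in flow.run_stream(shared):
    print(chunk, end="", flush=True)
next_action = flow.last_action           # branch on the final action after streaming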
Very cool! What are the most important use cases for the stream flow?
One typical example I can think of is Multi-step Agents, where "LLM output controls iteration and program continuation" - which is actually the use case in my current project. For instance, in a Plan >> Action; Action >> Plan workflow, we need to stream the output to observe the iterative Plan cycle.
The action >> plan approach sounds like the agent (https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-agent). Could you explain how streaming comes into play?
Yes, it is a multi-step agent. The pseudocode looks like this:
class StreamNode(BaseNode):
    def exec_stream(self, prep_res) -> Generator:
        yield None

class StreamFlow(Flow):
    def run_stream(self, shared):
        yield from self._orch_stream(shared)

    def _orch_stream(self, shared):
        curr = copy.copy(self.start)
        while curr:
            if isinstance(curr, StreamNode):
                pr = curr.prep(shared)
                yield from curr.exec_stream(pr)
                c = curr.post(shared, pr, None)
            else:
                c = curr._run(shared)
            curr = copy.copy(self.get_next_node(curr, c))
class Plan(StreamNode):
    def prep(self, shared): ...  # same as Node
    def post(self, shared, prep_res, exec_res): ...  # same as Node

    def exec_stream(self, prep_res):
        response = call_llm(..., stream=True)
        for chunk in response:
            if hasattr(chunk, 'choices') and len(chunk.choices) > 0:
                if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'content'):
                    yield chunk.choices[0].delta.content
class Action(Node): ...

class MyApp:
    def get_llm_response(self, history):
        plan = Plan()
        action = Action()
        plan >> action
        action >> plan
        flow = StreamFlow(plan)
        shared = {"history": history}  # build the shared store from the chat history
        yield from flow.run_stream(shared)

    def bot(self, history):
        for output in self.get_llm_response(history):
            print(output, end="", flush=True)
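To close the loop with the Gradio use case mentioned earlier, here is a minimal sketch of how this generator could drive a chat UI; it assumes gr.ChatInterface streaming by yielding progressively longer partial replies, and leaves out feeding the new message into the shared store:
import gradio as gr

app = MyApp()

def chat_fn(message, history):
    partial = ""
    for chunk in app.get_llm_response(history):   # drives the Plan >> Action loop
        partial += chunk or ""
        yield partial                             # each yield updates the chat bubble

gr.ChatInterface(chat_fn).launch()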