
Does it support streaming responses?

Open wenxinmomo opened this issue 9 months ago • 11 comments

Hello, and thank you for writing this excellent framework.

  1. Does the framework support streaming output from intermediate or final nodes?
  2. Can an intermediate node be interrupted and wait for the front-end user's next instruction before continuing execution?

wenxinmomo · Mar 20 '25

Hey check out: https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-llm-streaming
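
Not the cookbook code itself, but roughly the shape of the idea it demonstrates: streaming inside a single Node's exec. The OpenAI client usage and model name below are assumptions; see the linked example for the actual utility.

from openai import OpenAI
from pocketflow import Node

class StreamingChatNode(Node):
    def prep(self, shared):
        return shared["messages"]

    def exec(self, messages):
        client = OpenAI()
        stream = client.chat.completions.create(
            model="gpt-4o-mini",  # placeholder model name
            messages=messages,
            stream=True,
        )
        parts = []
        for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                delta = chunk.choices[0].delta.content
                print(delta, end="", flush=True)  # show each chunk as it arrives
                parts.append(delta)
        return "".join(parts)

    def post(self, shared, prep_res, exec_res):
        shared["response"] = exec_res
        return "default"

# Usage: StreamingChatNode().run({"messages": [{"role": "user", "content": "Hi"}]})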

zachary62 · Mar 20 '25

Would it be better to use a callback mechanism to handle streamed chunk output from an intermediate node in a workflow? I have extended StreamNode and StreamFlow as examples and used them with Gradio to display the workflow execution progress.

class StreamNode(Node):
    def exec_stream(self, prep_res) -> Generator[Any, None, None]:
        yield self.exec(prep_res)

    def _exec(self, prep_res):
        results = []
        for result in self.exec_stream(prep_res):
            results.append(result)
        return results[-1] if results else None

    def run_stream(self, shared, callback: Optional[Callable[[Any, str], None]] = None):
        """Invoke the callback to process each stream chunk"""

        if self.successors:
            warnings.warn("StreamNode won't run successors with stream output. Use StreamFlow.")
        p = self.prep(shared)
        result = None
        for result in self.exec_stream(p):
            if callback:
                callback(result, self.__class__.__name__)
        return self.post(shared, p, result)


class StreamFlow(Flow):
    def run_stream(self, shared, callback: Optional[Callable[[Any, str], None]] = None):
        pr = self.prep(shared)
        self._orch_stream(shared, callback=callback)
        return self.post(shared, pr, None)

    def _orch_stream(self, shared, params=None, callback=None):
        curr, p = copy.copy(self.start), (params or {**self.params})
        while curr:
            node_name = curr.__class__.__name__
            print(node_name, end="", flush=True)

            curr.set_params(p)
            if isinstance(curr, StreamNode):
                # Handle a stream node: invoke the callback on each stream chunk

                c = None
                node_prep = curr.prep(shared)
                for result in curr.exec_stream(node_prep):
                    if callback:
                        callback(result, node_name)
                    c = result
                c = curr.post(shared, node_prep, c)
            else:
                # Handle a normal node

                c = curr._run(shared)
            curr = copy.copy(self.get_next_node(curr, c))

An example is shown below:

class LLMStreamNode(StreamNode):
    ...
    def exec_stream(self, prep_res):

        response = completion(..., stream=True)
        for chunk in response:
            if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'content'):
                content = chunk.choices[0].delta.content
                if content:
                    yield content


def handle_stream(chunk, node_name):
    print(chunk, end='', flush=True)

llm = LLMStreamNode()
llm.run_stream(shared, callback=handle_stream)

litaojin · Mar 22 '25

@litaojin Callbacks are very interesting! I'll think more about them.

I tried implementing orch in Flow with a callback before, since asyncio is hard to use. However, I ran into an issue where the call stack grew infinitely; it was especially problematic for large batch nodes. I also tried using yield before, but it didn't work well.

I'd love to take a closer look at your code. Could you provide a runnable Python script demonstrating how it works, with example inputs? Also, I'm curious how a stream node can compose with other nodes. Do you have any suggestions for handling interruptions?

Thank you!

zachary62 · Mar 22 '25

First of all, I really appreciate this wonderful framework; it has genuinely helped with my current work. The code here is just the result of a few hours of tinkering after I discovered the framework, so I started by implementing the ideas with the simplest synchronous calls. I'll try the asynchronous possibilities later.

The following example should run as a simple demo. Please replace shared['api'] with your LLM provider's details.

from utils import Node, Flow
from typing import Generator, Callable, Any, Optional, Dict, List, Union
import copy
import warnings
from litellm import completion  # Use LiteLLM for the LLM call

# StreamNode and StreamFlow are base classes extended to support streaming output via a callback
class StreamNode(Node):
    def exec_stream(self, prep_res) -> Generator[Any, None, None]:
        yield self.exec(prep_res)

    def _exec(self, prep_res):
        results = []
        for result in self.exec_stream(prep_res):
            results.append(result)
        return results[-1] if results else None

    def run_stream(self, shared, callback: Optional[Callable[[Any, str], None]] = None):
        """Invoke the callback to process each stream chunk"""

        if self.successors:
            warnings.warn("StreamNode won't run successors with stream output. Use StreamFlow.")
        p = self.prep(shared)
        result = None
        for result in self.exec_stream(p):
            if callback:
                callback(result, self.__class__.__name__)
        return self.post(shared, p, result)

class StreamFlow(Flow):
    def run_stream(self, shared, callback: Optional[Callable[[Any, str], None]] = None):
        pr = self.prep(shared)
        self._orch_stream(shared, callback=callback)
        return self.post(shared, pr, None)

    def _orch_stream(self, shared, params=None, callback=None):
        curr, p = copy.copy(self.start), (params or {**self.params})
        while curr:
            node_name = curr.__class__.__name__

            curr.set_params(p)
            if isinstance(curr, StreamNode):
                # Handle a stream node: invoke the callback on each stream chunk

                c = None
                node_prep = curr.prep(shared)
                for result in curr.exec_stream(node_prep):
                    if callback:
                        callback(result, node_name)
                    c = result
                c = curr.post(shared, node_prep, c)
            else:
                # Handle a normal node

                c = curr._run(shared)
            curr = copy.copy(self.get_next_node(curr, c))

# The demo below shows StreamNode and StreamFlow in action
class UserMessageNode(Node):
    def prep(self, shared):
        return shared["sys_prompt"], shared["message"], shared["history"], shared["api"]
        
    def exec(self, prep_res):
        sys_prompt, message, history, api = prep_res
        messages = [
            {"role": "system", "content": sys_prompt}
        ]

        for h in history:
            messages.append({"role": h["role"], "content": h["content"]})
        
        messages.append({"role": "user", "content": message})
        return messages
    def post(self, shared, prep_res, exec_res):
        shared["messages"] = exec_res
        return "default"

class LLMStreamNode(StreamNode):
    def prep(self, shared):
        return shared["messages"], shared["api"]

    def exec_stream(self, prep_res):
        messages, api = prep_res
        response = completion(
            model=api["model"],
            messages=messages,
            api_base=api['url'],
            timeout=300,
            api_key=api['key'],
            temperature=0.6,
            stream=True
        )
        for chunk in response:
            if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'content'):
                content = chunk.choices[0].delta.content
                if content:
                    yield content

    def post(self, shared, prep_res, exec_res):     
        return "default"


def handle_stream(chunk, node_name):
    print(chunk, end='', flush=True)
    
message = "Hello, tell me what you can do?"
shared = {
    "sys_prompt": "You are a helpful AI bot!",
    "message": message,
    "history": [],
    # "api": {'url': '<url_for_your_llm_provider>', 'key': '<api_key>', 'model': '<model_name>'}
    "api": api_qwen # Please replace with your AI provider
}

user_msg = UserMessageNode()
llm = LLMStreamNode()

# Step-by-step execution to show StreamNode with a callback
user_msg.run(shared)
llm.run_stream(shared, callback=handle_stream)

# Use a flow to demo StreamFlow
user_msg >> llm
llm_flow = StreamFlow(user_msg)
llm_flow.run_stream(shared, callback=handle_stream)

litaojin · Mar 22 '25

Got it! Thank you for the example! If I understand correctly, you want to achieve streaming processing with a push-based execution model. Having a single callback function for all nodes is a bit awkward, since each node in the flow must share the same function. It would be much better to let each downstream node handle streaming data processing. It’s a great idea, and I’d love to integrate it if there are more use cases.
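
For illustration, one possible shape of that pull-based alternative, reusing the Node/completion imports from the script above (the node names here are made up; this is not an existing PocketFlow API): the producing node hands a token generator downstream through shared, and the consuming node decides how to render it.

# Hypothetical sketch: the producer stores a generator in shared, and the
# downstream node owns the stream handling (print, push to a UI, etc.).
class LLMStreamProducerNode(Node):
    def prep(self, shared):
        return shared["messages"], shared["api"]

    def exec(self, prep_res):
        messages, api = prep_res

        def token_gen():
            response = completion(model=api["model"], messages=messages,
                                  api_base=api["url"], api_key=api["key"], stream=True)
            for chunk in response:
                content = chunk.choices[0].delta.content
                if content:
                    yield content

        return token_gen()  # hand the generator downstream instead of draining it here

    def post(self, shared, prep_res, exec_res):
        shared["token_stream"] = exec_res
        return "default"

class StreamRendererNode(Node):
    def prep(self, shared):
        return shared["token_stream"]

    def exec(self, token_stream):
        # This node decides how streaming data is processed.
        parts = []
        for token in token_stream:
            print(token, end="", flush=True)
            parts.append(token)
        return "".join(parts)

    def post(self, shared, prep_res, exec_res):
        shared["answer"] = exec_res
        return "default"

With this shape the two nodes compose through a regular Flow (LLMStreamProducerNode() >> StreamRendererNode()), and only the renderer knows it is dealing with a stream.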

zachary62 · Mar 22 '25

You're right, my current use case is only about obtaining LLM stream output, and only on this one LLM node. How to implement this elegantly within the framework indeed requires more holistic consideration.

litaojin · Mar 23 '25

@zachary62 I had another idea to support streaming. Most use cases for streaming output look like:

for chunk in flow.run_stream(shared):
    ...

run_stream should work as a generator, which means that nodes in the workflow chain either need streaming output to expose intermediate results (these derive from StreamNode) or do not (regular nodes). A StreamFlow can then process these two kinds of nodes differently:

class StreamNode(BaseNode):
    def exec_stream(self, prep_res) -> Generator:
        yield chunk

class StreamFlow(Flow):
    def run_stream(self, shared):
        yield from self._orch_stream(shared)

    def _orch_stream(self, shared):
        while curr:
            if isinstance(curr, StreamNode):
                pr = curr.prep(shared)
                yield from curr.exec_stream(pr)
                c = curr.post(shared, pr, None)
            else:
                c = curr._run(shared)
            curr = copy.copy(self.get_next_node(curr, c))

This approach has one aspect that's incompatible with the original framework's design philosophy: StreamFlow cannot directly return an action indicating the next-step behavior, so some other way to pass that result would need to be considered. If StreamFlow is the top-level workflow, this wouldn't be a problem.
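
One possible workaround, just a sketch on top of the pseudocode above: capture the final action somewhere the caller can reach after the generator is exhausted, for example on the flow instance (last_action below is an invented attribute, not part of the framework).

# Sketch: keep the generator interface but expose the final action afterwards.
class StreamFlow(Flow):
    def run_stream(self, shared):
        pr = self.prep(shared)
        # `yield from` forwards chunks and also receives _orch_stream's return value.
        last_action = yield from self._orch_stream(shared)
        self.last_action = self.post(shared, pr, last_action)  # invented attribute

    def _orch_stream(self, shared, params=None):
        curr, p, c = copy.copy(self.start), (params or {**self.params}), None
        while curr:
            curr.set_params(p)
            if isinstance(curr, StreamNode):
                pr = curr.prep(shared)
                yield from curr.exec_stream(pr)
                c = curr.post(shared, pr, None)
            else:
                c = curr._run(shared)
            curr = copy.copy(self.get_next_node(curr, c))
        return c  # generator return value, picked up by `yield from` above

# After the stream is consumed, the caller (e.g. a parent flow) can read flow.last_action.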

litaojin · Mar 24 '25

Very cool! What are the most important use cases for the stream flow?

zachary62 · Mar 24 '25

One typical example I can think of is Multi-step Agents, where "LLM output controls iteration and program continuation" - which is actually the use case in my current project. For instance, in a Plan >> Action; Action >> Plan workflow, we need to stream the output to observe the iterative Plan cycle.

litaojin · Mar 26 '25

The action >> plan approach sounds like the agent (https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-agent). Could you explain how streaming comes into play?

zachary62 · Mar 26 '25

Yes, it is a multi-step agent. The pseudocode looks like this:

class StreamNode(BaseNode):
    def exec_stream(self, prep_res) -> Generator:
        yield None

class StreamFlow(Flow):
    def run_stream(self, shared):
        yield from self._orch_stream(shared)

    def _orch_stream(self, shared):
        while curr:
            if isinstance(curr, StreamNode):
                pr = curr.prep(shared)
                yield from curr.exec_stream(pr)
                c = curr.post(shared, pr, None)
            else:
                c = curr._run(shared)
            curr = copy.copy(self.get_next_node(curr, c))

class Plan(StreamNode):  
    def prep(self, shared): ... # same as Node
    def post(self, shared, prep_res, exec_res):   ... # same as Node
    
    def exec_stream(self, prep_res):
        response = call_llm(..., stream=True)
        for chunk in response:
            if hasattr(chunk, 'choices') and len(chunk.choices) > 0:
                if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'content'):
                    yield chunk.choices[0].delta.content

class Action(Node): ... 

class MyApp():

    def get_llm_response(self, history):
        plan = Plan()
        action = Action()

        plan >> action
        action >> plan

        flow = StreamFlow(plan)

        yield from flow.run_stream(shared)

    def bot(self, history):
        for output in self.get_llm_response(history):
            print(output, end="", flush=True)
litaojin · Mar 27 '25