agentscope icon indicating copy to clipboard operation
agentscope copied to clipboard

Add support for stream pipeline decorator

Open rayrayraykk opened this issue 10 months ago • 4 comments


name: add support for stream pipeline decorator about: make the agentscope app a generator with simple decorator

Description

This pull request aims to enhance the agentscope application by introducing support for a stream pipeline decorator, which facilitates better compatibility with other API server pipelines. The main goal is to enable the agentscope app to return data in a streaming manner, allowing it to integrate more effectively with various API servers that require or benefit from streaming data capabilities.

Key Features

Pipeline Decorator: Implements a @pipeline decorator that transforms the application into a generator, enabling streaming data output.

Example Usage:

# -*- coding: utf-8 -*-
"""A simple example for conversation between user and assistant agent."""
import os
import agentscope
from agentscope.agents import DialogAgent, UserAgent
from agentscope.pipelines.functional import sequentialpipeline
from agentscope.utils.decorators import pipeline


@pipeline
def main() -> None:
    """A basic conversation demo"""

    agentscope.init(
        model_configs=[
            {
                "model_type": "dashscope_chat",
                "config_name": "qwen-max",
                "model_name": "qwen-max",
                "api_key": os.getenv("DASHSCOPE_API_KEY"),
                "stream": True,
            },
        ],
        project="Multi-Agent Conversation",
        save_api_invoke=True,
    )

    # Init two agents
    dialog_agent = DialogAgent(
        name="Assistant",
        sys_prompt="You're a helpful assistant.",
        model_config_name="qwen-max",  # replace by your model config name
    )
    user_agent = UserAgent()

    # start the conversation between user and assistant
    x = None
    while x is None or x.content != "exit":
        x = sequentialpipeline([user_agent, dialog_agent], x)


if __name__ == "__main__":
    for index, msg in enumerate(main()):
        print(index, msg.name, msg.content, "\n")

Checklist

Please check the following items before code is ready to be reviewed.

  • [X] Code has passed all tests
  • [X] Docstrings have been added/updated in Google Style
  • [X] Documentation has been updated
  • [X] Code is ready for review

rayrayraykk avatar Feb 28 '25 06:02 rayrayraykk

@pan-x-c Please help me to check the compatibility in distributed mode, thanks.

rayrayraykk avatar Feb 28 '25 06:02 rayrayraykk

Please see inline comments.

Besides, if we only want to obtain the application-level streaming output, how about use a hook function within the agent? So that

  1. we don't need to use threading (which maybe dangerous in dsitributed mode).
  2. we can extend it to tts easily.
  3. it's explicit for the developers and users E.g.
agent = Agent(
    # xxx
)

agent.register_speak_pre_hook()
agent.register_speak_post_hook()
agent.register_reply_pre_hook()
agent.register_reply_post_hook()
# ...

# for application level output
def send_to_somewhere(agent, input):
    """
    Args: 
        agent (`AgentBase`):
            The agent module itself
        input (`message | Generator`):
            # ...
    """
    # ...

agent.register_speak_pre_hook(
    send_to_somewhere
)

So that we can re-use this module when we implement tts functionality (e.g. speak out the message when calling the speak function)

Please see inline comments.

Besides, if we only want to obtain the application-level streaming output, how about use a hook function within the agent? So that

  1. we don't need to use threading (which maybe dangerous in dsitributed mode).
  2. we can extend it to tts easily.
  3. it's explicit for the developers and users E.g.
agent = Agent(
    # xxx
)

agent.register_speak_pre_hook()
agent.register_speak_post_hook()
agent.register_reply_pre_hook()
agent.register_reply_post_hook()
# ...

# for application level output
def send_to_somewhere(agent, input):
    """
    Args: 
        agent (`AgentBase`):
            The agent module itself
        input (`message | Generator`):
            # ...
    """
    # ...

agent.register_speak_pre_hook(
    send_to_somewhere
)

So that we can re-use this module when we implement tts functionality (e.g. speak out the message when calling the speak function)

Using the hook function to send messages is a good approach. However, to convert the application into a generator, it may still require concurrent threads or processes (one send, one get). Additionally, the model's response generator can only be yielded once, which already occurs in the speak function. To implement this change, we would need to modify the existing structure to accommodate this adjustment.

rayrayraykk avatar Mar 03 '25 10:03 rayrayraykk

Please see inline comments. Besides, if we only want to obtain the application-level streaming output, how about use a hook function within the agent? So that

  1. we don't need to use threading (which maybe dangerous in dsitributed mode).
  2. we can extend it to tts easily.
  3. it's explicit for the developers and users E.g.
agent = Agent(
    # xxx
)

agent.register_speak_pre_hook()
agent.register_speak_post_hook()
agent.register_reply_pre_hook()
agent.register_reply_post_hook()
# ...

# for application level output
def send_to_somewhere(agent, input):
    """
    Args: 
        agent (`AgentBase`):
            The agent module itself
        input (`message | Generator`):
            # ...
    """
    # ...

agent.register_speak_pre_hook(
    send_to_somewhere
)

So that we can re-use this module when we implement tts functionality (e.g. speak out the message when calling the speak function)

Please see inline comments. Besides, if we only want to obtain the application-level streaming output, how about use a hook function within the agent? So that

  1. we don't need to use threading (which maybe dangerous in dsitributed mode).
  2. we can extend it to tts easily.
  3. it's explicit for the developers and users E.g.
agent = Agent(
    # xxx
)

agent.register_speak_pre_hook()
agent.register_speak_post_hook()
agent.register_reply_pre_hook()
agent.register_reply_post_hook()
# ...

# for application level output
def send_to_somewhere(agent, input):
    """
    Args: 
        agent (`AgentBase`):
            The agent module itself
        input (`message | Generator`):
            # ...
    """
    # ...

agent.register_speak_pre_hook(
    send_to_somewhere
)

So that we can re-use this module when we implement tts functionality (e.g. speak out the message when calling the speak function)

Using the hook function to send messages is a good approach. However, to convert the application into a generator, it may still require concurrent threads or processes (one send, one get). Additionally, the model's response generator can only be yielded once, which already occurs in the speak function. To implement this change, we would need to modify the existing structure to accommodate this adjustment.

In this case, we just need to create a "during"(maybe some other name) hook.


def speak(self, msg: Union[Msg, Generator]):
    if isinstance(msg, Generator):
        for chunk in msg:
            // call hook function
            for func in self.__hooks_during_speak:
                func(self, chunk)
            // normal processing
            log_stream_msg(xxx)
            # ...

DavdGao avatar Mar 07 '25 08:03 DavdGao

~~After discussion, we should implement pushing msg to somewhere in a hook-like function whin the AgentBase. I' will do it after hongyi's PR.~~

Done

rayrayraykk avatar Mar 10 '25 03:03 rayrayraykk

Close as these code will be contributed in other repo.

rayrayraykk avatar Jul 07 '25 09:07 rayrayraykk