
Implementing Streaming Output with Autogen and Chainlit Integration

Open · lianghsun opened this issue 1 year ago · 6 comments

I have a question about an example at https://github.com/Chainlit/cookbook/blob/main/pyautogen/app.py (or the async version, https://github.com/Chainlit/cookbook/blob/main/pyautogen/async_app.py) that combines autogen and chainlit. I've been trying to modify the code to enable streaming output, but so far nothing has appeared on the screen. My code is as follows:

from typing import Dict, Optional, Union

import chainlit as cl
from autogen import Agent, AssistantAgent


class ChainlitAgent(AssistantAgent):
    def __init__(self, *args, **kwargs):
        super().__init__(
            system_message=SYSTEM_PROMPT,  # defined elsewhere in the app
            name="assistant",
            llm_config={
                "stream": True  # ask autogen to stream the LLM response
            }
        )

    async def a_send(
        self,
        message: Union[Dict, str],
        recipient: Agent,
        request_reply: Optional[bool] = None,
        silent: Optional[bool] = False,
    ) -> bool:
        # Start an empty Chainlit message to stream tokens into
        # (no run_sync needed inside an async method)
        msg = cl.Message(content="", author="assistant")

        # This assumes `message` is an async stream of completion chunks,
        # but a_send actually receives the finished message (a dict or str),
        # not the token stream itself
        async for part in message:
            if token := part.choices[0].delta.content or "":
                print(token)
                await msg.stream_token(token)

        await msg.send()

        await super().a_send(
            message=message,
            recipient=recipient,
            request_reply=request_reply,
            silent=silent,
        )

Is there a way to implement streaming with autogen in a chainlit environment?

lianghsun commented on Dec 13, 2023

Does autogen support streaming in the first place? If so, can you link the docs?

willydouhard commented on Dec 13, 2023

Ref here: https://github.com/microsoft/autogen/issues/217#issuecomment-1783895515
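
For context, the snippet there just enables streaming through the agent's llm_config. A minimal sketch, assuming config_list holds your model configuration:

from autogen import AssistantAgent

assistant = AssistantAgent(
    name="assistant",
    llm_config={
        "config_list": config_list,  # assumed to be defined elsewhere
        "stream": True,  # autogen prints tokens to the terminal as they arrive
    },
)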

lianghsun commented on Dec 13, 2023

Interesting! I see a code snippet to enable streaming, but I don't see an example of consuming the stream?

willydouhard commented on Dec 13, 2023

As I understand it, Autogen currently supports streaming the response from the LLM, but not the completed answer (the message one agent sends to another).

cccc11231 commented on Dec 13, 2023

I recently experimented with monkey patching to implement streaming. This method allows new agents to be added dynamically and flexibly without altering the original autogen source code. Note: you can isolate the monkey-patching logic into its own library. Here's the working code I came up with; if this approach seems viable, I'm willing to propose a pull request.

import chainlit as cl
from autogen import AssistantAgent


class ChainlitAssistantStreamableAgent(AssistantAgent):
    def __init__(self, *args, **kwargs):
        super().__init__(
            system_message=SYSTEM_PROMPT,  # defined elsewhere in the app
            name="assistant",
            llm_config=llm_config,  # defined elsewhere; must set "stream": True
        )
        self.patch_openai_wrapper()
        
    def patch_openai_wrapper(self):
        from openai import __version__ as OPENAIVERSION
        from openai.types.chat import ChatCompletion
        from openai.types.chat.chat_completion import ChatCompletionMessage, Choice
        from openai.types.completion_usage import CompletionUsage
        from autogen.token_count_utils import count_token
        
        def new_completions_create(client, params):
            completions = client.chat.completions if "messages" in params else client.completions
            if params.get("stream", False) and "messages" in params and "functions" not in params:  # note: streaming must be enabled in llm_config first
            # if params.get("stream", False) and "messages" in params:
                response_contents = [""] * params.get("n", 1)
                finish_reasons = [""] * params.get("n", 1)
                completion_tokens = 0

                # Set the terminal text color to green
                print("\033[32m", end="")
                
                # Create an empty Chainlit message to stream tokens into
                msg = cl.Message(content="")
                cl.run_sync(msg.send())

                for chunk in completions.create(**params):
                    if chunk.choices:
                        for choice in chunk.choices:
                            content = choice.delta.content
                            finish_reasons[choice.index] = choice.finish_reason
                            
                            if content is not None:
                                print(content, end="", flush=True)
                                # `cc` is a text converter (e.g. an OpenCC instance) defined elsewhere
                                cl.run_sync(msg.stream_token(cc.convert(content)))
                                response_contents[choice.index] += content
                                completion_tokens += 1
                            else:
                                print()
                                cl.run_sync(msg.update())
                                
                # Reset the terminal text color
                print("\033[0m\n")

                # Prepare the final ChatCompletion object based on the accumulated data
                model = chunk.model.replace("gpt-35", "gpt-3.5")  # hack for Azure API
                prompt_tokens = count_token(params["messages"], model)
                response = ChatCompletion(
                    id=chunk.id,
                    model=chunk.model,
                    created=chunk.created,
                    object="chat.completion",
                    choices=[],
                    usage=CompletionUsage(
                        prompt_tokens=prompt_tokens,
                        completion_tokens=completion_tokens,
                        total_tokens=prompt_tokens + completion_tokens,
                    ),
                )
                for i in range(len(response_contents)):
                    if OPENAIVERSION >= "1.5":  # pragma: no cover
                        # OpenAI versions 1.5.0 and above
                        choice = Choice(
                            index=i,
                            finish_reason=finish_reasons[i],
                            message=ChatCompletionMessage(
                                role="assistant", content=response_contents[i], function_call=None
                            ),
                            logprobs=None,
                        )
                    else:
                        # OpenAI versions below 1.5.0
                        choice = Choice(
                            index=i,
                            finish_reason=finish_reasons[i],
                            message=ChatCompletionMessage(
                                role="assistant", content=response_contents[i], function_call=None
                            ),
                        )

                    response.choices.append(choice)
            else:
                # If streaming is not enabled or using functions, send a regular chat completion request
                # Functions are not supported, so ensure streaming is disabled
                params = params.copy()
                params["stream"] = False
                response = completions.create(**params)
            return response
    
        self.client._completions_create = new_completions_create
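
A rough usage sketch, assuming a Chainlit handler and a standard UserProxyAgent (the handler and agent names here are illustrative, not part of the snippet above):

from autogen import UserProxyAgent

@cl.on_message
async def on_message(message: cl.Message):
    assistant = ChainlitAssistantStreamableAgent()
    user_proxy = UserProxyAgent(
        name="user",
        human_input_mode="NEVER",
        code_execution_config=False,
    )
    # initiate_chat is synchronous, so run it in a worker thread
    await cl.make_async(user_proxy.initiate_chat)(assistant, message=message.content)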

lianghsun commented on Dec 28, 2023

Coming back to this: since release 0.2.21, autogen supports IOStream, so could this be achieved by creating a custom IOStream? Something like this (proto-code):

import chainlit as cl
from typing import Any
from autogen.io.base import IOStream

class ChainlitIOStream(IOStream):
    def __init__(self, author):
        self.author = author
        self.message = cl.Message(content="", author=author)

    def print(self, *args: Any, **kwargs: Any) -> None:
        # stream_token is async and expects a string, so join the args
        # and run the coroutine from this synchronous callback
        cl.run_sync(self.message.stream_token(" ".join(map(str, args))))

    def input(self, prompt: str = "", *, password: bool = False) -> str:
        # AskUserMessage returns a dict; its text lives under "output"
        # in recent Chainlit versions
        res = cl.run_sync(cl.AskUserMessage(content=prompt).send())
        return res["output"] if res else ""


default_io_stream = ChainlitIOStream(author="MyBot")


def set_custom_IO_overrides():
    IOStream.set_global_default(default_io_stream)
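
Wiring it up would then look something like this (again proto-code, assuming a Chainlit entry point):

# Proto-code: route all agent output through the custom IOStream
# as soon as a chat session starts
@cl.on_chat_start
async def start():
    set_custom_IO_overrides()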

Then, when OpenAIClient.create gets called, it does the printing of the stream here:

                        # If content is present, print it to the terminal and update response variables
                        if content is not None:
                            iostream.print(content, end="", flush=True)
                            response_contents[choice.index] += content
                            completion_tokens += 1
                        else:
                            # iostream.print()
                            pass

P.S. @willydouhard yes, Chainlit supports streaming.
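
For reference, a minimal sketch of Chainlit's native streaming, independent of autogen:

import chainlit as cl

@cl.on_message
async def main(message: cl.Message):
    msg = cl.Message(content="")
    # stream_token appends each token to the message in the UI as it arrives
    for token in ["streaming", " ", "works"]:
        await msg.stream_token(token)
    await msg.send()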

stezz commented on Apr 9, 2024