Implementing Streaming Output with Autogen and Chainlit Integration
I have a question regarding an example on https://github.com/Chainlit/cookbook/blob/main/pyautogen/app.py (or async version: https://github.com/Chainlit/cookbook/blob/main/pyautogen/async_app.py) that combines autogen and chainlit. I've been trying to modify the code to enable streaming output, but so far, no results have appeared on the screen. My code is as follows:
class ChainlitAgent(AssistantAgent):
    def __init__(self, *args, **kwargs):
        super().__init__(
            system_message=SYSTEM_PROMPT,
            name="assistant",
            llm_config={
                "stream": True
            },
        )

    async def a_send(
        self,
        message: Union[Dict, str],
        recipient: Agent,
        request_reply: Optional[bool] = None,
        silent: Optional[bool] = False,
    ) -> bool:
        msg = cl.run_sync(
            cl.Message(content="", author="assistant")
        )
        async for part in message:
            if token := part.choices[0].delta.content or "":
                print(token)
                await msg.stream_token(token)
        await msg.send()

        await super(ChainlitAgent, self).a_send(
            message=message,
            recipient=recipient,
            request_reply=request_reply,
            silent=silent,
        )
Is there a way to implement streaming with autogen in a chainlit environment?
Does autogen support streaming in the first place? If so, can you link the docs?
Ref here: https://github.com/microsoft/autogen/issues/217#issuecomment-1783895515
Interesting! I see a code snippet to enable streaming, but I don't see an example of how to consume the stream.
Based on my understanding, Autogen currently supports streaming the response from the LLM, but not the completed answer (the message one agent sends to another).
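For reference, the LLM-side streaming is switched on through the agent's llm_config; a minimal sketch (the model name and credentials below are placeholders):

    # Minimal sketch: enable token streaming from the LLM in Autogen 0.2.x.
    # The config values are placeholders.
    from autogen import AssistantAgent

    llm_config = {
        "config_list": [{"model": "gpt-4", "api_key": "YOUR_API_KEY"}],
        "stream": True,  # stream chunks from the LLM instead of returning only the final completion
    }

    assistant = AssistantAgent(name="assistant", llm_config=llm_config)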
I recently experimented with monkey patching to implement streaming. This approach allows adding new agents dynamically and flexibly without altering the original autogen source code. Note: the monkey-patching logic can be isolated into its own library. Here's the working code I ended up with; if this approach seems viable, I'm willing to open a pull request.
class ChainlitAssistantStreamableAgent(AssistantAgent):
    def __init__(self, *args, **kwargs):
        super().__init__(
            system_message=SYSTEM_PROMPT,
            name="assistant",
            llm_config=llm_config,
        )
        self.patch_openai_wrapper()

    def patch_openai_wrapper(self):
        from openai import __version__ as OPENAIVERSION
        from openai.types.chat import ChatCompletion
        from openai.types.chat.chat_completion import ChatCompletionMessage, Choice
        from openai.types.completion_usage import CompletionUsage
        from autogen.token_count_utils import count_token

        def new_completions_create(client, params):
            completions = client.chat.completions if "messages" in params else client.completions
            if params.get("stream", False) and "messages" in params and "functions" not in params:  # note: "stream" must be enabled in llm_config first
                # if params.get("stream", False) and "messages" in params:
                response_contents = [""] * params.get("n", 1)
                finish_reasons = [""] * params.get("n", 1)
                completion_tokens = 0

                # Set the terminal text color to green
                print("\033[32m", end="")

                # Add Chainlit: create an empty message and stream tokens into it
                msg = cl.Message(content="")
                cl.run_sync(msg.send())

                for chunk in completions.create(**params):
                    if chunk.choices:
                        for choice in chunk.choices:
                            content = choice.delta.content
                            finish_reasons[choice.index] = choice.finish_reason
                            if content is not None:
                                print(content, end="", flush=True)
                                # `cc` is a text converter defined elsewhere in my app
                                cl.run_sync(msg.stream_token(cc.convert(content)))
                                response_contents[choice.index] += content
                                completion_tokens += 1
                            else:
                                print()
                                cl.run_sync(msg.update())

                # Reset the terminal text color
                print("\033[0m\n")

                # Prepare the final ChatCompletion object based on the accumulated data
                model = chunk.model.replace("gpt-35", "gpt-3.5")  # hack for Azure API
                prompt_tokens = count_token(params["messages"], model)
                response = ChatCompletion(
                    id=chunk.id,
                    model=chunk.model,
                    created=chunk.created,
                    object="chat.completion",
                    choices=[],
                    usage=CompletionUsage(
                        prompt_tokens=prompt_tokens,
                        completion_tokens=completion_tokens,
                        total_tokens=prompt_tokens + completion_tokens,
                    ),
                )
                for i in range(len(response_contents)):
                    if OPENAIVERSION >= "1.5":  # pragma: no cover
                        # OpenAI versions 1.5.0 and above
                        choice = Choice(
                            index=i,
                            finish_reason=finish_reasons[i],
                            message=ChatCompletionMessage(
                                role="assistant", content=response_contents[i], function_call=None
                            ),
                            logprobs=None,
                        )
                    else:
                        # OpenAI versions below 1.5.0
                        choice = Choice(
                            index=i,
                            finish_reason=finish_reasons[i],
                            message=ChatCompletionMessage(
                                role="assistant", content=response_contents[i], function_call=None
                            ),
                        )
                    response.choices.append(choice)
            else:
                # If streaming is not enabled or functions are used, send a regular chat completion request
                # Functions are not supported here, so ensure streaming is disabled
                params = params.copy()
                params["stream"] = False
                response = completions.create(**params)
            return response

        # Monkey patch the wrapped client's internal create call
        self.client._completions_create = new_completions_create
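For completeness, here is roughly how the patched agent can be wired into the Chainlit handlers from the cookbook example (a sketch: SYSTEM_PROMPT and llm_config are assumed to be defined in the app, and the UserProxyAgent settings are illustrative):

    # Sketch of wiring the patched agent into a Chainlit app.
    import chainlit as cl
    from autogen import UserProxyAgent

    @cl.on_chat_start
    async def on_chat_start():
        assistant = ChainlitAssistantStreamableAgent()
        user_proxy = UserProxyAgent(
            name="user_proxy",
            human_input_mode="NEVER",
            code_execution_config=False,
        )
        cl.user_session.set("assistant", assistant)
        cl.user_session.set("user_proxy", user_proxy)

    @cl.on_message
    async def on_message(message: cl.Message):
        assistant = cl.user_session.get("assistant")
        user_proxy = cl.user_session.get("user_proxy")
        # initiate_chat is synchronous, so run it in a worker thread; the patched
        # _completions_create streams each chunk into a Chainlit message as it arrives.
        await cl.make_async(user_proxy.initiate_chat)(assistant, message=message.content)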
Coming back to this: since release 0.2.21 supports IOStream, could this be achieved by creating a custom IOStream? Something like this (proto-code):
import chainlit as cl
from typing import Any
from autogen.io.base import IOStream


class ChainlitIOStream(IOStream):
    def __init__(self, author):
        self.author = author
        self.message = cl.Message(content="", author=author)

    def print(self, *args: Any, **kwargs: Any) -> None:
        # stream_token is a coroutine, so bridge from this sync call
        cl.run_sync(self.message.stream_token(" ".join(str(arg) for arg in args)))

    def input(self, prompt: str = "", *, password: bool = False) -> str:
        res = cl.run_sync(cl.AskUserMessage(content=prompt).send())
        return res["output"] if res else ""


default_io_stream = ChainlitIOStream(author="MyBot")


def set_custom_IO_overrides():
    IOStream.set_global_default(default_io_stream)
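Hooking it up per chat might look something like this (still proto-code: the handler assumes the agents were created in on_chat_start and stored in the user session, and it uses IOStream.set_default, the scoped counterpart of set_global_default, whose context propagation into the worker thread would need verifying):

    # Proto-code: scope the Chainlit-backed IOStream to a single chat turn.
    import chainlit as cl
    from autogen.io.base import IOStream

    @cl.on_message
    async def on_message(message: cl.Message):
        assistant = cl.user_session.get("assistant")
        user_proxy = cl.user_session.get("user_proxy")
        io_stream = ChainlitIOStream(author="assistant")
        # Scoping the override per chat avoids concurrent sessions writing into the same message;
        # whether the context variable reaches make_async's worker thread needs checking.
        with IOStream.set_default(io_stream):
            await cl.make_async(user_proxy.initiate_chat)(assistant, message=message.content)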
Then, when OpenAIClient.create gets called, it does the printing of the stream here:
# If content is present, print it to the terminal and update response variables
if content is not None:
    iostream.print(content, end="", flush=True)
    response_contents[choice.index] += content
    completion_tokens += 1
else:
    # iostream.print()
    pass
P.S. @willydouhard yes, Chainlit supports streaming.
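For reference, the standard Chainlit streaming pattern is just stream_token on a cl.Message; a minimal sketch, with token_source standing in as a hypothetical async generator of text chunks:

    import chainlit as cl

    @cl.on_message
    async def on_message(message: cl.Message):
        msg = cl.Message(content="")
        # token_source is hypothetical: any async iterator yielding text chunks works here
        async for token in token_source(message.content):
            await msg.stream_token(token)
        await msg.send()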