Introducing IOStream protocol and adding support for websockets
The goals of this PR are:
- Introduction of the `IOStream` protocol and a mechanism to change the default output from the console to alternative outputs such as websockets.
- Introduction of the `IOWebsockets` class that can be used to output streams to a websocket.
The plan is to accomplish this by the following steps:
- [x] introducing a new protocol `IOStream` with `print` and `input` methods (a sketch of such a protocol is given after this list)
- [x] creating an `IOConsole` implementing the protocol using the console
- [x] refactoring `ConversableAgent`, `OpenAIClient` and `OpenAIWrapper` to use `IOStream` internally
- [x] testing `ConversableAgent`, `OpenAIClient` and `OpenAIWrapper` using `IOConsole`
- [x] creating an `IOWebSockets` implementing the protocol using websockets
- [x] testing `IOWebSockets` with unit tests (a bit tricky because it has to be a multithreaded test)
- [x] testing `IOWebSockets` with a new notebook
- [x] writing documentation for classes implementing the `IOStream` protocol
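For reference, a minimal sketch of what a protocol with `print` and `input` methods could look like (illustrative only; the exact signatures in this PR may differ):

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class IOStream(Protocol):
    """Protocol for pluggable I/O: anything with print() and input()."""

    def print(self, *objects: Any, sep: str = " ", end: str = "\n", flush: bool = False) -> None:
        """Write objects to the output stream."""
        ...

    def input(self, prompt: str = "", *, password: bool = False) -> str:
        """Read a line of input, optionally without echoing it."""
        ...


class IOConsole:
    """Console implementation that delegates to the builtins."""

    def print(self, *objects: Any, sep: str = " ", end: str = "\n", flush: bool = False) -> None:
        print(*objects, sep=sep, end=end, flush=flush)

    def input(self, prompt: str = "", *, password: bool = False) -> str:
        if password:
            import getpass

            return getpass.getpass(prompt)
        return input(prompt)
```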
Why are these changes needed?
Related issue number
Closes https://github.com/microsoft/autogen/issues/1199, closes https://github.com/microsoft/autogen/issues/1143, closes https://github.com/microsoft/autogen/issues/217, closes https://github.com/microsoft/autogen/issues/394
Related pull requests
https://github.com/microsoft/autogen/pull/1290, https://github.com/microsoft/autogen/pull/1414
Checks
- [x] I've included any doc changes needed for https://microsoft.github.io/autogen/. See https://microsoft.github.io/autogen/docs/Contribute#documentation to build and test documentation locally.
- [x] I've added tests (if relevant) corresponding to the changes introduced in this PR.
- [x] I've made sure all auto checks have passed.
Codecov Report
Attention: Patch coverage is 88.61386% with 23 lines in your changes missing coverage. Please review.
Project coverage is 66.50%. Comparing base (72994ea) to head (ef808f6). Report is 433 commits behind head on main.
```diff
@@            Coverage Diff             @@
##             main    #1551       +/-   ##
===========================================
+ Coverage   36.39%   66.50%   +30.10%
===========================================
  Files          70       74        +4
  Lines        7339     7481      +142
  Branches     1604     1754      +150
===========================================
+ Hits         2671     4975     +2304
+ Misses       4431     2054     -2377
- Partials      237      452      +215
```
| Flag | Coverage Δ | |
|---|---|---|
| unittests | 66.40% <88.61%> (+30.02%) | :arrow_up: |
Flags with carried forward coverage won't be shown.
This draft is great. I was just looking over the code. Initial thoughts:
- Additional context/metadata should be provided with messages to make them optimally useful. Some of that can be done later, but there is some low-hanging fruit.
- Tool & function call data should also be included.
I was going to wait until this weekend to get back to contributions, but since this draft has direct implications for what I've been working on, I will begin working on contributions to this draft immediately.
@davorrunje I must say, this PR is MUCH more clearly written than the last one, excellent job! Let me know if there is anything I can do to help.
@jackgerrits, @victordibia, @ekzhu I refactored the websocket-based class and its tests. Everything works fine from the server perspective, but I have one doubt and I could use your help with it. When `stream` is set to `True`, the response is streamed from the OpenAI client and then streamed to the websocket, as you can see in `notebook/agentchat_websockets.ipynb`. After the answer is completed, it is sent again to the output. I think this will be difficult to handle on the client side as is, because the client would have to detect which part of the output should be replaced with the repeated complete message. I am not sure why this was implemented this way or what the best way to handle it would be.
@sonichi could you please run all tests to make sure there are no errors? We are testing the websockets in a production application right now to make sure everything works as expected under real-world scenarios.
@qingyun-wu @sonichi for running the OpenAI tests
The test failure in `test_agent_logging` is fixed with this PR: #2036
This is a great addition to the utils we have. My main question is: do we want to use the same stream for all I/O?
The other thing is that setting a global stream object is something I am not very used to. How does it compare to setting the stream objects explicitly in the methods that trigger I/O?
The global stream is not really global; it is specific to your thread or async call stack, as it uses `ContextVar` in the implementation: https://github.com/microsoft/autogen/blob/00ef5b370d7b0bcd83fc348bfb860c47e775f112/autogen/io/base.py#L55
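As an illustration of the pattern (names here are illustrative, not the actual autogen code; see the linked `base.py` for the real implementation):

```python
import sys
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator, TextIO

# A ContextVar is resolved per thread / per async task, so each request
# handler can have its own "default" stream without touching the others.
_default_stream: ContextVar[TextIO] = ContextVar("default_stream", default=sys.stdout)


def get_default() -> TextIO:
    """Return the default stream for the current thread / async context."""
    return _default_stream.get()


@contextmanager
def set_default(stream: TextIO) -> Iterator[TextIO]:
    """Temporarily override the default stream in the current context only."""
    token = _default_stream.set(stream)
    try:
        yield stream
    finally:
        _default_stream.reset(token)


# Usage inside one request handler:
#     with set_default(websocket_stream):
#         run_chat()  # anything calling get_default() sees websocket_stream
# Other threads / async tasks still see sys.stdout.
```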
In a typical server deployment, you get a new default stream tied to each websocket, because each connection is handled in a new thread or async call created for each request. This is the easiest way to implement a server handling multiple requests.
If you need different streams for a single request, you can override this behaviour: you receive an `IOStream` object in the `on_connect` function you override, see e.g.
https://github.com/microsoft/autogen/blob/00ef5b370d7b0bcd83fc348bfb860c47e775f112/test/io/test_websockets.py#L89-L149
An alternative is propagating an `iostream` parameter through all classes; I implemented that initially, but it was a lot more complex to use and override.
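For context, the shape of the `on_connect` pattern being described looks roughly like this (a sketch based on the linked test; treat the exact API details as assumptions):

```python
from autogen.io.websockets import IOWebsockets


def on_connect(iostream: IOWebsockets) -> None:
    # Each websocket connection gets its own IOWebsockets instance, which
    # becomes the default stream for the thread / async context handling it.
    initial_msg = iostream.input()  # first message sent by the client
    iostream.print(f"Starting chat for: {initial_msg}")
    # ...create agents and call initiate_chat() here; their output goes
    # over this connection's websocket instead of the console.


# Run the server in a background thread; on_connect is invoked per connection.
with IOWebsockets.run_server_in_thread(on_connect=on_connect, port=8765) as uri:
    print(f"Websocket server listening on {uri}")
    # ...connect clients, run chats, etc.
```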
Thanks @davorrunje, the office hour discussion helped to clarify a lot!
As discussed in the office hour, the next step is to ask @victordibia to review this PR and perhaps meet separately to ensure the changes in this PR are compatible with AutoGen Studio.
@sonichi can you please run AI tests?
How about running it after the conflicts are fixed?
Something went wrong when I ran `agentchat_websockets`; how can I solve this problem?
@SsspongeBob what was the problem?
@sonichi fixed. Can you please run tests now?
As the red text in the image says, I cannot create a cache folder locally. Is it a permission issue?
@SsspongeBob This piece of code creates a temporary directory so you always get a fresh response:
```python
# we will use a temporary directory as the cache path root to ensure fresh completion each time
with TemporaryDirectory() as cache_path_root:
    with Cache.disk(cache_path_root=cache_path_root) as cache:
        print(
            f" - on_connect(): Initiating chat with agent {agent} using message '{initial_msg}'",
            flush=True,
        )
        user_proxy.initiate_chat(  # noqa: F704
            agent,
            message=initial_msg,
            cache=cache,
        )
```
You can safely replace it with this:
```python
user_proxy.initiate_chat(  # noqa: F704
    agent,
    message=initial_msg,
)
```
Thank you very much, the problem is gone!
@sonichi can you please run AI test?
OAI Tests passed. @davorrunje one last merge conflict.
@ekzhu fixed :)
@davorrunje Congratulations, I have seen you working hard every day on this feature for months! Excellent work.
Merging this to anticipate changes in #2030 for structured output of prints.
Can you also create a topic page under `/website/docs/topics/` to explain how to use this streaming util? You can simply copy the notebook to `/website/docs/topics/iostream.ipynb` to make it available as a web page. I think the first example should be just about the utils, not involving any agents: how to redirect the iostream to a different place, etc., to illustrate the usage. Then you can show an example of using a `with ...` context manager to change the output stream to a file, for example.
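Such a topic-page example might look something like this (a sketch; `IOFileStream` is a hypothetical class written for illustration, and the `IOStream.set_default` context manager is assumed from the discussion above):

```python
from autogen.io import IOStream


class IOFileStream:
    """Hypothetical IOStream implementation that appends all output to a file."""

    def __init__(self, path: str) -> None:
        self.path = path

    def print(self, *objects, sep: str = " ", end: str = "\n", flush: bool = False) -> None:
        with open(self.path, "a") as f:
            f.write(sep.join(map(str, objects)) + end)

    def input(self, prompt: str = "", *, password: bool = False) -> str:
        raise NotImplementedError("this stream is output-only")


# Redirect all framework output to a file for the duration of the block.
with IOStream.set_default(IOFileStream("chat_output.txt")):
    IOStream.get_default().print("this line lands in chat_output.txt")
```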
@ekzhu sure, I'll do it tomorrow night.
@davorrunje Getting a problem with this notebook:
```
 - test_setup() with websocket server running on ws://127.0.0.1:8765.
 - on_connect(): Connected to client using IOWebsockets <autogen.io.websockets.IOWebsockets object at 0x0000021BDE10ED70>
 - Connected to server on ws://127.0.0.1:8765
 - on_connect(): Receiving message from client.
 - Sending message to server.
 - IOWebsockets._handler(): Error in on_connect: [Errno 22] Cache directory "./C:\Users\erifi\AppData\Local\Temp\tmpa7t3_152/42" does not exist and could not be created
Traceback (most recent call last):
```
@beckortikov The author provided the answer above; you can scroll up to see it.
Thanks for reporting it; the workaround is above. I'll update the notebook.
@davorrunje Thank you, sir! I also have one question: how can I implement this for an AI chat with `human_input_mode="ALWAYS"`, where I also have function calls? I've looked through a lot of issues but couldn't find any solution for deploying it through FastAPI with a conversable agent. It would be great if you could provide a little code for this case. Thank you!
@beckortikov can you please check out https://captn.ai/ for an example of a larger system built with this and related code not yet merged into autogen?
I'll factor out the mechanism used to build it and open-source it in the following weeks. One major part is distributing work to multiple workers and streaming results back to clients. The streaming part was merged into autogen this week; the distribution across workers is not yet, as it is a bit more complex to factor out. There is also a code-injection mechanism with a bunch of security-related features: the LLM never gets any sensitive information about users, but functions called by the LLM have access to it through code injection. There are also helpers such as a decorator for checking whether the user explicitly approves calling a function with specific parameters (when the human input mode is NEVER). Anyway, you should be able to easily build something like this very soon.
Any docs on how to use streaming? Thanks a lot for the help.
@ekzhu @jackgerrits could you check if this is in the doc roadmap?