Stream Chat LLM Token By Token is not working
Checked other resources
- [X] I added a very descriptive title to this issue.
- [X] I searched the LangChain documentation with the integrated search.
- [X] I used the GitHub search to find a similar question and didn't find it.
- [X] I am sure that this is a bug in LangChain rather than my code.
Example Code
I am using the code in this notebook: https://github.com/langchain-ai/langgraph/blob/main/examples/streaming-tokens.ipynb
streaming code:
from langchain_core.messages import HumanMessage

inputs = [HumanMessage(content="what is the weather in sf")]
async for event in app.astream_events(inputs, version="v1"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")
    elif kind == "on_tool_start":
        print("--")
        print(
            f"Starting tool: {event['name']} with inputs: {event['data'].get('input')}"
        )
    elif kind == "on_tool_end":
        print(f"Done tool: {event['name']}")
        print(f"Tool output was: {event['data'].get('output')}")
        print("--")
Error Message and Stack Trace (if applicable)
Streaming is not working; I am not receiving any output from this part:
if kind == "on_chat_model_stream":
    content = event["data"]["chunk"].content
    if content:
        # Empty content in the context of OpenAI means
        # that the model is asking for a tool to be invoked.
        # So we only print non-empty content
        print(content, end="|")
Description
Streaming is not working; I am not receiving any output.
System Info
langchain==0.1.5 langchain-community==0.0.17 langchain-core==0.1.18 langchain-openai==0.0.5 langgraph==0.0.21
This seems to be a bug introduced in the 0.0.21 release. If you downgrade to 0.0.20 and keep the core, community, langchain, and openai versions the same, this seems to work.
Although, given that example, the on_tool_start and on_tool_end events don't seem to be present in the 0.0.21, 0.0.20, or 0.0.19 releases for some reason.
@elliottshort So, what LangChain version do you recommend with LangGraph 0.0.20? I tried LangGraph 0.0.20 with the latest LangChain version, but it is still not working.
Hey @ahmedmoorsy, which python version are you using? I had a similar issue when running on python 3.10 but the streaming worked once I upgraded to 3.12 (3.11 should also work, I think).
@SavvasMohito That is interesting. I've been having the same issues with streaming using python 3.10.11. The only way I've been able to get around it is by modifying the wrapper functions. See the other LLM token thread for streaming with astream_log - I just converted the call_model wrapper function to an LCEL chain and it worked fine.
The same approach works with astream_events to get the correct output in the example notebook. I'm sure there's a more elegant way to do this, specifically to pass through the tool name, but this does work at least with python 3.10.11:
import json

from langchain_core.messages import FunctionMessage
from langgraph.prebuilt import ToolInvocation

async def run_tools(messages):
    # Pull the tool name from the model's function call so the
    # FunctionMessage can reference it.
    last_message = messages[-1]
    tool_name = last_message.additional_kwargs["function_call"]["name"]
    create_function_message = lambda response, tool=tool_name: FunctionMessage(
        content=str(response), name=tool
    )
    action = lambda messages: ToolInvocation(
        tool=messages[-1].additional_kwargs["function_call"]["name"],
        tool_input=json.loads(messages[-1].additional_kwargs["function_call"]["arguments"]),
    )
    # Returning an LCEL chain lets LangGraph invoke it with the run's config,
    # so callbacks (and thus streaming events) propagate.
    # `tool_executor` is the ToolExecutor defined earlier in the notebook.
    chain = action | tool_executor | create_function_message
    return chain
Then, instead of call_model, just use model directly as the node:
workflow.add_node("agent", model)
workflow.add_node("action", run_tools)
Nice one @tgram-3D. Does streaming follow-up questions work for you? I can get responses streamed for my initial prompt, but when I ask a second question, it never returns anything. It's not that I am failing to capture it; it seems like langgraph never exits the model invocation loop.
@SavvasMohito I've just been playing around with the example streaming-tokens notebook with astream_events, and follow-up questions seem to work fine using the node modifications described above. The notebook isn't really set up for memory, though, since all it does is add a single HumanMessage for each run:
from langchain_core.messages import HumanMessage

inputs = [HumanMessage(content="what is the weather in sf")]
async for event in app.astream_events(inputs, version="v1"):
    ...
The way I'm currently using it for follow up questions is pretty basic:
class GraphConversation:
    def __init__(self, graph):
        self.graph = graph
        self.messages = []

    async def stream_conversation(self, question):
        self.messages.append(HumanMessage(content=question))
        async for event in self.graph.astream_events(self.messages, version="v1"):
            kind = event["event"]
            if kind == "on_chat_model_stream":
                content = event["data"]["chunk"].content
                if content:
                    # Empty content in the context of OpenAI means
                    # that the model is asking for a tool to be invoked.
                    # So we only print non-empty content
                    print(content, end="|")
            elif kind == "on_tool_start":
                print("--")
                print(
                    f"Starting tool: {event['name']} with inputs: {event['data'].get('input')}"
                )
            elif kind == "on_tool_end":
                print(f"Done tool: {event['name']}")
                print(f"Tool output was: {event['data'].get('output')}")
                print("--")
graph_convo = GraphConversation(graph=app)
await graph_convo.stream_conversation('what is the weather in sf')
--
Starting tool: tavily_search_results_json with inputs: {'query': 'weather in San Francisco'}
Done tool: tavily_search_results_json
Tool output was: [{'url': 'https://www.marinij.com/2024/02/05/high-winds-heavy-rains-hit-the-bay-area/', 'content': '— San José Fire Dept. (@SJFD) February 5, 2024 Weather | Atmospheric river storm to soak Bay Area this weekend, with major flood danger in Southern California Crystal Oudit said the forecast will start to improve on Monday, though Bay Area residents should expect more rain. Weather Weather | High winds, heavy rains hit the Bay Area and the rest of California’s coast(Karl Mondon/Bay Area News Group) Waves crash over a breakwater in Alameda, Calif., with the San Francisco skyline in the background on Sunday, Feb. 4, 2024. High winds and heavy rainfall are ...'}]
--
The| weather| in| San| Francisco| is| currently| experiencing| high| winds| and| heavy| rains|.| You| can| find| more| information| about| the| weather| in| San| Francisco| [|here|](|https|://|www|.m|arin|ij|.com|/|202|4|/|02|/|05|/high|-w|inds|-heavy|-r|ains|-hit|-the|-b|ay|-area|/|).|
await graph_convo.stream_conversation('what was the last question I asked you')
The| last| question| you| asked| me| was| "|what| is| the| weather| in| SF|?"|
I'm sure there's a better way to do this sort of thing. Check out the new Persistence notebook. I currently cannot get astream_events to work with the persistence config at all, though:
from langgraph.checkpoint.sqlite import SqliteSaver
memory = SqliteSaver.from_conn_string(":memory:")
app = workflow.compile(checkpointer=memory)
I tried this and got zero output, same deal without the configurable:
async for event in app.astream_events(inputs, {"configurable": {"thread_id": "2"}}, version="v1"):
I'm sure there's a way to combine the two features, but the simple class method should work fine for frontend integration with websockets, etc.
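For example, here's a minimal sketch of that kind of websocket integration (assuming FastAPI; the /chat route and the "[END]" marker are illustrative, `app` is the compiled graph from the notebook, and the printing above is replaced by sending chunks over the socket):

from fastapi import FastAPI, WebSocket
from langchain_core.messages import HumanMessage

api = FastAPI()  # named `api` to avoid clashing with the compiled graph `app`
convo = GraphConversation(graph=app)

@api.websocket("/chat")
async def chat(ws: WebSocket):
    await ws.accept()
    while True:  # disconnect handling omitted for brevity
        question = await ws.receive_text()
        convo.messages.append(HumanMessage(content=question))
        async for event in convo.graph.astream_events(convo.messages, version="v1"):
            if event["event"] == "on_chat_model_stream":
                content = event["data"]["chunk"].content
                if content:  # skip empty tool-call chunks
                    await ws.send_text(content)
        await ws.send_text("[END]")  # hypothetical end-of-answer marker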
> Hey @ahmedmoorsy, which python version are you using? I had a similar issue when running on python 3.10 but the streaming worked once I upgraded to 3.12 (3.11 should also work, I think).
Thank you! Upgrading from 3.10.13 to 3.12.2 fixed this demo for me
I was struggling with token streaming in this tutorial and found another workaround to enable it. I just added an extra config parameter to my call_model function and pass it through to the llm invoke:
# Define the function that calls the model
async def call_model(messages, config):
    response = await model.ainvoke(messages, config=config)
    # We return a list, because this will get added to the existing list
    return response
That way I was able to fetch the token stream from the llm.
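The node registration itself doesn't change, only the function signature. A minimal sketch, with the `workflow` object and "agent" node name taken from the streaming-tokens notebook:

# Sketch: the config-aware node plugs into the graph exactly as before.
workflow.add_node("agent", call_model)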
Not clear on how, but dmitryrPlanner5D's comment solved this for me too. I'm using python 3.10.
What are you guys passing in the config? @simonrmonk @dmitryrPlanner5D
@OSS-GR I do not call call_model myself, so I do not pass config there explicitly. My call_model function is called by langgraph, and langgraph passes some config to it; I do not control this config.
Streaming is also not working if I update the agent_supervisor notebook as specified in the documentation (set streaming to True and make the tool calls async).
~~The operator.add in the AgentState's messages is making it so that the final output from the supervisor has duplicated tokens, if I got it correctly.~~ Nope, the operator.add annotation has nothing to do with it.
For example, here's the log from a slightly tweaked version of the notebook. In this example, the supervisor should output MongoDBAgent but it's outputting MongoMongoDBMongoDBAgent, resulting in a KeyError.
{'op': 'replace', 'path': '', 'value': {'id': '1e88305f-e06c-4f9f-8120-20208defbb88', 'streamed_output': [], 'final_output': None, 'logs': {}, 'name': 'LangGraph', 'type': 'chain'}}
{'op': 'add', 'path': '/logs/ChatOpenAI', 'value': {'id': '69000165-10d9-4828-962d-653e95bbc436', 'name': 'ChatOpenAI', 'type': 'llm', 'tags': ['seq:step:2'], 'metadata': {}, 'start_time': '2024-04-23T13:54:01.044+00:00', 'streamed_output': [], 'streamed_output_str': [], 'final_output': None, 'end_time': None}}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': '', 'name': 'route'}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': '{"', 'name': ''}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': 'next', 'name': ''}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': '":"', 'name': ''}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': 'Mongo', 'name': ''}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': 'DB', 'name': ''}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': 'Agent', 'name': ''}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': '"}', 'name': ''}})}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output_str/-', 'value': ''}
{'op': 'add', 'path': '/logs/ChatOpenAI/streamed_output/-', 'value': AIMessageChunk(content='')}
{'op': 'add', 'path': '/logs/ChatOpenAI/final_output', 'value': {'generations': [[{'text': '', 'generation_info': {'finish_reason': 'stop'}, 'type': 'ChatGenerationChunk', 'message': AIMessageChunk(content='', additional_kwargs={'function_call': {'arguments': '{"next":"MongoDBAgent"}', 'name': 'route'}}, response_metadata={'finish_reason': 'stop'}, id='run-69000165-10d9-4828-962d-653e95bbc436')}]], 'llm_output': None, 'run': None}}
{'op': 'add', 'path': '/logs/ChatOpenAI/end_time', 'value': '2024-04-23T13:54:02.193+00:00'}
{'op': 'add', 'path': '/streamed_output/-', 'value': {'Supervisor': {'next': 'MongoMongoDBMongoDBAgent'}}}
{'op': 'replace', 'path': '/final_output', 'value': {'Supervisor': {'next': 'MongoMongoDBMongoDBAgent'}}}
I found the issue and the solution.
re: https://github.com/langchain-ai/langgraph/issues/78#issuecomment-1994836108
The reason explicit config passing is needed on Python versions < 3.11 is that asyncio didn't add context support until 3.11.
Closing issue. You must either pass config explicitly when using Python < 3.11, or upgrade to Python 3.11, starting from which we automatically do that on behalf of the user.
The config object contains the callbacks that are necessary for astream_events and astream_log to get information about the streaming tokens from the llm!
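To summarize the fix for Python < 3.11, the node just needs to accept and forward that config (a minimal sketch using the notebook's `model`; the `RunnableConfig` annotation is optional but documents intent):

from langchain_core.runnables import RunnableConfig

# On Python < 3.11, accept the config LangGraph passes to each node and
# forward it to every ainvoke/astream call so the streaming callbacks propagate.
async def call_model(messages, config: RunnableConfig):
    return await model.ainvoke(messages, config=config)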