langgraph
Parallel Tool Calling and LLM Token Streaming Issue
Is there a reason you've been using legacy OpenAI function calling in all the langgraph examples instead of tools? Before the latest update with ToolExecutor I did something similar by subclassing Runnable, overriding invoke, and then calling batch on the tool inputs for concurrent execution with a tools agent, and that worked fine. I'm just curious why you aren't promoting the use of tools over functions with langgraph the way you are for Agents, adding support for tool_call_ids, and so on. With an openai_tools_agent and the new classes, something like this gives the correct output:
def execute_tools_concurrent(data):
    agent_actions = data.pop('agent_outcome')
    tool_actions = [
        ToolInvocation(tool=action.tool, tool_input=action.tool_input)
        for action in agent_actions
    ]
    outputs = tool_executor.batch(tool_actions)
    full_output = list(zip(agent_actions, outputs))
    data['intermediate_steps'].extend(full_output)
    return data
But for building from scratch in langgraph and appending ToolMessages instead of FunctionMessages to the state, the new classes do not support adding tool_call_ids, so you have to extend them. I'm currently adding a tool_call_id attribute to ToolInvocationInterface and ToolInvocation, and extending ToolExecutor so that _execute and _aexecute return the following:
return {"id": tool_invocation.tool_call_id, "output": output}
Then you can do this sort of thing:
...
tool_calls = last_message.additional_kwargs['tool_calls']
actions = [
    ToolInvocationWithID(
        tool=tool_call['function']['name'],
        tool_input=json.loads(tool_call['function']['arguments']),
        tool_call_id=tool_call['id'],
    )
    for tool_call in tool_calls
]
responses = enhanced_executor.batch(actions)
tool_messages = [
    ToolMessage(content=str(r['output']), tool_call_id=r['id'])
    for r in responses
]
return {"messages": tool_messages}
This does work, but maybe there's a better way? Am I missing something?
My actual issue is that I can't get LLM token streaming to work. I updated langchain, langchain_openai, and langgraph, and copied streaming-tokens.ipynb verbatim, but I'm getting no /logs/ output at all from astream_log. Are you sure the implementation with:
async def call_model(state):
    messages = state['messages']
    response = await model.ainvoke(messages)
    return {"messages": [response]}
is correct? Do you need to change it to astream or something to get the tokens?
@tgram-3D For now, to get token streaming to work:
- initialize your LLM with streaming=True
- make sure everything is async
- if you're using a callback manager, use the async variety to be safe and propagate it through, or else attach it directly as a local callback to the model
- alternatively, use the output of astream_log to get token-by-token streaming (see the sketch below)
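A minimal sketch of that setup, assuming app is the compiled graph from the streaming-tokens notebook and nothing else changes:

from langchain_openai import ChatOpenAI

# 1. Initialize the LLM with streaming enabled.
model = ChatOpenAI(temperature=0, streaming=True)

# 2. Keep the whole call path async and read the tokens from astream_log.
async def print_tokens(inputs):
    async for chunk in app.astream_log(inputs, include_types="llm"):
        for op in chunk.ops:
            if op["path"].startswith("/logs/") and op["path"].endswith("/streamed_output/-"):
                print(op["value"])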
@tgram-3D take a look at:
https://github.com/langchain-ai/langchain/blob/d8c833eff523335beef9791233fc12a8c7235e95/docs/docs/modules/agents/how_to/streaming.ipynb
https://github.com/langchain-ai/langchain/blob/d8c833eff523335beef9791233fc12a8c7235e95/docs/docs/modules/agents/how_to/streaming_callbacks.ipynb
and we have a new solution incoming:
https://github.com/langchain-ai/langchain/pull/16172
@eyurtsev Thank you for the info.
I found out that token streaming from astream_log with the provided configuration works if you are using an Agent as the agent node in LangGraph, but not if you are using ChatOpenAI directly with a wrapper call_model function like in the new examples. I'm sure there's a way around it with that same syntax, but an interim fix until astream_events is ready is to compose it as a chain, e.g.:
model_chain = (lambda x: x['messages']) | model | {"messages": lambda x: [x]}
...
workflow.add_node("agent", model_chain)
This way astream_log recognizes the nested streaming llm call and you can access the individual tokens with:
async for output in app.astream_log(inputs, include_types="llm"):
    for op in output.ops:
        if op["path"] == "/streamed_output/-":
            ...
        elif op["path"].startswith("/logs/") and op["path"].endswith(
            "/streamed_output/-"
        ):
            print(op["value"])
I think you'd need to yield the token outputs in the call_model function so that it is treated as a generator. @eyurtsev is anything else required to get the streaming to work?
@eyurtsev I really appreciate your hard work; it looks like your changes outlined in https://github.com/langchain-ai/langchain/pull/16172 made it into the main branch, and propagating the on_chat_model_stream events is working perfectly. However, the on_tool_start and on_tool_end events included in the recent examples don't seem to be supported.
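Roughly, the consumer loop in question looks like this (a sketch rather than my exact code; app is the compiled graph and the model was initialized with streaming=True):

async def consume_events(inputs):
    async for event in app.astream_events(inputs, version="v1"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            # Fires token by token as expected.
            print(event["data"]["chunk"].content, end="", flush=True)
        elif kind in ("on_tool_start", "on_tool_end"):
            # These are the events that don't seem to be emitted.
            print(kind, event["name"])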
@hinthornw I'm confident the issue lies with the missing yield. Specifically, in the streaming notebook example, the call_model method merely invokes the LLM chat chain and returns the result, whereas it ought to yield the results instead. However, when I modified it to yield the results, I was still unable to retrieve the streamed tokens via app.astream_log or app.astream_events.
Any guidance from your side, @elliottshort?
Any updates on the parallel tool calling? @tgram-3D can you share your code or open a PR?
@gianfrancodemarco A few of the latest notebooks use OpenAI tools and PydanticToolsParser, but enforce calling a specific tool vs. parallel calling - I think. I'm sure you could use those examples for a much cleaner way to do parallel tool calling, but originally I just extended the ToolExecutor classes to enable appending ToolMessages to the graph:
from typing import Any

from langgraph.prebuilt.tool_executor import ToolExecutor, ToolInvocation, ToolInvocationInterface


class ToolInvocationWithIDInterface(ToolInvocationInterface):
    """Extended interface for invoking a tool with an ID."""

    tool_call_id: str


class ToolInvocationWithID(ToolInvocation):
    """Information about how to invoke a tool."""

    tool_call_id: str


class EnhancedToolExecutor(ToolExecutor):
    async def _aexecute(self, tool_invocation: ToolInvocationWithIDInterface) -> Any:
        if tool_invocation.tool not in self.tool_map:
            return {
                "id": tool_invocation.tool_call_id,
                "error": self.invalid_tool_msg_template.format(
                    requested_tool_name=tool_invocation.tool,
                    available_tool_names_str=", ".join([t.name for t in self.tools]),
                ),
            }
        else:
            tool = self.tool_map[tool_invocation.tool]
            output = await tool.ainvoke(tool_invocation.tool_input)
            return {"id": tool_invocation.tool_call_id, "output": output}

    def _execute(self, tool_invocation: ToolInvocationWithIDInterface) -> Any:
        if tool_invocation.tool not in self.tool_map:
            return {
                "id": tool_invocation.tool_call_id,
                "error": self.invalid_tool_msg_template.format(
                    requested_tool_name=tool_invocation.tool,
                    available_tool_names_str=", ".join([t.name for t in self.tools]),
                ),
            }
        else:
            tool = self.tool_map[tool_invocation.tool]
            output = tool.invoke(tool_invocation.tool_input)
            return {"id": tool_invocation.tool_call_id, "output": output}
Then you can do parallel calling with batch, abatch, etc. as described in my first comment.
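For the async side, a small sketch (assuming tools is your list of tools and actions is the list of ToolInvocationWithID built as in the first comment):

from langchain_core.messages import ToolMessage

enhanced_executor = EnhancedToolExecutor(tools)

async def run_tools(actions):
    # abatch() awaits the invocations concurrently; batch() is the sync equivalent.
    responses = await enhanced_executor.abatch(actions)
    return [
        ToolMessage(content=str(r.get("output", r.get("error"))), tool_call_id=r["id"])
        for r in responses
    ]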
I got the same problem (the LLM not streaming tokens) and fixed it with the following workaround:
async def call_model(state, config):  # <- I added an extra "config" parameter
    messages = state['messages']
    response = await model.ainvoke(messages, config=config)  # <- and pass it through to the model
    return {"messages": [response]}
That way I was able to get my token stream from the LLM.
What should be passed as config?
Hi @delorenzotommaso, you are not supposed to call call_model yourself. LangGraph will call this function, and it creates and passes the config itself.
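For example, a minimal sketch along the lines of the streaming-tokens notebook (call_model is the config-accepting version above; AgentState is just the standard message-accumulating state):

import operator
from typing import Annotated, Sequence, TypedDict

from langchain_core.messages import BaseMessage, HumanMessage
from langgraph.graph import END, StateGraph

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]

workflow = StateGraph(AgentState)
workflow.add_node("agent", call_model)  # langgraph calls call_model(state, config) itself
workflow.set_entry_point("agent")
workflow.add_edge("agent", END)
app = workflow.compile()

# Stream through the compiled graph instead of calling call_model directly;
# the config (and any callbacks on it) is propagated down to model.ainvoke:
inputs = {"messages": [HumanMessage(content="hi")]}
# async for chunk in app.astream_log(inputs, include_types="llm"): ...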
@dmitryrPlanner5D I see. I have a slightly different configuration, but your solution works in my case as well!
My node calls a chain. When calling chain.astream_events directly, the tokens are streamed correctly. However, when it was called within the graph, the same wasn't happening. Adding config solved the problem. Here is the definition of the node I have:
async def generate(state: GraphState, config):
    agent_output = state["agent_outcome"]
    inputs = agent_output.tool_input
    docs = state["docs"]
    output = await chain.ainvoke(
        {
            "input": inputs["standalone_query"],
            "conversation_summary": inputs["conversation_summary"],
            "context": _combine_documents(docs),
        },
        config=config,
    )
    return {"output": output}
The chain is a standard prompt | llm | JsonOutputParser, with streaming=True for the llm, and the llm is bound to a pydantic tool to give cited answers.
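Roughly, the composition looks like this (a sketch rather than my exact code: prompt and the CitedAnswer pydantic model are placeholders, and this shows JsonOutputKeyToolsParser in place of the plain JsonOutputParser to pull out the tool-call arguments):

from typing import List

from langchain_core.output_parsers.openai_tools import JsonOutputKeyToolsParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI

class CitedAnswer(BaseModel):
    """Answer the question from the provided context, citing the documents used."""
    answer: str = Field(description="The answer to the standalone query")
    citations: List[int] = Field(description="IDs of the context documents cited")

prompt = ChatPromptTemplate.from_template(
    "Summary: {conversation_summary}\n\nContext:\n{context}\n\nQuestion: {input}"
)
llm = ChatOpenAI(streaming=True).bind_tools([CitedAnswer], tool_choice="CitedAnswer")
chain = prompt | llm | JsonOutputKeyToolsParser(key_name="CitedAnswer", first_tool_only=True)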
@dmitryrPlanner5D Is there a similar trick to allow streaming of the JsonOutputToolParser?
When running chain.astream_events, I see events appearing like:
on_chain_start
on_prompt_start
on_prompt_end
on_chat_model_start
on_chat_model_stream
on_parser_start
on_parser_stream
on_chain_stream
on_chat_model_stream
on_parser_stream
When instead running graph.astream_events, the events relative to the node running the above chain look like:
on_chain_start
on_prompt_start
on_prompt_end
on_chat_model_start
on_chat_model_stream
on_chat_model_stream
on_chat_model_stream
...
on_chat_model_stream
on_chat_model_end
on_parser_start
on_parser_end
on_chain_end
Any idea on why the parser does not stream?
@delorenzotommaso Sorry, no, I have no experience with streaming parsers; usually the stream from the LLM is enough for my use cases.
But anyway, when you ainvoke the chain with config inside your node, I would expect it to work the same way as ainvoking the chain directly, so it is strange that the events are different 🤷
I opened a discussion about it: https://github.com/langchain-ai/langchain/discussions/20324