
Parallel Tool Calling and LLM Token Streaming Issue

Open tgram-3D opened this issue 1 year ago • 15 comments

Is there a reason the langgraph examples all use legacy function calling for OpenAI models instead of tools? Before the latest update with ToolExecutor, I did something similar by subclassing Runnable, overriding invoke, and calling batch on the tool inputs for concurrent execution with a tools agent, and that worked fine. I'm just curious why you aren't promoting tools over functions with langgraph the way you are for Agents, adding support for tool_call_ids, etc. With an openai_tools_agent and the new classes, something like this gives the correct output:

def execute_tools_concurrent(data):
    agent_actions = data.pop('agent_outcome')
    # build one ToolInvocation per requested tool call
    tool_actions = [ToolInvocation(tool=action.tool, tool_input=action.tool_input) for action in agent_actions]
    # batch runs the invocations concurrently
    outputs = tool_executor.batch(tool_actions)
    full_output = list(zip(agent_actions, outputs))
    data['intermediate_steps'].extend(full_output)
    return data

But when building from scratch in langgraph and appending ToolMessages instead of FunctionMessages to the state, the new classes do not support adding tool_call_ids, so you have to extend them. I'm currently adding a tool_call_id attribute to ToolInvocationInterface and ToolInvocation, and extending ToolExecutor so that _execute and _aexecute return the following:

return {"id": tool_invocation.tool_call_id, "output": output}

Then you can do this sort of thing:

...
# tool calls requested by the model in the last AI message
tool_calls = last_message.additional_kwargs['tool_calls']

actions = [ToolInvocationWithID(
        tool=tool_call['function']['name'],
        tool_input=json.loads(tool_call['function']['arguments']),
        tool_call_id=tool_call['id']
    ) for tool_call in tool_calls]

# run all requested tool calls concurrently
responses = enhanced_executor.batch(actions)

tool_messages = [ToolMessage(content=str(r['output']), tool_call_id=r['id']) for r in responses]

return {"messages": tool_messages}

This does work, but maybe there's a better way? Am I missing something?


My actual issue is that I can't get LLM token streaming to work. I updated langchain, langchain_openai, and langgraph and copied streaming-tokens.ipynb verbatim, but I'm getting no /logs/ output at all from astream_log. Are you sure the implementation with:

async def call_model(state):
    messages = state['messages']
    response = await model.ainvoke(messages)
  
    return {"messages": [response]}

is correct? Do you need to change it to astream or something to get the tokens?

tgram-3D avatar Jan 18 '24 20:01 tgram-3D

@tgram-3D for now, to get token streaming to work:

  1. initialize your LLM with streaming=True
  2. make sure everything is async
  • If you're using a callback manager, use the async variety to be safe and propagate it through, or else attach it directly as a local callback to the model
  • Alternatively, use the output of astream_log to get token-by-token streaming (a sketch of both steps follows)
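A minimal sketch of those two steps, assuming a ChatOpenAI model and the same call_model node shape as the notebook (the temperature value here is a placeholder):

from langchain_openai import ChatOpenAI

# 1. initialize the LLM with streaming enabled
model = ChatOpenAI(temperature=0, streaming=True)

# 2. keep the node fully async and await the model call
async def call_model(state):
    messages = state["messages"]
    response = await model.ainvoke(messages)
    return {"messages": [response]}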

eyurtsev avatar Jan 18 '24 22:01 eyurtsev

@tgram-3D take a look at:

https://github.com/langchain-ai/langchain/blob/d8c833eff523335beef9791233fc12a8c7235e95/docs/docs/modules/agents/how_to/streaming.ipynb

https://github.com/langchain-ai/langchain/blob/d8c833eff523335beef9791233fc12a8c7235e95/docs/docs/modules/agents/how_to/streaming_callbacks.ipynb

and we have a new solution incoming:

https://github.com/langchain-ai/langchain/pull/16172

eyurtsev avatar Jan 18 '24 22:01 eyurtsev

@eyurtsev Thank you for the info.

I found out that token streaming from astream_log with the provided configuration works if you are using an Agent as the agent node in LangGraph, but not if you are using ChatOpenAI directly with a wrapper call_model function like in the new examples. I'm sure there's a way around it with that same syntax, but an interim fix until astream_events is ready is to compose it as a chain, e.g.:

model_chain = (lambda x: x['messages']) | model | {"messages": lambda x: [x]}

...

workflow.add_node("agent", model_chain)

This way astream_log recognizes the nested streaming llm call and you can access the individual tokens with:

async for output in app.astream_log(inputs, include_types="llm"):
    for op in output.ops:
        if op["path"] == "/streamed_output/-":
            # chunks of the graph's own streamed output
            ...
        elif op["path"].startswith("/logs/") and op["path"].endswith(
            "/streamed_output/-"
        ):
            # op["value"] is a message chunk from the nested LLM call
            print(op["value"])

tgram-3D avatar Jan 19 '24 18:01 tgram-3D

I think you'd need to yield the token outputs in the call_model function so that it is treated as a generator. @eyurtsev anything else required to get the streaming to work?

hinthornw avatar Feb 01 '24 17:02 hinthornw

@eyurtsev I really appreciate your hard work; it looks like your changes outlined in https://github.com/langchain-ai/langchain/pull/16172 made it into the main branch, and it's working perfectly for propagating the on_chat_model_stream events. However, the on_tool_start and on_tool_end events included in the recent examples don't seem to be supported.

elliottshort avatar Feb 01 '24 19:02 elliottshort

@hinthornw I'm confident the issue lies with the missing yield. Specifically, in the streaming notebook example, the call_model method merely invokes the LLM chat chain and returns the result, whereas it ought to yield the results instead. However, when I modified it to yield the results, I was still unable to retrieve the streamed tokens via the app.astream_log or app.astream_events methods.

Any guidance from your side, @elliottshort?

ahmedmoorsy avatar Feb 02 '24 23:02 ahmedmoorsy

Any updates for the parallel tool calling? @tgram-3D can you share your code or open a PR?

gianfrancodemarco avatar Feb 13 '24 00:02 gianfrancodemarco

@gianfrancodemarco A few of the latest notebooks use OpenAI tools and PydanticToolsParser, but I think they enforce calling a specific tool rather than calling tools in parallel. I'm sure you could use those examples for a much cleaner way to do parallel tool calling, but originally I just extended the ToolExecutor classes to enable appending ToolMessages to the graph:

from langgraph.prebuilt.tool_executor import ToolInvocationInterface, ToolExecutor, ToolInvocation
from typing import Any

class ToolInvocationWithIDInterface(ToolInvocationInterface):
    """Extended interface for invoking a tool with an ID."""
    tool_call_id: str

class ToolInvocationWithID(ToolInvocation):
    """Information about how to invoke a tool, including its tool_call_id."""
    tool_call_id: str

class EnhancedToolExecutor(ToolExecutor):
    async def _aexecute(self, tool_invocation: ToolInvocationWithIDInterface) -> Any:
        if tool_invocation.tool not in self.tool_map:
            return {
                "id": tool_invocation.tool_call_id,
                "error": self.invalid_tool_msg_template.format(
                    requested_tool_name=tool_invocation.tool,
                    available_tool_names_str=", ".join([t.name for t in self.tools]),
                )
            }
        else:
            tool = self.tool_map[tool_invocation.tool]
            output = await tool.ainvoke(tool_invocation.tool_input)
            return {"id": tool_invocation.tool_call_id, "output": output}

    def _execute(self, tool_invocation: ToolInvocationWithIDInterface) -> Any:
        if tool_invocation.tool not in self.tool_map:
            return {
                "id": tool_invocation.tool_call_id,
                "error": self.invalid_tool_msg_template.format(
                    requested_tool_name=tool_invocation.tool,
                    available_tool_names_str=", ".join([t.name for t in self.tools]),
                )
            }
        else:
            tool = self.tool_map[tool_invocation.tool]
            output = tool.invoke(tool_invocation.tool_input)
            return {"id": tool_invocation.tool_call_id, "output": output}

Then you can do parallel calling with batch, abatch, etc. as described in my first comment.
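For completeness, a rough async variant of that node using abatch (enhanced_executor, tools, and the state shape are assumptions carried over from the snippets above; this is a sketch, not a drop-in implementation):

import json

from langchain_core.messages import ToolMessage

# assumes `tools` is the list of tools used elsewhere in the graph
enhanced_executor = EnhancedToolExecutor(tools)

async def call_tools_async(state):
    last_message = state["messages"][-1]
    tool_calls = last_message.additional_kwargs["tool_calls"]
    actions = [
        ToolInvocationWithID(
            tool=tool_call["function"]["name"],
            tool_input=json.loads(tool_call["function"]["arguments"]),
            tool_call_id=tool_call["id"],
        )
        for tool_call in tool_calls
    ]
    # abatch runs the invocations concurrently, mirroring batch in the sync version
    responses = await enhanced_executor.abatch(actions)
    # fall back to the "error" key when the requested tool was not found
    tool_messages = [
        ToolMessage(content=str(r.get("output", r.get("error"))), tool_call_id=r["id"])
        for r in responses
    ]
    return {"messages": tool_messages}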

tgram-3D avatar Feb 13 '24 21:02 tgram-3D

I got the same problem (the LLM not streaming tokens) and fixed it with the following workaround:

async def call_model(state, config):  # <- I have added the extra parameter "config"
    messages = state['messages']
    # passing config through propagates the graph's callbacks to the model,
    # so its token stream shows up in astream_log / astream_events
    response = await model.ainvoke(messages, config=config)  # <- and passing it to the model

    return {"messages": [response]}

That way I was able to get the token stream from the LLM.
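For reference, once the config is propagated this way, the token stream can be consumed with astream_events on the compiled graph (a minimal consumer sketch; app and inputs stand for the compiled graph and its input dict, which are assumptions here):

async for event in app.astream_events(inputs, version="v1"):
    if event["event"] == "on_chat_model_stream":
        # each chunk is a message chunk carrying one (or a few) tokens
        print(event["data"]["chunk"].content, end="", flush=True)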

dmitryrPlanner5D avatar Mar 13 '24 16:03 dmitryrPlanner5D

What should be passed as config?

delorenzotommaso avatar Apr 10 '24 10:04 delorenzotommaso

Hi @delorenzotommaso, you are not supposed to call call_model yourself. This function will be called by langgraph, which will create and pass the config itself.

dmitryrPlanner5D avatar Apr 10 '24 10:04 dmitryrPlanner5D

@dmitryrPlanner5D I see. I have a slightly different configuration, but your solution works in my case as well! My node calls a chain. When calling chain.astream_events directly, the tokens are streamed correctly; however, when the chain is called within the graph, that wasn't happening. Adding config solved the problem. Here is the definition of the node I have:

async def generate(state: GraphState, config):
    agent_output = state["agent_outcome"]
    inputs = agent_output.tool_input
    docs = state["docs"]

    output = await chain.ainvoke(
        {
            "input": inputs["standalone_query"],
            "conversation_summary": inputs["conversation_summary"],
            "context": _combine_documents(docs),
        },
        config=config,
    )

    return {"output": output}

The chain is a standard prompt | llm | JsonOutputParser with streaming=True for the llm, and the llm is bound to a pydantic tool to give cited answers.
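Roughly what such a chain might look like; the CitedAnswer schema, prompt, and JsonOutputToolsParser choice below are illustrative guesses at the setup described, not the actual code:

from typing import List

from langchain_core.output_parsers.openai_tools import JsonOutputToolsParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI

class CitedAnswer(BaseModel):
    """Answer the question using only the provided context, with citations."""
    answer: str = Field(description="The answer to the standalone query")
    citations: List[str] = Field(description="IDs of the source documents used")

prompt = ChatPromptTemplate.from_messages([
    ("system", "Conversation so far: {conversation_summary}\n\nContext:\n{context}"),
    ("human", "{input}"),
])

# streaming=True on the llm, which is bound to the pydantic tool for cited answers
llm = ChatOpenAI(streaming=True).bind_tools([CitedAnswer])
chain = prompt | llm | JsonOutputToolsParser()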

delorenzotommaso avatar Apr 10 '24 10:04 delorenzotommaso

@dmitryrPlanner5D Is there a similar trick to allow streaming of the JsonOutputToolParser? When running chain.astream_events I see events appearing like

on_chain_start
on_prompt_start
on_prompt_end
on_chat_model_start
on_chat_model_stream
on_parser_start
on_parser_stream
on_chain_stream
on_chat_model_stream
on_parser_stream

when instead running graph.astream_events, the events relative to the node running the above chain looks like

on_chain_start
on_prompt_start
on_prompt_end
on_chat_model_start
on_chat_model_stream
on_chat_model_stream
on_chat_model_stream
...
on_chat_model_stream
on_chat_model_end
on_parser_start
on_parser_end
on_chain_end

Any idea on why the parser does not stream?

delorenzotommaso avatar Apr 10 '24 22:04 delorenzotommaso

@delorenzotommaso Sorry, no, I have no experience with streaming parsers; usually the stream from the llm is enough for my use cases.

But anyway, when you ainvoke the chain with config inside your node, I would expect it to work the same way as invoking the chain directly, so it is strange that the events are different 🤷

dmitryrPlanner5D avatar Apr 11 '24 07:04 dmitryrPlanner5D

Opened a discussion about it https://github.com/langchain-ai/langchain/discussions/20324

tommasodelorenzo avatar Apr 11 '24 14:04 tommasodelorenzo