langgraph icon indicating copy to clipboard operation
langgraph copied to clipboard

astream_log produces TypeError: unsupported operand type(s) for +: 'dict' and 'dict' in passthrough.py

Open sploithunter opened this issue 1 year ago • 3 comments

Checked other resources

  • [X] I added a very descriptive title to this issue.
  • [X] I searched the LangChain documentation with the integrated search.
  • [X] I used the GitHub search to find a similar question and didn't find it.
  • [X] I am sure that this is a bug in LangChain rather than my code.

Example Code

The following code produces the error. I have found it in many different scenarios, but this uses one of your base examples from https://github.com/langchain-ai/langgraph/blob/main/examples/multi_agent/agent_supervisor.ipynb. The only change is the async invocation at the very bottom of the code, which consumes the output with `async for output in graph.astream_log(...)`.

import getpass
import os

from langchain_community.chat_models import ChatOpenAI
# Optional: turn on LangSmith tracing and choose the project the runs are filed under.
os.environ.update(
    {
        "LANGCHAIN_TRACING_V2": "true",
        "LANGCHAIN_PROJECT": "Multi-agent Collaboration",
    }
)

from typing import Annotated, List, Tuple, Union


from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.tools import tool
from langchain_experimental.tools import PythonREPLTool

# Search tool handed to the "Researcher" worker; constructed with max_results=5.
tavily_tool = TavilySearchResults(max_results=5)



# Python REPL tool handed to the "Coder" worker.
# This executes code locally, which can be unsafe
python_repl_tool = PythonREPLTool()

from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_openai import ChatOpenAI


def create_agent(llm: ChatOpenAI, tools: list, system_prompt: str):
    """Build an AgentExecutor for one worker.

    The worker's prompt consists of the given system prompt followed by two
    message placeholders: "messages" and "agent_scratchpad".

    Args:
        llm: Chat model passed to ``create_openai_tools_agent``.
        tools: Tools given both to the agent and to the executor.
        system_prompt: Text of the leading "system" message.

    Returns:
        An ``AgentExecutor`` wrapping the OpenAI-tools agent.
    """
    message_templates = [
        ("system", system_prompt),
        MessagesPlaceholder(variable_name="messages"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ]
    worker_prompt = ChatPromptTemplate.from_messages(message_templates)
    worker_agent = create_openai_tools_agent(llm, tools, worker_prompt)
    return AgentExecutor(agent=worker_agent, tools=tools)

def agent_node(state, agent, name):
    """Invoke ``agent`` on ``state`` and return its output as a state update.

    The "output" entry of the agent's result becomes the content of a single
    ``HumanMessage`` tagged with ``name``, returned under the "messages" key.
    """
    output_text = agent.invoke(state)["output"]
    reply = HumanMessage(content=output_text, name=name)
    return {"messages": [reply]}

from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

# Names of the worker nodes the supervisor can hand work to.
members = ["Researcher", "Coder"]
# System prompt template for the supervisor; {members} is filled in later.
system_prompt = (
    "You are a supervisor tasked with managing a conversation between the following workers:  {members}. "
    "Given the following user request, respond with the worker to act next. "
    "Each worker will perform a task and respond with their results and status. "
    "When finished, respond with FINISH."
)
# The supervisor is an LLM node: it only chooses which agent acts next and
# decides when the work is done, so its choices are the workers plus FINISH.
options = ["FINISH", *members]
# OpenAI function calling keeps output parsing simple: the model must call
# "route" with a "next" argument restricted to one of `options`.
route_parameters = {
    "title": "routeSchema",
    "type": "object",
    "properties": {
        "next": {
            "title": "Next",
            "anyOf": [
                {"enum": options},
            ],
        }
    },
    "required": ["next"],
}
function_def = {
    "name": "route",
    "description": "Select the next role.",
    "parameters": route_parameters,
}
# Supervisor prompt: system instructions, the conversation so far, then a
# closing system message asking which option to pick.
supervisor_messages = [
    ("system", system_prompt),
    MessagesPlaceholder(variable_name="messages"),
    (
        "system",
        "Given the conversation above, who should act next?"
        " Or should we FINISH? Select one of: {options}",
    ),
]
prompt = ChatPromptTemplate.from_messages(supervisor_messages).partial(
    options=str(options), members=", ".join(members)
)

llm = ChatOpenAI(model="gpt-4-1106-preview", streaming=True)

# Bind the "route" function to the model and request it via function_call,
# then parse the function-call arguments as JSON.
routing_llm = llm.bind_functions(functions=[function_def], function_call="route")
supervisor_chain = prompt | routing_llm | JsonOutputFunctionsParser()

import operator
from typing import Annotated, Any, Dict, List, Optional, Sequence, TypedDict
import functools

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import StateGraph, END


# The agent state is the input to each node in the graph.
# It is a TypedDict with two keys: the running message list and the routing field.
class AgentState(TypedDict):
    # The annotation tells the graph that new messages will always
    # be added to the current state (operator.add is attached as metadata;
    # presumably langgraph uses it to combine updates -- confirm against its docs)
    messages: Annotated[Sequence[BaseMessage], operator.add]
    # The 'next' field indicates where to route to next
    next: str


# Researcher worker: an agent with the search tool, wrapped as a graph node.
research_agent = create_agent(llm, [tavily_tool], "You are a web researcher.")
research_node = functools.partial(
    agent_node,
    agent=research_agent,
    name="Researcher",
)

# Coder worker: an agent with the Python REPL tool, wrapped as a graph node.
# NOTE: THIS PERFORMS ARBITRARY CODE EXECUTION. PROCEED WITH CAUTION
code_agent = create_agent(
    llm,
    [python_repl_tool],
    "You may generate safe python code to analyze data and generate charts using matplotlib.",
)
code_node = functools.partial(
    agent_node,
    agent=code_agent,
    name="Coder",
)

# Register the two workers and the supervisor chain as nodes.
workflow = StateGraph(AgentState)
workflow.add_node("Researcher", research_node)
workflow.add_node("Coder", code_node)
workflow.add_node("supervisor", supervisor_chain)

# Every worker ALWAYS reports back to the supervisor once it is done.
for member in members:
    workflow.add_edge(member, "supervisor")

# The supervisor fills in the "next" field of the state; that value selects
# the worker node to run, or END when it is "FINISH".
conditional_map = {**{k: k for k in members}, "FINISH": END}
workflow.add_conditional_edges("supervisor", lambda x: x["next"], conditional_map)

# The graph starts at the supervisor.
workflow.set_entry_point("supervisor")

graph = workflow.compile()

async def main():
    """Stream the compiled graph with ``astream_log`` and print LLM log chunks.

    Sends a single human message through the graph with
    ``include_types=["llm"]`` and walks the ops of every emitted patch.
    Ops on the top-level streamed output are ignored; ops under ``/logs/``
    whose path ends in ``/streamed_output/-`` have their value printed.
    """
    inputs = {
        "messages": [
            HumanMessage(content="Code hello world and print it to the terminal")
        ]
    }
    async for output in graph.astream_log(inputs, include_types=["llm"]):
        for op in output.ops:
            path = op["path"]
            if path == "/streamed_output/-":
                # this is the output from .stream(); nothing to do with it here
                continue
            if path.startswith("/logs/") and path.endswith("/streamed_output/-"):
                # because we chose to only include LLMs, these are LLM tokens
                print(op["value"])
# Script entry point: run the async reproduction only when executed directly.
if __name__ == "__main__":
    # asyncio is imported here; it is only needed to drive main().
    import asyncio
    asyncio.run(main())

Error Message and Stack Trace (if applicable)

(agents_v09) JasonMacPro:agents_v09 jason$ python langgraph_astream_events.py
content='' additional_kwargs={'function_call': {'arguments': '', 'name': 'route'}}
content='' additional_kwargs={'function_call': {'arguments': '{"', 'name': ''}}
content='' additional_kwargs={'function_call': {'arguments': 'next', 'name': ''}}
content='' additional_kwargs={'function_call': {'arguments': '":"', 'name': ''}}
Traceback (most recent call last):
  File "/Users/jason/Documents/agents_v09/langgraph_astream_events.py", line 165, in <module>
    asyncio.run(main())
  File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/Users/jason/Documents/agents_v09/langgraph_astream_events.py", line 147, in main
    async for output in graph.astream_log(
  File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/runnables/base.py", line 683, in astream_log
    async for item in _astream_log_implementation(  # type: ignore
  File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/tracers/log_stream.py", line 612, in _astream_log_implementation
    await task
  File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/tracers/log_stream.py", line 566, in consume_astream
    async for chunk in runnable.astream(input, config, **kwargs):
  File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langgraph/pregel/__init__.py", line 657, in astream
    async for chunk in self.atransform(
  File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langgraph/pregel/__init__.py", line 675, in atransform
    async for chunk in self._atransform_stream_with_config(
  File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/runnables/base.py", line 1597, in _atransform_stream_with_config
    chunk = cast(Output, await py_anext(iterator))
  File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/tracers/log_stream.py", line 237, in tap_output_aiter
    async for chunk in output:
  File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langgraph/pregel/__init__.py", line 524, in _atransform
    _interrupt_or_proceed(done, inflight, step)
  File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langgraph/pregel/__init__.py", line 698, in _interrupt_or_proceed
    raise exc
  File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langgraph/pregel/__init__.py", line 836, in _aconsume
    async for _ in iterator:
  File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/runnables/base.py", line 4140, in astream
    async for item in self.bound.astream(
  File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/runnables/base.py", line 2452, in astream
    async for chunk in self.atransform(input_aiter(), config, **kwargs):
  File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/runnables/base.py", line 2435, in atransform
    async for chunk in self._atransform_stream_with_config(
  File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/runnables/base.py", line 1597, in _atransform_stream_with_config
    chunk = cast(Output, await py_anext(iterator))
  File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/tracers/log_stream.py", line 237, in tap_output_aiter
    async for chunk in output:
  File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/runnables/base.py", line 2405, in _atransform
    async for output in final_pipeline:
  File "/Users/jason/.pyenv/versions/3.10.11/lib/python3.10/site-packages/langchain_core/runnables/passthrough.py", line 280, in atransform
    final = final + chunk
TypeError: unsupported operand type(s) for +: 'dict' and 'dict'

Description

I am trying to stream output from a compiled langgraph using the astream_log (astream_events also produces this error). It is easily reproducible with example code in many of the langgraph examples if using astream_log rather than astream or synchronous calls.

System Info

langchain==0.1.8 langchain-community==0.0.21 langchain-core==0.1.25 langchain-experimental==0.0.52 langchain-mistralai==0.0.4 langchain-openai==0.0.6 langgraph==0.0.25 langsmith==0.1.5

Mac OSX 12.6.5

Python 3.10.11

sploithunter avatar Feb 22 '24 16:02 sploithunter

also mentioned in #124

russell-dot-js avatar Feb 24 '24 08:02 russell-dot-js

any update on this one?

svaditya avatar Mar 01 '24 20:03 svaditya

I had the same issue and used some suggestions in #78. After upgrading to Python 3.12 and downgrading to langgraph 0.0.20, I could run all the example notebooks, including this one (agent supervisor).

However my graph needs sqlite memory from langgraph>=0.0.22 so I'm a bit stuck. Now I'm back on langgraph=0.0.26 and using astream_events, I got these messages:

LangChainBetaWarning: This API is in beta and may change in the future.
  warn_beta(
NotImplementedError in LogStreamCallbackHandler.on_chain_end callback: NotImplementedError("Trying to load an object that doesn't implement serialization: {'lc': 1, 'type': 'not_implemented', 'id': ['builtins', 'object'], 'repr': '<object object at 0x00000204E3FA63D0>'}")
---------------------------------------------------------------------------

    [283](file:///C:/ProgramData/Anaconda3/envs/llm/Lib/site-packages/langchain_core/runnables/passthrough.py:283)     config = ensure_config(config)

TypeError: unsupported operand type(s) for +: 'dict' and 'dict'

Would love to hear if anyone has a solution to this issue yet

mingxuan-he avatar Mar 04 '24 10:03 mingxuan-he

Will investigate tomorrow if it's langchain-core or langgraph. Similar error message appears in langserve

eyurtsev avatar Mar 07 '24 01:03 eyurtsev

cross linking https://github.com/langchain-ai/langserve/issues/504

eyurtsev avatar Mar 07 '24 01:03 eyurtsev

Thank you for the fix @eyurtsev ! I upgraded to langchain_core 0.1.31 but still got the same error. Looks like the error message is pointing to langchain_core/runnables/passthrough.py instead of base.py. Do you mind looking into it and see if the same solution _adapt_first_streaming_chunk can be applied for transform and atransform there?

mingxuan-he avatar Mar 14 '24 00:03 mingxuan-he

@mingxuan-he thanks for flagging -- This will be available during the next release.

eyurtsev avatar Mar 14 '24 21:03 eyurtsev

when do you expect the next release to be released?

niros1 avatar Mar 16 '24 13:03 niros1

This has been released in latest version of langchain-core

nfcampos avatar Mar 30 '24 00:03 nfcampos