Receiving multiple inputs on a single node not working in a vanilla state

Open minki-j opened this issue 1 year ago • 5 comments

Checked other resources

  • [X] I added a very descriptive title to this issue.
  • [X] I searched the LangChain documentation with the integrated search.
  • [X] I used the GitHub search to find a similar question and didn't find it.
  • [X] I am sure that this is a bug in LangChain rather than my code.

Example Code

from langgraph.graph import END, StateGraph
from langchain_core.runnables import RunnablePassthrough

def path1(state: dict):
    documents = state["documents"]
    documents.append("path1")

    return {"documents": documents}


def path2(state: dict):
    documents = state["documents"]
    documents.append("path2")

    return {"documents": documents}


def middle_router(state: dict):
    return ["path1", "path2"]


def rendezvous(state: dict):
    print("==>>rendezvous")
    documents = state["documents"]
    print("documents: ", documents)
    return {"documents": documents}
    

graph = StateGraph({"documents": []})

graph.add_node("start", RunnablePassthrough())
graph.add_conditional_edges(
    "start",
    middle_router,
    {
        "path1": "path1",
        "path2": "path2",
    },
    then="rendezvous",
)

graph.add_node("path1", path1)
graph.add_edge("path1", "rendezvous")

graph.add_node("path2", path2)
graph.add_edge("path2", "rendezvous")

graph.add_node("rendezvous", rendezvous)
graph.add_edge("rendezvous", END)

graph.set_entry_point("start")
langgraph_app = graph.compile()

langgraph_app.invoke({"documents": []})

Error Message and Stack Trace (if applicable)

InvalidUpdateError                        Traceback (most recent call last)
Cell In[13], line 26
     23 graph.set_entry_point("start")
     24 langgraph_app = graph.compile()
---> 26 langgraph_app.invoke({"documents": []})

File ~/anaconda3/envs/survey_buddy/lib/python3.12/site-packages/langgraph/pregel/__init__.py:1177, in Pregel.invoke(self, input, config, stream_mode, output_keys, input_keys, interrupt_before, interrupt_after, debug, **kwargs)
   1175 chunks = []
   1176 print("LangGraph Input: ", input)
-> 1177 for chunk in self.stream(
   1178     input,
   1179     config,
   1180     stream_mode=stream_mode,
   1181     output_keys=output_keys,
   1182     input_keys=input_keys,
   1183     interrupt_before=interrupt_before,
   1184     interrupt_after=interrupt_after,
   1185     debug=debug,
   1186     **kwargs,
   1187 ):
   1188     if stream_mode == "values":
   1189         latest = chunk

File ~/anaconda3/envs/survey_buddy/lib/python3.12/site-packages/langgraph/pregel/__init__.py:786, in Pregel.stream(self, input, config, stream_mode, output_keys, input_keys, interrupt_before, interrupt_after, debug)
    781 print_step_writes(
    782     step, pending_writes, self.stream_channels_list
    783 )
    785 # apply writes to channels
--> 786 _apply_writes(checkpoint, channels, pending_writes)
    788 if debug:
    789     print_step_checkpoint(step, channels, self.stream_channels_list)

File ~/anaconda3/envs/survey_buddy/lib/python3.12/site-packages/langgraph/pregel/__init__.py:1344, in _apply_writes(checkpoint, channels, pending_writes)
   1342     channels[chan].update(vals)
   1343 except InvalidUpdateError as e:
-> 1344     raise InvalidUpdateError(
   1345         f"Invalid update for channel {chan}: {e}"
   1346     ) from e
   1347 checkpoint["channel_versions"][chan] = max_version + 1
   1348 updated_channels.add(chan)

InvalidUpdateError: Invalid update for channel root: LastValue can only receive one value per step.

Description

I'm using a basic dictionary {"documents": []} for my state and updating it in parallel in path1 and path2. However, this leads to an error related to LastValue. When I used a TypedDict state with Annotated, I didn't encounter this issue, which suggests that the problem lies in how the state is defined.

Additionally, could you provide a definition of LastValue? I've attempted to understand it, but the Pregel code was quite complex. I would appreciate any documentation on the Pregel source code. Thank you.

System Info

langchain==0.1.17
langchain-anthropic==0.1.11
langchain-community==0.0.37
langchain-core==0.1.52
langchain-openai==0.1.6
langchain-text-splitters==0.0.1
langchainhub==0.1.15

minki-j avatar May 19 '24 21:05 minki-j

Hi @minki-j - great observation. If StateGraph is initialized with an instance of a dictionary, rather than a **type**, the resulting graph has no knowledge of the documents field in your dictionary and can only "reduce" at the __root__ level.

As noted, the same code works with a TypedDict. It also works if you annotate how you want to reduce the primary state:

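from typing import Annotated

# The second argument to Annotated is the reducer: it merges an incoming update y into the current state x.
graph = StateGraph(Annotated[dict, lambda x, y: {**x, **y}])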

Another issue is that the code above specifies then while also explicitly adding edges to rendezvous; it would be better to use only the then argument.
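To make the reducer point concrete, here is a toy model in plain Python of how a single state key might combine the writes it receives in one step. It is a simplified sketch, not LangGraph's actual implementation: apply_step and the local InvalidUpdateError class are hypothetical names invented for this illustration. Without a reducer the key accepts only one write per step, which mirrors the "LastValue can only receive one value per step" message; with a reducer, the parallel writes are folded together.

```python
import operator
from functools import reduce
from typing import Any, Callable, Optional


class InvalidUpdateError(Exception):
    """Local stand-in for the error in the traceback (toy version)."""


def apply_step(current: Any, writes: list, reducer: Optional[Callable] = None) -> Any:
    """Combine all writes made to one state key during a single step.

    Without a reducer the key behaves like a last-value slot: at most one
    write per step is allowed. With a reducer, writes are folded in order.
    """
    if not writes:
        return current
    if reducer is None:
        if len(writes) > 1:
            raise InvalidUpdateError("can only receive one value per step")
        return writes[0]
    return reduce(reducer, writes, current)


# Two parallel branches each write to the same key in the same step.
parallel_writes = [["path1"], ["path2"]]

# With a reducer (list concatenation), both writes are merged.
merged = apply_step([], parallel_writes, reducer=operator.add)
print(merged)

# Without a reducer, the same two writes are rejected.
try:
    apply_step([], parallel_writes)
    rejected = False
except InvalidUpdateError:
    rejected = True
print(rejected)
```

In this model, annotating a key with a reducer corresponds to passing reducer=..., which is why the TypedDict plus Annotated version of the graph accepts updates from both branches.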

hinthornw avatar May 20 '24 15:05 hinthornw

Gonna work with Nuno to make the errors more helpful here, and plan to deprecate then so there's only one way of doing things

hinthornw avatar May 20 '24 16:05 hinthornw

Thanks @hinthornw. I turned the state into a TypedDict with the annotated reducer logic, and the error is gone.

Thanks for pointing out the duplicated edges. I mistakenly added them when I was cleaning up the code for this post.

Could you consider not deprecating then? I love it because it makes the code cleaner, and I think it's a great option to keep. What about throwing an error when there are duplicated edges? If you like this idea, I'd love to work on adding that feature!

Example:

from varname import nameof

g = StateGraph(StateType)
g.set_entry_point("entry")

g.add_node("entry", RunnablePassthrough())

# With `then`, I can see the flow at a glance
g.add_conditional_edges(
    "entry",
    conditional_func,
    to_path_map(
        [nameof(gather_user_info),
        nameof(gather_vendor_info),]
    ),
    then="rendezvous",
)

g.add_node(nameof(gather_user_info), gather_user_info)
g.add_node(nameof(gather_vendor_info), gather_vendor_info)

# Without using 'then', I have to add the following edges, which is less clear than 'then' in my opinion.
# g.add_edge(nameof(gather_user_info), "rendezvous")
# g.add_edge(nameof(gather_vendor_info), "rendezvous")

g.add_node("rendezvous", RunnablePassthrough())
g.add_edge("rendezvous", END)
# functions used above
def to_path_map(node_names):
    # Map each node name to itself; a comprehension avoids shadowing the built-in dict.
    return {name: name for name in node_names}

def conditional_func(state: dict[str, Documents]):
    if state["documents"].user_info is None:
        return nameof(gather_user_info)
    else:
        return nameof(gather_vendor_info)

minki-j avatar May 21 '24 14:05 minki-j

Could you please explain what the then argument in add_conditional_edges(..., then=...) does? I checked the docs but didn't understand it. To put it as an example: if I have an agent, I add a conditional edge from the agent to the tool and then a regular edge from the tool back to the agent, so the graph iterates until it arrives at a solution. Where does the then argument come into play in this setup?

Jeomon avatar May 26 '24 06:05 Jeomon

I am facing a similar problem. Please help, @hinthornw @Jeomon.

This is my state:

class State(TypedDict):
    node: Node
    keywords: Optional[KeywordExtractor]
    context_scoring: Optional[ContextScoring]
    seed_question: Optional[List[SeedQuestion]]
    reasoning_question: Optional[ReasoningQuestion]
    conditional_question: Optional[ConditionalQuestion]
    multi_context_question: Optional[MultiContextQuestion]
    compressed_question: Optional[CompressQuestion]
    relevant_concept: Optional[RelevantConcept]
    doc_store: Any

One of my routing functions is:

def seed_question_router(state: State):
    if state['seed_question']:
        return ['reasoning_question_gen', 'conditional_question_gen']
    else:
        return "get_random_node"

I am updating a different state key in each of the two nodes, yet I receive the error below:

Traceback (most recent call last):
  File "/home/x/anaconda3/envs/rag-env/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/home/x/anaconda3/envs/rag-env/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/home/x/Documents/RAG Pipeline/evalution/workflow_nodes.py", line 148, in <module>
    for event in events:
  File "/home/x/anaconda3/envs/rag-env/lib/python3.9/site-packages/langgraph/pregel/__init__.py", line 1218, in stream
    while loop.tick(
  File "/home/x/anaconda3/envs/rag-env/lib/python3.9/site-packages/langgraph/pregel/loop.py", line 285, in tick
    mv_writes = apply_writes(
  File "/home/x/anaconda3/envs/rag-env/lib/python3.9/site-packages/langgraph/pregel/algo.py", line 223, in apply_writes
    if channels[chan].update(vals) and get_next_version is not None:
  File "/home/x/anaconda3/envs/rag-env/lib/python3.9/site-packages/langgraph/channels/last_value.py", line 38, in update
    raise InvalidUpdateError(
langgraph.errors.InvalidUpdateError: At key 'node': Can receive only one value per step. Use an Annotated key to handle multiple values.

graph_builder = StateGraph(State)

graph_builder.add_node('get_random_node', get_random_node)
graph_builder.add_node('keyword_extractor', keyword_extractor)
graph_builder.add_node('context_scoring_node', context_scoring_node)
graph_builder.add_node('seed_question_gen', seed_question_gen)
graph_builder.add_node('reasoning_question_gen', reasoning_question_gen)
graph_builder.add_node('conditional_question_gen', conditional_question_gen)

graph_builder.add_edge(START, "get_random_node")
graph_builder.add_edge("get_random_node", "keyword_extractor")
graph_builder.add_edge('keyword_extractor', 'context_scoring_node')

graph_builder.add_edge('seed_question_gen', 'seed_subgraph')

graph_builder.add_conditional_edges('context_scoring_node', context_filter)
graph_builder.add_conditional_edges('seed_question_gen', seed_question_router)
graph_builder.add_conditional_edges('reasoning_question_gen', reasoning_question_router)
graph_builder.add_conditional_edges('conditional_question_gen', conditional_question_router)

What should I change?

Eknathabhiram avatar Sep 21 '24 08:09 Eknathabhiram

This happens because somewhere in the graph you have created, the node key of the state is being updated simultaneously by two or more nodes in the same step.
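One plausible way this can happen, even when each node only changes its own field, is that the parallel nodes return the whole state dict, so every key (including node) counts as written by both. This is an assumption, since the node bodies are not shown in the thread. The sketch below is plain Python rather than LangGraph code, and conflicting_keys is a hypothetical helper made up for this illustration; it lists the keys that two or more nodes write in the same step.

```python
from collections import defaultdict


def conflicting_keys(updates, reducer_keys=frozenset()):
    """Return {key: [writers]} for keys written by 2+ nodes in one step.

    `updates` maps a node name to the dict that node returned. Keys listed in
    `reducer_keys` are assumed to have a reducer and are therefore skipped.
    """
    writers = defaultdict(list)
    for node_name, update in updates.items():
        for key in update:
            writers[key].append(node_name)
    return {
        key: names
        for key, names in writers.items()
        if len(names) > 1 and key not in reducer_keys
    }


state = {"node": "n1", "reasoning_question": None, "conditional_question": None}

# Case 1: each parallel node returns the whole state with one field changed.
full_state_updates = {
    "reasoning_question_gen": {**state, "reasoning_question": "rq"},
    "conditional_question_gen": {**state, "conditional_question": "cq"},
}

# Case 2: each parallel node returns only the key it actually changed.
partial_updates = {
    "reasoning_question_gen": {"reasoning_question": "rq"},
    "conditional_question_gen": {"conditional_question": "cq"},
}

conflicts_full = conflicting_keys(full_state_updates)
conflicts_partial = conflicting_keys(partial_updates)
print(sorted(conflicts_full))
print(conflicts_partial)
```

If the nodes in the real graph follow the first pattern, returning only the changed keys (or annotating the shared keys with a reducer, as the error message suggests) should remove the collision.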

Jeomon avatar Nov 22 '24 11:11 Jeomon