Receiving multiple inputs on a single node not working in a vanilla state
Checked other resources
- [X] I added a very descriptive title to this issue.
- [X] I searched the LangChain documentation with the integrated search.
- [X] I used the GitHub search to find a similar question and didn't find it.
- [X] I am sure that this is a bug in LangChain rather than my code.
Example Code
from langgraph.graph import END, StateGraph
from langchain_core.runnables import RunnablePassthrough
def path1(state: dict):
documents = state["documents"]
documents.append("path1")
return {"documents": documents}
def path2(state: dict):
documents = state["documents"]
documents.append("path2")
return {"documents": documents}
def middle_router(state: dict):
return ["path1", "path2"]
def rendezvous(state: dict):
print("==>>rendezvous")
documents = state["documents"]
print("documents: ", documents)
return {"documents": documents}
graph = StateGraph({"documents": []})
graph.add_node("start", RunnablePassthrough())
graph.add_conditional_edges(
"start",
middle_router,
{
"path1": "path1",
"path2": "path2",
},
then="rendezvous",
)
graph.add_node("path1", path1)
graph.add_edge("path1", "rendezvous")
graph.add_node("path2", path2)
graph.add_edge("path2", "rendezvous")
graph.add_node("rendezvous", rendezvous)
graph.add_edge("rendezvous", END)
graph.set_entry_point("start")
langgraph_app = graph.compile()
langgraph_app.invoke({"documents": []})
Error Message and Stack Trace (if applicable)
InvalidUpdateError Traceback (most recent call last) Cell In[13], line 26 23 graph.set_entry_point("start") 24 langgraph_app = graph.compile() ---> 26 langgraph_app.invoke({"documents": []})
File ~/anaconda3/envs/survey_buddy/lib/python3.12/site-packages/langgraph/pregel/init.py:1177, in Pregel.invoke(self, input, config, stream_mode, output_keys, input_keys, interrupt_before, interrupt_after, debug, **kwargs) 1175 chunks = [] 1176 print("LangGraph Input: ", input) -> 1177 for chunk in self.stream( 1178 input, 1179 config, 1180 stream_mode=stream_mode, 1181 output_keys=output_keys, 1182 input_keys=input_keys, 1183 interrupt_before=interrupt_before, 1184 interrupt_after=interrupt_after, 1185 debug=debug, 1186 **kwargs, 1187 ): 1188 if stream_mode == "values": 1189 latest = chunk
File ~/anaconda3/envs/survey_buddy/lib/python3.12/site-packages/langgraph/pregel/init.py:786, in Pregel.stream(self, input, config, stream_mode, output_keys, input_keys, interrupt_before, interrupt_after, debug) 781 print_step_writes( 782 step, pending_writes, self.stream_channels_list 783 ) 785 # apply writes to channels --> 786 _apply_writes(checkpoint, channels, pending_writes) 788 if debug: 789 print_step_checkpoint(step, channels, self.stream_channels_list)
File ~/anaconda3/envs/survey_buddy/lib/python3.12/site-packages/langgraph/pregel/init.py:1344, in _apply_writes(checkpoint, channels, pending_writes) 1342 channels[chan].update(vals) 1343 except InvalidUpdateError as e: -> 1344 raise InvalidUpdateError( 1345 f"Invalid update for channel {chan}: {e}" 1346 ) from e 1347 checkpoint["channel_versions"][chan] = max_version + 1 1348 updated_channels.add(chan)
InvalidUpdateError: Invalid update for channel root: LastValue can only receive one value per step.
Description
I'm using a basic dictionary {"document": []} for my state and updating it in parallel on path1 and path2. However, this has led to an error related to LastValue. Using a TypeDict state with the Annotated function, I didn't encounter this issue, which suggests that the problem may lie with the state.
Additionally, could you provide a definition of LastValue? I've attempted to understand it, but the Pregel code was quite complex. I would appreciate any documentation on the Pregel source code. Thank you.
System Info
langchain==0.1.17 langchain-anthropic==0.1.11 langchain-community==0.0.37 langchain-core==0.1.52 langchain-openai==0.1.6 langchain-text-splitters==0.0.1 langchainhub==0.1.15
Hi @minki-j - great observation. If StateGraph is initialized with an instance of a dictionary, rather than a **type**, the resultinggraph has no knowledge of the documents field in your dictionary and only can only "reduce" on the __root__ level.
As noted, the same code works with a typeddict. It also works if you annotated how you want to reduce the primary state:
graph = StateGraph(Annotated[dict, lambda x, y: {**x, **y}])
Another issue is that the code above has then specified while also explicitly adding edges; it would be better to only have the then argument
Gonna work with Nuno to make the errors more helpful here, and plan to deprecate then so there's only one way of doing things
Thanks @hinthornw. I turned the state into a typeddict with the annoated reduce logic, and the error is gone.
Thanks for pointing out the duplicated edges. I mistakenly add the edges when I was cleaning the code for this post.
Can you consider not deprecating then? I love it because it makes the codes cleaner. I think it's a great option to keep having. What about throwing an error when there is a duplicated edges? If you like this idea, I'd love to work on adding that feature!
example>
from varname import nameof
g = StateGraph(StateType)
g.set_entry_point("entry")
g.add_node("entry", RunnablePassthrough())
# With `then`, I can see the flow at a glance
g.add_conditional_edges(
"entry",
conditional_func,
to_path_map(
[nameof(gather_user_info),
nameof(gather_vendor_info),]
),
then="rendezvous",
)
g.add_node(nameof(gather_user_info), gather_user_info)
g.add_node(nameof(gather_vendor_info), gather_vendor_info)
# Without using 'then', I have to add the following edges, which is less clear than 'then' in my opinion.
# g.add_edge(nameof(gather_user_info), "rendezvous")
# g.add_edge(nameof(gather_vendor_info), "rendezvous")
g.add_node("rendezvous", RunnablePassthrough())
g.add_edge("rendezvous", END)
# functions used above
def to_path_map(node_names):
dict = {}
for name in node_names:
dict[name] = name
return dict
def conditional_func(state: dict[str, Documents]):
if state["documents"].user_info is None:
return nameof(gather_user_info)
else:
return nameof(gather_vendor_info)
could you plz tell about, what does the then argument in add_conditional_edges(....,then=...) I checked the docs but didn't understood it. Let me put it in an example manner, if I have an agent then I will make a conditional edge from the agent to the tool later a regular edge goes back to the agent from the tool. it works in an iterative manner until arrives to a solution. In this were does the then argument does the role.
I am facing similar problem, Please help @hinthornw @Jeomon
This is my state class State(TypedDict): node : Node keywords: Optional[KeywordExtractor] context_scoring: Optional[ContextScoring] seed_question: Optional[List[SeedQuestion]] reasoning_question: Optional[ReasoningQuestion] conditional_question: Optional[ConditionalQuestion] multi_context_question: Optional[MultiContextQuestion] compressed_question: Optional[CompressQuestion] relevant_concept: Optional[RelevantConcept] doc_store: Any
One of my routing function is def seed_question_router(state: State): if state['seed_question']: return ['reasoning_question_gen','conditional_question_gen'] else: return "get_random_node"
I am updating different state parameter in both nodes, then I am receiving below error
Traceback (most recent call last):
File "/home/x/anaconda3/envs/rag-env/lib/python3.9/runpy.py", line 197, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/home/x/anaconda3/envs/rag-env/lib/python3.9/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "/home/x/Documents/RAG Pipeline/evalution/workflow_nodes.py", line 148, in
graph_builder = StateGraph(State)
graph_builder.add_node('get_random_node',get_random_node) graph_builder.add_node('keyword_extractor',keyword_extractor) graph_builder.add_node('context_scoring_node',context_scoring_node) graph_builder.add_node('seed_question_gen',seed_question_gen) graph_builder.add_node('reasoning_question_gen',reasoning_question_gen) graph_builder.add_node('conditional_question_gen',conditional_question_gen)
graph_builder.add_edge(START, "get_random_node") graph_builder.add_edge("get_random_node", "keyword_extractor") graph_builder.add_edge('keyword_extractor','context_scoring_node')
graph_builder.add_edge('seed_question_gen','seed_subgraph')
graph_builder.add_conditional_edges('context_scoring_node',context_filter) graph_builder.add_conditional_edges('seed_question_gen',seed_question_router) graph_builder.add_conditional_edges('reasoning_question_gen',reasoning_question_router) graph_builder.add_conditional_edges('conditional_question_gen',conditional_question_router)
What should I change?
This is because somewhere in the graph that you have created the Node property in the state of the graph, it is updated simultaneously by two or more nodes in the graph.