two node flow to one node
Checked other resources
- [X] I added a very descriptive title to this issue.
- [X] I searched the LangChain documentation with the integrated search.
- [X] I used the GitHub search to find a similar question and didn't find it.
- [X] I am sure that this is a bug in LangChain rather than my code.
Example Code
An "up" node and a "side" node both flow into a "down" node; the output parameters of the two upstream nodes should become the input parameters of the downstream node.
Error Message and Stack Trace (if applicable)
No response
Description
An "up" node and a "side" node both flow into a "down" node; the output parameters of the two upstream nodes should become the input parameters of the downstream node.
System Info
latest.
How do I implement this: an "up" node and a "side" node whose outputs become the input parameters of a "down" node?
Can you share the code for your graph? What you illustrated should work fine.
Hello, I can give you an example:
## graph.py
import operator
from typing import Annotated, Sequence, TypedDict

from langgraph.graph import StateGraph, END

# node functions and the router condition are defined in nodes.py below
from nodes import (func_router, func_pre_condition_runner, func_layout_runner,
                   func_analyzer, func_case_assembler, should_continue)


class AgentState(TypedDict):
    messages: Annotated[Sequence[dict], operator.add]


workflow = StateGraph(AgentState)
workflow.add_node("Router", func_router)
workflow.add_node("PreConditionRunner", func_pre_condition_runner)
workflow.add_node("LayoutRunner", func_layout_runner)
workflow.add_node("Analyzer", func_analyzer)
workflow.add_node("CaseAssembler", func_case_assembler)
workflow.set_entry_point("Router")
workflow.add_conditional_edges(
    "Router",
    should_continue,
    {
        "required": "PreConditionRunner",
        "common": "LayoutRunner",
        "common2": "Analyzer",
    },
)
workflow.add_edge('Router', 'Analyzer')
workflow.add_edge('Router', 'LayoutRunner')
workflow.add_edge('PreConditionRunner', 'CaseAssembler')
workflow.add_edge('Analyzer', 'CaseAssembler')
workflow.add_edge('LayoutRunner', 'CaseAssembler')
workflow.add_edge('CaseAssembler', END)
app = workflow.compile()
print("Final\n ", app.invoke({"messages": ["hello"]}))
and this:
## nodes.py
import time


def func_router(state):
    msg = "this is Router", time.time()
    print(msg)
    return {"messages": [msg]}

def func_pre_condition_runner(state):
    time.sleep(1)
    msg = "this is pre_condition_runner", time.time()
    print(msg)
    return {"messages": [msg]}

def func_layout_runner(state):
    time.sleep(2)
    msg = "this is layout_runner", time.time()
    print(msg)
    return {"messages": [msg]}

def func_analyzer(state):
    msg = "this is analyzer", time.time()
    print(msg)
    return {"messages": [msg]}

def func_case_assembler(state):
    msg = "this is func_case_assembler", time.time()
    print(msg)
    print(state['messages'])
    return {"messages": [msg]}

def should_continue(state):
    return "required"
In this case, you will see three data flows go into "CaseAssembler" and nothing else. 😁
I'd like to see some examples of branching and merging multiple data streams added to the examples. @hwchase17
When I write the graph structure like this, I get this exception before entering 'CaseAssembler': `InvalidUpdateError: Invalid update for channel Final:inbox: LastValue can only receive one value per step.`
Hi, I had the same problem and it seems indeed that the implemented Pregel algorithm does not support multiple input edges to a node.
The error is raised in the `update` method of the `LastValue` class.
A quick and dirty fix could be to comment out the exception like this:
def update(self, values: Sequence[Value]) -> None:
    if len(values) == 0:
        return
    # if len(values) != 1:
    #     raise InvalidUpdateError("LastValue can only receive one value per step.")
    self.value = values[-1]
Then you can handle the multiple input edges by reading all of the accumulated input messages from the agent state in the downstream node (sketched below).
I am sure this is not the way it is supposed to be done, but it did the job for my own use case.
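For reference, here is a minimal sketch (my own, not from the original comment) of what reading the merged inputs from the agent state could look like in the sink node, assuming the accumulating `messages` channel from the example above:

```python
import operator
import time
from typing import Annotated, Sequence, TypedDict


class AgentState(TypedDict):
    # operator.add concatenates the lists written by every upstream node
    messages: Annotated[Sequence[dict], operator.add]


def func_case_assembler(state: AgentState):
    # The writes from Router, Analyzer and LayoutRunner have already been
    # accumulated into state["messages"] by the reducer above.
    upstream = state["messages"]
    msg = (f"assembled {len(upstream)} upstream messages", time.time())
    return {"messages": [msg]}
```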
Hope this helps.
Here are some suggestions:
- downgrade to version 0.0.26 (my version)
- add an `add_conditional_edges` between CaseAssembler and END (see the sketch below); if not everything has finished yet, it will not push the whole graph to END.
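A rough sketch of the second suggestion, written as a fragment against the graph.py snippet earlier in the thread (it assumes the `workflow` object defined there); the completion check in `case_assembler_router` is hypothetical and would need to be adapted to your own state:

```python
from langgraph.graph import END


def case_assembler_router(state):
    # Hypothetical completion check: only finish once the expected number of
    # upstream messages has accumulated; tune the threshold to your graph.
    if len(state["messages"]) >= 4:
        return END
    # Otherwise keep the graph running instead of pushing everything to END.
    return "CaseAssembler"


# Use this in place of workflow.add_edge('CaseAssembler', END):
workflow.add_conditional_edges("CaseAssembler", case_assembler_router)
```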
We do support multiple input edges in more recent versions of langgraph, here's an example for the original issue above
import operator
from typing import Annotated, TypedDict

from langgraph.graph import StateGraph


class State(TypedDict):
    my_key: Annotated[str, operator.add]

def up(state: State):
    pass

def side(state: State):
    pass

def down(state: State):
    pass

graph = StateGraph(State)
graph.add_node("up", up)
graph.add_node("side", side)
graph.add_node("down", down)
graph.set_entry_point("up")
graph.add_edge("up", "side")
graph.add_edge(["up", "side"], "down")
graph.set_finish_point("down")
app = graph.compile()
assert app.get_graph().draw_ascii() == (
""" +-----------+
| __start__ |
+-----------+
*
*
*
+----+
| up |
+----+
** **
* *
* *
+------+ *
| side | *
+------+ *
** **
* *
* *
+------+
| down |
+------+
*
*
*
+---------+
| __end__ |
+---------+ """
)
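As a small usage sketch (not part of the original comment), the same graph with node bodies that actually write to `my_key` shows how the `operator.add` reducer concatenates the writes at the fan-in; the return values are made up for illustration:

```python
import operator
from typing import Annotated, TypedDict

from langgraph.graph import StateGraph


class State(TypedDict):
    my_key: Annotated[str, operator.add]


def up(state: State):
    return {"my_key": " up"}


def side(state: State):
    return {"my_key": " side"}


def down(state: State):
    # Runs only after both "up" and "side" have completed; their writes have
    # already been folded into my_key by operator.add.
    return {"my_key": " down"}


graph = StateGraph(State)
graph.add_node("up", up)
graph.add_node("side", side)
graph.add_node("down", down)
graph.set_entry_point("up")
graph.add_edge("up", "side")
graph.add_edge(["up", "side"], "down")
graph.set_finish_point("down")
app = graph.compile()

print(app.invoke({"my_key": "start"}))
# expected: {'my_key': 'start up side down'}
```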
@quick-sort, I'm running into the same issue, and @nfcampos's solution doesn't help. How did you solve it?
@nfcampos, thank you for the provided code, I tried your solution. My code (there is no branching here, but sub-graphs):
from functools import partial
import copy
import io
import operator
from typing import Annotated, Any, Dict, List, TypedDict

from PIL import Image as PIL_Image

from langgraph.graph import StateGraph, END

short_context = {
    'is_start': 'start',
    "counter": 0
}

class StepRunState(TypedDict):
    short_context: Any

# Define a new graph
sub_workflow = StateGraph(StepRunState)
sub_workflow2 = StateGraph(StepRunState)
workflow_main = StateGraph(StepRunState)

def should_continue(state: StepRunState):
    print('should_continue')
    return 'continue'

def should_continue2(state: StepRunState):
    print('should_continue')
    return 'continue'

def call_step(step_id: str, state: StepRunState):
    new_state_copy = copy.deepcopy(state)
    if "workflow" in step_id:
        new_state_copy['short_context']['counter'] += 5
    else:
        new_state_copy['short_context']['counter'] += 1
    new_state = {"short_context": {**new_state_copy['short_context'], 'step': step_id}}
    print(f'returning from {step_id} state: {new_state}')
    return new_state

def create_partial(a, b):
    return partial(a, b)

sub_workflow.add_node("step_1", create_partial(call_step, "step_1"))
sub_workflow.add_node("step_2", create_partial(call_step, "step_2"))
sub_workflow.add_node("step_3", create_partial(call_step, "step_3"))
sub_workflow2.add_node("step_1-workflow-2", create_partial(call_step, "step_1-workflow-2"))
sub_workflow2.add_node("step_2-workflow-2", create_partial(call_step, "step_2-workflow-2"))
sub_workflow2.add_node("step_3-workflow-2", create_partial(call_step, "step_3-workflow-2"))

sub_workflow.set_entry_point("step_1")
sub_workflow2.set_entry_point("step_1-workflow-2")

sub_workflow.add_conditional_edges(
    'step_2',
    should_continue,
    {
        "continue": "step_3",
    }
)
sub_workflow2.add_conditional_edges(
    'step_2-workflow-2',
    should_continue2,
    {
        "continue": "step_3-workflow-2",
    }
)

sub_workflow.add_edge('step_1', 'step_2')
# sub_workflow.add_edge('step_3', END)
sub_workflow.set_finish_point('step_3')
sub_workflow2.add_edge('step_1-workflow-2', 'step_2-workflow-2')
# sub_workflow2.add_edge('step_3-workflow-2', END)
sub_workflow2.set_finish_point('step_3-workflow-2')

def call_final_node(state):
    print('call_final_node', state)
    return state

workflow_main.support_multiple_edges = True
workflow_main.add_node('subnode', sub_workflow.compile())
workflow_main.add_node('subnode2', sub_workflow2.compile())
# workflow_main.set_finish_point('subnode')
# workflow_main.set_finish_point('subnode2')
workflow_main.add_node('start_node_main', lambda x: x)
workflow_main.add_node('finish_node_main', call_final_node)
workflow_main.add_edge('start_node_main', 'subnode')
workflow_main.add_edge('start_node_main', 'subnode2')
workflow_main.add_edge(['subnode', 'subnode2'], 'finish_node_main')
workflow_main.set_entry_point("start_node_main")
workflow_main.set_finish_point("finish_node_main")

# print(workflow_main.compile().get_graph(xray=True).draw_ascii())
workflow_main.compile().invoke({'short_context': short_context})
Output
values in LastValue []
values in LastValue [{'is_start': 'start', 'counter': 0}]
values in LastValue [{'is_start': 'start', 'counter': 0}]
values in LastValue []
values in LastValue []
values in LastValue [{'is_start': 'start', 'counter': 0}]
values in LastValue [{'is_start': 'start', 'counter': 0}]
returning from step_1 state: {'short_context': {'is_start': 'start', 'counter': 1, 'step': 'step_1'}}
returning from step_1-workflow-2 state: {'short_context': {'is_start': 'start', 'counter': 5, 'step': 'step_1-workflow-2'}}
values in LastValue [{'is_start': 'start', 'counter': 1, 'step': 'step_1'}]
values in LastValue [{'is_start': 'start', 'counter': 5, 'step': 'step_1-workflow-2'}]
returning from step_2 state: {'short_context': {'is_start': 'start', 'counter': 2, 'step': 'step_2'}}
returning from step_2-workflow-2 state: {'short_context': {'is_start': 'start', 'counter': 10, 'step': 'step_2-workflow-2'}}
values in LastValue [{'is_start': 'start', 'counter': 2, 'step': 'step_2'}]
values in LastValue [{'is_start': 'start', 'counter': 10, 'step': 'step_2-workflow-2'}]
should_continue
values in LastValue [{'is_start': 'start', 'counter': 2, 'step': 'step_2'}]
should_continue
returning from step_3 state: {'short_context': {'is_start': 'start', 'counter': 3, 'step': 'step_3'}}
values in LastValue [{'is_start': 'start', 'counter': 10, 'step': 'step_2-workflow-2'}]
values in LastValue [{'is_start': 'start', 'counter': 3, 'step': 'step_3'}]
returning from step_3-workflow-2 state: {'short_context': {'is_start': 'start', 'counter': 15, 'step': 'step_3-workflow-2'}}
values in LastValue [{'is_start': 'start', 'counter': 15, 'step': 'step_3-workflow-2'}]
values in LastValue [{'is_start': 'start', 'counter': 3, 'step': 'step_3'}, {'is_start': 'start', 'counter': 15, 'step': 'step_3-workflow-2'}]
error channels {'short_context': <langgraph.channels.last_value.LastValue object at 0x7ff9c36e2490>, '__start__': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x7ff9c36e17d0>, 'subnode': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x7ff9c36e1750>, 'subnode2': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x7ff9c36e2250>, 'start_node_main': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x7ff9c36e2450>, 'finish_node_main': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x7ff9c36e1d10>, 'start:start_node_main': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x7ff9c36e1790>}
Traceback (most recent call last):
  File "/home/dmitry/.pyenv/versions/3.11.6/lib/python3.11/site-packages/langgraph/pregel/__init__.py", line 1117, in _apply_writes
    channels[chan].update(vals)
  File "/home/dmitry/.pyenv/versions/3.11.6/lib/python3.11/site-packages/langgraph/channels/last_value.py", line 56, in update
    raise InvalidUpdateError("LastValue can only receive one value per step.")
langgraph.channels.base.InvalidUpdateError: LastValue can only receive one value per step.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/dmitry/Desktop/langgraph_snippet.py", line 146, in <module>
    workflow_main.compile().invoke({'short_context': short_context})
  File "/home/dmitry/.pyenv/versions/3.11.6/lib/python3.11/site-packages/langgraph/pregel/__init__.py", line 978, in invoke
    for chunk in self.stream(
  File "/home/dmitry/.pyenv/versions/3.11.6/lib/python3.11/site-packages/langgraph/pregel/__init__.py", line 694, in stream
    _apply_writes(checkpoint, channels, pending_writes)
  File "/home/dmitry/.pyenv/versions/3.11.6/lib/python3.11/site-packages/langgraph/pregel/__init__.py", line 1120, in _apply_writes
    raise InvalidUpdateError(
langgraph.channels.base.InvalidUpdateError: Invalid update for channel short_context: LastValue can only receive one value per step.
Please tell me what the problem could be here; I am doing exactly the same as you. P.S. I also added a print at line 51 of last_value.py (that is where the "values in LastValue" lines above come from).
TLDR: set your short_context using `Annotated[Any, lambda x, y: x]`, see below for explanation.
I had a bunch of problems with this; it is very important to provide the Annotated type so that the "metadata" of Annotated can be used for merging state, like an operator (a reducer).
Example
import operator
from typing import Annotated, Any, Dict, List, Optional, TypedDict

from langgraph.graph import StateGraph

def keep_first(a: Any, b: Any) -> Any:
    return a

# Custom merging "operator" for dictionaries
def dict_reducer(a: Dict, b: Dict) -> Dict:
    result = a.copy()
    result.update(b)
    return result

class MyNestedObject(TypedDict):
    some_str: Optional[str]
    iterations: int

class MyGraph(TypedDict):
    # Simple list
    messages: Annotated[List[str], operator.add]
    dict_to_object: Annotated[Dict[str, MyNestedObject], dict_reducer]
    my_dict_that_doesnt_change: Annotated[Dict[str, Any], keep_first]

# Your multi-input and parallel superstep graph
...

graph.invoke({"my_dict_that_doesnt_change": {}, "dict_to_object": {}, "messages": []})
ANY key in the state that does not follow this practice will be rejected in the multi-input (possibly parallel) node case due to the LastValue exception. This is especially important when your graph has parallel steps that are supposed to converge on a sink node.
Here's an example graph I've been working with (using conditionals and parallel executions). My execution would always fail in the sink node when the parallel branches gathered: the different incoming edges carried three different versions of the state with no way to merge them, so the update failed by trying to write three times where only one write is supported.
E.g., a flow here would be that the common node selects the next nodes to be, for instance, ["p2", "p3"], triggering multiple steps in parallel and causing the LastValue exception when the two flows arriving at the sink node carry two values rather than one. A sketch of this shape is below.
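A minimal sketch of that shape, using the reducer-annotated state described above; the node names (`common`, `p1`-`p3`, `sink`) and the selection logic in `pick_next` are hypothetical, and the list-returning conditional edge follows the conditional-branching how-to linked just below:

```python
import operator
from typing import Annotated, Any, Dict, List, TypedDict

from langgraph.graph import StateGraph, END


def keep_first(a: Any, b: Any) -> Any:
    return a


class MyGraph(TypedDict):
    messages: Annotated[List[str], operator.add]
    my_dict_that_doesnt_change: Annotated[Dict[str, Any], keep_first]


def common(state: MyGraph):
    return {"messages": ["common ran"]}


def make_worker(name: str):
    def worker(state: MyGraph):
        return {"messages": [f"{name} ran"]}
    return worker


def sink(state: MyGraph):
    # The parallel writes have already been merged by the reducers above,
    # so there is no LastValue conflict when the branches gather here.
    return {"messages": ["sink ran"]}


def pick_next(state: MyGraph):
    # Hypothetical selection: returning a list of node names fans out to
    # several nodes in the same superstep.
    return ["p2", "p3"]


graph = StateGraph(MyGraph)
graph.add_node("common", common)
for name in ("p1", "p2", "p3"):
    graph.add_node(name, make_worker(name))
graph.add_node("sink", sink)

graph.set_entry_point("common")
graph.add_conditional_edges("common", pick_next)
for name in ("p1", "p2", "p3"):
    graph.add_edge(name, "sink")
graph.add_edge("sink", END)

app = graph.compile()
print(app.invoke({"messages": [], "my_dict_that_doesnt_change": {}}))
```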
Useful docs: https://langchain-ai.github.io/langgraph/how-tos/branching/#conditional-branching
> TLDR: set your short_context using `Annotated[Any, lambda x, y: x]`, see below for explanation
For some reason, sometimes the start node will send the state variable to `y` instead of `x`, so I've set it to `Annotated[Any, lambda x, y: x or y]`.
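A tiny sketch of that workaround, applied to the `short_context` key from the earlier snippet:

```python
from typing import Annotated, Any, TypedDict


class StepRunState(TypedDict):
    # Keep whichever argument actually carries a value; per the comment above,
    # the start node sometimes delivers the existing state as y instead of x.
    short_context: Annotated[Any, lambda x, y: x or y]
```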