two node flow to one node

Open me-v2 opened this issue 1 year ago • 3 comments

Checked other resources

  • [X] I added a very descriptive title to this issue.
  • [X] I searched the LangChain documentation with the integrated search.
  • [X] I used the GitHub search to find a similar question and didn't find it.
  • [X] I am sure that this is a bug in LangChain rather than my code.

Example Code

CleanShot 2024-02-23 at 16 48 41

The outputs of the up and side nodes should be passed as the parameters of the down node.

Error Message and Stack Trace (if applicable)

No response

Description

The outputs of the up and side nodes should be passed as the parameters of the down node.

System Info

latest.

me-v2 avatar Feb 23 '24 08:02 me-v2

How can I implement this: the outputs of the up and side nodes used as the parameters of the down node?

me-v2 avatar Feb 23 '24 08:02 me-v2

can you share the code to your graph? what you illustrated should work fine

russell-dot-js avatar Feb 24 '24 08:02 russell-dot-js

Hello, I can give you an example:

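##graph.py
import operator
from typing import Annotated, Sequence, TypedDict

from langgraph.graph import StateGraph, END

# Node functions and the routing condition live in nodes.py (shown below).
from nodes import (
    func_router,
    func_pre_condition_runner,
    func_layout_runner,
    func_analyzer,
    func_case_assembler,
    should_continue,
)
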
class AgentState(TypedDict):
    messages: Annotated[Sequence[dict], operator.add]


workflow = StateGraph(AgentState)

workflow.add_node("Router", func_router)
workflow.add_node("PreConditionRunner", func_pre_condition_runner)
workflow.add_node("LayoutRunner", func_layout_runner)
workflow.add_node("Analyzer", func_analyzer)
workflow.add_node("CaseAssembler", func_case_assembler)

workflow.set_entry_point("Router")
workflow.add_conditional_edges(
    "Router",
    should_continue,
    {
        "required": "PreConditionRunner",
        "common": "LayoutRunner",
        "common2":"Analyzer"
    }
)
workflow.add_edge('Router', 'Analyzer')
workflow.add_edge('Router', 'LayoutRunner')

workflow.add_edge('PreConditionRunner', 'CaseAssembler')
workflow.add_edge('Analyzer', 'CaseAssembler')
workflow.add_edge('LayoutRunner', 'CaseAssembler')
workflow.add_edge('CaseAssembler', END)
app = workflow.compile()
print("Final\n   ",app.invoke({"messages": ["hello"]}))

and this:

##nodes.py
import time

def func_router(state):
    msg = "this is Router",time.time()

    print(msg)
    return {"messages": [msg]}


def func_pre_condition_runner(state):
    time.sleep(1)
    msg = "this is pre_condition_runner",time.time()
    print(msg)
    return {"messages": [msg]}


def func_layout_runner(state):
    time.sleep(2)
    msg = "this is layout_runner",time.time()
    print(msg)
    return {"messages": [msg]}


def func_analyzer(state):
    msg = "this is analyzer",time.time()
    print(msg)
    return {"messages": [msg]}


def func_case_assembler(state):
    msg = "this is func_case_assembler",time.time()
    print(msg)
    print(state['messages'])
    return {"messages": [msg]}


def should_continue(state):
    return "required"

In this case, you will see three data streams flow into "CaseAssembler" and nothing else. 😁 I'd like to add some examples of branching and merging multiple data streams to the examples. @hwchase17

[image]

JemicyChu avatar Mar 04 '24 07:03 JemicyChu

When I write the graph structure like this, I get this exception before entering 'CaseAssembler': InvalidUpdateError: Invalid update for channel Final:inbox: LastValue can only receive one value per step.

quick-sort avatar Mar 19 '24 07:03 quick-sort

Hi, I had the same problem and it seems indeed that the implemented Pregel algorithm does not support multiple input edges to a node.

The error is raised in the method update of the class LastValue.
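
To make the failure mode concrete, here is a minimal sketch of a last-value style channel. SketchLastValue and SketchInvalidUpdateError are hypothetical stand-ins written for illustration, not langgraph's actual classes; the sketch only assumes the behavior described in this thread, namely that more than one write within a single step is rejected.

```python
from typing import Any, Optional, Sequence


class SketchInvalidUpdateError(Exception):
    """Hypothetical stand-in for the error reported in this thread."""


class SketchLastValue:
    """Illustrative channel that keeps one value and accepts one write per step."""

    def __init__(self) -> None:
        self.value: Optional[Any] = None

    def update(self, values: Sequence[Any]) -> None:
        if len(values) == 0:
            return  # nothing was written during this step
        if len(values) != 1:
            raise SketchInvalidUpdateError(
                "LastValue can only receive one value per step."
            )
        self.value = values[-1]


channel = SketchLastValue()
channel.update([{"counter": 3}])  # one writer in this step: accepted

try:
    # Two parallel branches writing in the same step: rejected.
    channel.update([{"counter": 3}, {"counter": 15}])
    rejected = False
except SketchInvalidUpdateError:
    rejected = True
```

In this sketch the first update is stored and the second one raises, which mirrors the situation where two incoming edges deliver their state to the same node in one step.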

A quick and dirty fix could be to comment out the exception like this:

  def update(self, values: Sequence[Value]) -> None:
      if len(values) == 0:
          return
      # if len(values) != 1:
      #     raise InvalidUpdateError("LastValue can only receive one value per step.")

      self.value = values[-1]

Then you can handle the multiple input edges by getting all input messages through the agent state.

I am sure this is not the way it is supposed to be done, but it did the job for my own use case.

Hope this helps.

xabier64 avatar Mar 19 '24 08:03 xabier64

Here are some suggestions.

  1. Downgrade to version 0.0.26 (the version I use).
  2. Add an add_conditional_edges call between CaseAssembler and END. If not all branches have finished, it does not push the whole graph to END.

JemicyChu avatar Mar 26 '24 07:03 JemicyChu

We do support multiple input edges in more recent versions of langgraph. Here's an example for the original issue above:

import operator
from typing import Annotated, TypedDict

from langgraph.graph import StateGraph


class State(TypedDict):
    my_key: Annotated[str, operator.add]

def up(state: State):
    pass

def side(state: State):
    pass

def down(state: State):
    pass

graph = StateGraph(State)

graph.add_node("up", up)
graph.add_node("side", side)
graph.add_node("down", down)

graph.set_entry_point("up")
graph.add_edge("up", "side")
graph.add_edge(["up", "side"], "down")
graph.set_finish_point("down")

app = graph.compile()

assert app.get_graph().draw_ascii() == (
    """    +-----------+
    | __start__ |
    +-----------+
          *
          *
          *
        +----+
        | up |
        +----+
       **     **
      *         *
     *           *
+------+          *
| side |         *
+------+        *
       **     **
         *   *
          * *
        +------+
        | down |
        +------+
           *
           *
           *
      +---------+
      | __end__ |
      +---------+  """
)

nfcampos avatar Mar 30 '24 00:03 nfcampos

@quick-sort, I'm running into the same issue, and the solution from @nfcampos doesn't help. How did you solve it?

mrbuslov avatar Apr 17 '24 14:04 mrbuslov

@nfcampos, thank you for the provided code. I tried your solution. My code (there is no branching here, but there are sub-graphs):

from functools import partial
import copy
import operator
from typing import Annotated, Any, Dict, List, TypedDict

from langgraph.graph import StateGraph, END


short_context = {
    'is_start': 'start',
    "counter": 0
}

class StepRunState(TypedDict):
    short_context: Any


# Define a new graph
sub_workflow = StateGraph(StepRunState)
sub_workflow2 = StateGraph(StepRunState)
workflow_main = StateGraph(StepRunState)

def should_continue(state: StepRunState):
    print('should_continue')
    return 'continue'

def should_continue2(state: StepRunState):
    print('should_continue')
    return 'continue'

def call_step(step_id: str, state: StepRunState):
    new_state_copy = copy.deepcopy(state)
    if "workflow" in step_id:
        new_state_copy['short_context']['counter'] += 5
    else:
        new_state_copy['short_context']['counter'] += 1
    
    new_state = {"short_context": {**new_state_copy['short_context'], 'step': step_id}}
    print(f'returning from {step_id} state: {new_state}')
    
    
    return new_state

def create_partial(a, b):
    return partial(a,b)

sub_workflow.add_node("step_1", create_partial(call_step, "step_1"))
sub_workflow.add_node("step_2", create_partial(call_step, "step_2"))
sub_workflow.add_node("step_3", create_partial(call_step, "step_3"))

sub_workflow2.add_node("step_1-workflow-2", create_partial(call_step, "step_1-workflow-2"))
sub_workflow2.add_node("step_2-workflow-2", create_partial(call_step, "step_2-workflow-2"))
sub_workflow2.add_node("step_3-workflow-2", create_partial(call_step, "step_3-workflow-2"))


sub_workflow.set_entry_point("step_1")
sub_workflow2.set_entry_point("step_1-workflow-2")


sub_workflow.add_conditional_edges(
    'step_2',
    should_continue,
    {
        "continue": "step_3",
    }
)
sub_workflow2.add_conditional_edges(
    'step_2-workflow-2',
    should_continue2,
    {
        "continue": "step_3-workflow-2",
    }
)


sub_workflow.add_edge('step_1', 'step_2')
# sub_workflow.add_edge('step_3', END)
sub_workflow.set_finish_point('step_3')

sub_workflow2.add_edge('step_1-workflow-2', 'step_2-workflow-2')
# sub_workflow2.add_edge('step_3-workflow-2', END)
sub_workflow2.set_finish_point('step_3-workflow-2')


def call_final_node(state):
    print('call_final_node', state)
    return state


workflow_main.support_multiple_edges = True
workflow_main.add_node('subnode', sub_workflow.compile())
workflow_main.add_node('subnode2', sub_workflow2.compile())
# workflow_main.set_finish_point('subnode')
# workflow_main.set_finish_point('subnode2')

workflow_main.add_node('start_node_main', lambda x: x)
workflow_main.add_node('finish_node_main', call_final_node)
workflow_main.add_edge('start_node_main', 'subnode')
workflow_main.add_edge('start_node_main', 'subnode2')
workflow_main.add_edge(['subnode', 'subnode2'], 'finish_node_main')

workflow_main.set_entry_point("start_node_main")
workflow_main.set_finish_point("finish_node_main")

# print(workflow_main.compile().get_graph(xray=True).draw_ascii())

workflow_main.compile().invoke({'short_context': short_context})

Output

values in LastValue []
values in LastValue [{'is_start': 'start', 'counter': 0}]
values in LastValue [{'is_start': 'start', 'counter': 0}]
values in LastValue []
values in LastValue []
values in LastValue [{'is_start': 'start', 'counter': 0}]
values in LastValue [{'is_start': 'start', 'counter': 0}]
returning from step_1 state: {'short_context': {'is_start': 'start', 'counter': 1, 'step': 'step_1'}}
returning from step_1-workflow-2 state: {'short_context': {'is_start': 'start', 'counter': 5, 'step': 'step_1-workflow-2'}}
values in LastValue [{'is_start': 'start', 'counter': 1, 'step': 'step_1'}]
values in LastValue [{'is_start': 'start', 'counter': 5, 'step': 'step_1-workflow-2'}]
returning from step_2 state: {'short_context': {'is_start': 'start', 'counter': 2, 'step': 'step_2'}}
returning from step_2-workflow-2 state: {'short_context': {'is_start': 'start', 'counter': 10, 'step': 'step_2-workflow-2'}}
values in LastValue [{'is_start': 'start', 'counter': 2, 'step': 'step_2'}]
values in LastValue [{'is_start': 'start', 'counter': 10, 'step': 'step_2-workflow-2'}]
should_continue
values in LastValue [{'is_start': 'start', 'counter': 2, 'step': 'step_2'}]
should_continue
returning from step_3 state: {'short_context': {'is_start': 'start', 'counter': 3, 'step': 'step_3'}}
values in LastValue [{'is_start': 'start', 'counter': 10, 'step': 'step_2-workflow-2'}]
values in LastValue [{'is_start': 'start', 'counter': 3, 'step': 'step_3'}]
returning from step_3-workflow-2 state: {'short_context': {'is_start': 'start', 'counter': 15, 'step': 'step_3-workflow-2'}}
values in LastValue [{'is_start': 'start', 'counter': 15, 'step': 'step_3-workflow-2'}]
values in LastValue [{'is_start': 'start', 'counter': 3, 'step': 'step_3'}, {'is_start': 'start', 'counter': 15, 'step': 'step_3-workflow-2'}]
error channels {'short_context': <langgraph.channels.last_value.LastValue object at 0x7ff9c36e2490>, '__start__': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x7ff9c36e17d0>, 'subnode': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x7ff9c36e1750>, 'subnode2': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x7ff9c36e2250>, 'start_node_main': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x7ff9c36e2450>, 'finish_node_main': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x7ff9c36e1d10>, 'start:start_node_main': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x7ff9c36e1790>}
Traceback (most recent call last):
  File "/home/dmitry/.pyenv/versions/3.11.6/lib/python3.11/site-packages/langgraph/pregel/__init__.py", line 1117, in _apply_writes
    channels[chan].update(vals)
  File "/home/dmitry/.pyenv/versions/3.11.6/lib/python3.11/site-packages/langgraph/channels/last_value.py", line 56, in update
    raise InvalidUpdateError("LastValue can only receive one value per step.")
langgraph.channels.base.InvalidUpdateError: LastValue can only receive one value per step.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/dmitry/Desktop/langgraph_snippet.py", line 146, in <module>
    workflow_main.compile().invoke({'short_context': short_context})
  File "/home/dmitry/.pyenv/versions/3.11.6/lib/python3.11/site-packages/langgraph/pregel/__init__.py", line 978, in invoke
    for chunk in self.stream(
  File "/home/dmitry/.pyenv/versions/3.11.6/lib/python3.11/site-packages/langgraph/pregel/__init__.py", line 694, in stream
    _apply_writes(checkpoint, channels, pending_writes)
  File "/home/dmitry/.pyenv/versions/3.11.6/lib/python3.11/site-packages/langgraph/pregel/__init__.py", line 1120, in _apply_writes
    raise InvalidUpdateError(
langgraph.channels.base.InvalidUpdateError: Invalid update for channel short_context: LastValue can only receive one value per step.

Please tell me what the problem could be here. I am doing exactly the same as you. P.S. I also added a print in last_value.py at line 51, which produces the "values in LastValue" lines above.

mrbuslov avatar Apr 17 '24 14:04 mrbuslov

TLDR: set your short_context using Annotated[Any, lambda x, y: x], see below for explanation

I had a bunch of problems with this. It is very important to declare each state key with an Annotated type, so that the function given as the "metadata" of Annotated can be used as an operator for merging state updates.
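
As a rough illustration of what "metadata" means here, the sketch below uses only the standard library to read the callable attached to each key of a TypedDict. SketchState and find_reducers are hypothetical names, and whether langgraph discovers reducers in exactly this way is an assumption; the point is only that the function travels with the type annotation.

```python
import operator
from typing import Annotated, List, TypedDict, get_type_hints


class SketchState(TypedDict):
    messages: Annotated[List[str], operator.add]  # a reducer is attached
    plain: str  # no reducer attached


def find_reducers(schema) -> dict:
    """Map each key to the first callable found in its Annotated metadata."""
    reducers = {}
    hints = get_type_hints(schema, include_extras=True)
    for key, hint in hints.items():
        # Annotated types expose their extra arguments as __metadata__.
        metadata = getattr(hint, "__metadata__", ())
        reducers[key] = next((m for m in metadata if callable(m)), None)
    return reducers


reducers = find_reducers(SketchState)
```

Here the messages key carries operator.add, while plain has nothing attached, so a key like plain has no way to combine several writes.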

Example

import operator
from typing import Annotated, Any, Dict, List, Optional, TypedDict
from langgraph.graph import StateGraph


def keep_first(a: Any, b: Any) -> Any:
    return a


# Custom merging "operator" for dictionaries
def dict_reducer(a: Dict, b: Dict) -> Dict:
    result = a.copy()
    result.update(b)
    return result
  
    
class MyNestedObject(TypedDict):
    some_str: Optional[str]
    iterations: int

class MyGraph(TypedDict):
   # Simple list
   messages: Annotated[List[str], operator.add]
   dict_to_object: Annotated[Dict[str, MyNestedObject], dict_reducer]
   my_dict_that_doesnt_change: Annotated[Dict[str, Any], keep_first]
   
# Your multi-input and parallel superstep graph
...
graph.invoke({"my_dict_that_doesnt_change": {}, "dict_to_object": {}, "messages": []})

ANY object in the state that does not follow this practice will be rejected with the LastValue exception when a node has multiple (possibly parallel) inputs. This is especially important for a graph with parallel steps that are supposed to return to a sink node.
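
The merging itself can be sketched without langgraph. In the example below, merge_step is a hypothetical helper that folds all writes from one step into the current value with the key's reducer; the order in which a real graph applies parallel writes is not something this sketch makes claims about.

```python
import operator
from functools import reduce
from typing import Any, Callable, Dict, List


def keep_first(a: Any, b: Any) -> Any:
    return a


def dict_reducer(a: Dict, b: Dict) -> Dict:
    result = a.copy()
    result.update(b)
    return result


def merge_step(
    current: Any, writes: List[Any], reducer: Callable[[Any, Any], Any]
) -> Any:
    """Fold every write from one step into the current value, in order."""
    return reduce(reducer, writes, current)


# Two parallel branches both write during the same step.
messages = merge_step(["start"], [["from p2"], ["from p3"]], operator.add)
objects = merge_step({}, [{"p2": 1}, {"p3": 2}], dict_reducer)
constant = merge_step({"id": 7}, [{"id": 8}, {"id": 9}], keep_first)
```

With a reducer in place, two writes in one step become a well-defined combination: the lists are concatenated, the dictionaries are merged, and the keep_first key ignores both writes.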

Here's an example graph I've been working with (using conditionals and parallel executions). My execution would always fail in the sink node where the parallel executions gather: the incoming edges carried 3 different versions of the state with no way to merge them, so the update was attempted 3 times where only 1 is supported.

For example, the common node might select ["p2", "p3"] as the next nodes, triggering multiple steps in parallel. The LastValue exception is then raised when the 2 flows arrive at the sink node with 2 values rather than 1.

[image]

Useful docs: https://langchain-ai.github.io/langgraph/how-tos/branching/#conditional-branching

simwijs-fmf avatar May 29 '24 05:05 simwijs-fmf

TLDR: set your short_context using Annotated[Any, lambda x, y: x], see below for explanation

For some reason, sometimes the start node will send the state variable to y instead of x, so I've set it to Annotated[Any, lambda x, y: x or y].
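
A small sketch of how the two reducers discussed in this comment differ when the existing value is empty. The function names are illustrative only, and the assumption that the first argument can arrive as None is taken from the behavior described above, not from langgraph's documentation.

```python
from typing import Any


def keep_first(x: Any, y: Any) -> Any:
    return x


def first_non_empty(x: Any, y: Any) -> Any:
    # Falls back to y whenever x is falsy (None, {}, "", 0, ...).
    return x or y


initial = {"is_start": "start", "counter": 0}

lost = keep_first(None, initial)  # the incoming state is dropped
kept = first_non_empty(None, initial)  # the incoming state survives
unchanged = first_non_empty(initial, {"counter": 99})  # existing value wins
```

One caveat of x or y is that an existing value that is intentionally falsy, such as an empty dict, will be replaced by the incoming one.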

youngchingjui avatar Jul 23 '24 02:07 youngchingjui