langgraph
langgraph copied to clipboard
Subflow can't be resumed by the main flow
Checked other resources
- [X] I added a very descriptive title to this issue.
- [X] I searched the LangGraph/LangChain documentation with the integrated search.
- [X] I used the GitHub search to find a similar question and didn't find it.
- [X] I am sure that this is a bug in LangGraph/LangChain rather than my code.
- [X] I am sure this is better as an issue rather than a GitHub discussion, since this is a LangGraph bug and not a design question.
Example Code
import random
from typing import TypedDict, Sequence, Annotated, Dict, Callable, Any
from langchain_core.messages import BaseMessage, AIMessage, HumanMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
class StateA(TypedDict):
    """State schema for the main (outer) graph."""
    name: str  # carried through unchanged by the nodes shown here
    messages: Annotated[Sequence[BaseMessage], add_messages]  # merged via the add_messages reducer
class StateB(TypedDict):
    """State schema for the sub-flow graph (superset of StateA)."""
    name: str  # shared with StateA, so the parent graph can pass it down
    age: int  # never set by any node in this repro; arrives as None from the parent
    messages: Annotated[Sequence[BaseMessage], add_messages]  # merged via the add_messages reducer
def a_llm(state: StateA) -> Dict[str, Any]:
    """Main-flow entry node: log the incoming state and emit one tag message."""
    print('a_llm')
    print(state)
    reply = AIMessage(content='a_llm')
    return {'messages': [reply]}
def b_llm(state: StateB) -> Dict[str, Any]:
    """Sub-flow entry node: log the incoming state and emit one tag message."""
    print('b_llm')
    print(state)
    reply = AIMessage(content='b_llm')
    return {'messages': [reply]}
def a_a(state: StateA) -> Dict[str, Any]:
    """Second main-flow node: log the state and append an 'a_a' message."""
    print('a_a')
    print(state)
    reply = AIMessage(content='a_a')
    return {'messages': [reply]}
def b_a(state: StateB) -> Dict[str, Any]:
    """Final main-flow node (runs after the sub-graph): log and tag the state."""
    print('b_a')
    print(state)
    reply = AIMessage(content='b_a')
    return {'messages': [reply]}
def a_b(state: StateB) -> Dict[str, Any]:
    """Sub-flow node between b_llm and the human interrupt: log and tag."""
    print('a_b')
    print(state)
    reply = AIMessage(content='a_b')
    return {'messages': [reply]}
def human_b(state: StateB) -> Dict[str, Any]:
    """Human-feedback node; the sub-graph is compiled to interrupt BEFORE it."""
    print('human_b, continue ? A: continue B stop')
    print(state)
    prompt = AIMessage(content='continue ? A: continue B stop')
    return {'messages': [prompt]}
def judge(state: StateB) -> str:
    """Random router after b_b: ~40% of rolls continue to 'c_b', otherwise END."""
    i = random.randint(1, 10)
    print(f'judge {i}')
    print(state)
    return 'c_b' if i > 6 else END
def h_j(state: StateB) -> str:
    """Route after the human_b interrupt based on the latest message.

    A reply of 'A' continues the sub-flow to 'b_b'; anything else ends it.

    Fix: the original read `state.get('messages')[-1]`, which returns None and
    raises an opaque TypeError when the key is absent; direct indexing gives a
    clear KeyError instead and matches how other nodes access the state.
    """
    msg = state['messages'][-1].content
    if msg == 'A':
        return 'b_b'
    return END
def b_b(state: StateB) -> Dict[str, Any]:
    """Sub-flow node reached when the human answers 'A': log and tag."""
    print('b_b')
    print(state)
    reply = AIMessage(content='b_b')
    return {'messages': [reply]}
def c_b(state: StateB) -> Dict[str, Any]:
    """Terminal sub-flow node reached when judge() rolls above 6: log and tag."""
    print('c_b')
    print(state)
    reply = AIMessage(content='c_b')
    return {'messages': [reply]}
# Sub-flow over StateB: b_llm -> a_b -> human_b, then conditional routing
# via h_j (human reply) and judge (random), terminating through c_b or END.
sf = StateGraph(StateB)
for node_name, node_fn in (
    ('b_llm', b_llm),
    ('a_b', a_b),
    ('human_b', human_b),
    ('b_b', b_b),
    ('c_b', c_b),
):
    sf.add_node(node_name, node_fn)
sf.set_entry_point('b_llm')
sf.add_edge('b_llm', 'a_b')
sf.add_edge('a_b', 'human_b')
sf.add_conditional_edges('human_b', h_j)
sf.add_conditional_edges('b_b', judge)
sf.add_edge('c_b', END)
memory = MemorySaver()

# Outer graph over StateA; the sub-flow is mounted as node 'b_s', compiled with
# the SAME checkpointer and set to interrupt before its human feedback node.
graph = StateGraph(StateA)
graph.add_node('a_llm', a_llm)
graph.add_node('a_a', a_a)
graph.add_node('b_s', sf.compile(checkpointer=memory, interrupt_before=['human_b']))
graph.add_node('b_a', b_a)
for src, dst in (('a_llm', 'a_a'), ('a_a', 'b_s'), ('b_s', 'b_a')):
    graph.add_edge(src, dst)
graph.set_entry_point('a_llm')

app = graph.compile(checkpointer=memory)
cfg = {"configurable": {"thread_id": "thread-1"}}

# First run: proceeds until the sub-flow interrupts before 'human_b'.
res = app.invoke(StateA(name='xxx', messages=[]), cfg)
print(res)

# Inject the human reply into the checkpoint, then try to resume.
snapshot = app.get_state(cfg)  # NOTE(review): unused; kept to match the repro
app.update_state(cfg, {'messages': [HumanMessage(content='A')]})
r = app.invoke(None, config=cfg, stream_mode='values')
print(r)
Error Message and Stack Trace (if applicable)
a_llm
{'name': 'xxx', 'messages': []}
a_a
{'name': 'xxx', 'messages': [AIMessage(content='a_llm', id='914842fa-df73-4495-b2b0-1ebbceb67e8e')]}
b_llm
{'name': 'xxx', 'age': None, 'messages': [AIMessage(content='a_llm', id='914842fa-df73-4495-b2b0-1ebbceb67e8e'), AIMessage(content='a_a', id='59c026c4-dd08-4ca4-8187-038f31e89854')]}
a_b
{'name': 'xxx', 'age': None, 'messages': [AIMessage(content='a_llm', id='914842fa-df73-4495-b2b0-1ebbceb67e8e'), AIMessage(content='a_a', id='59c026c4-dd08-4ca4-8187-038f31e89854'), AIMessage(content='b_llm', id='47006267-1337-4b21-b4ed-c04ec28bcaa0')]}
{'name': 'xxx', 'messages': [AIMessage(content='a_llm', id='914842fa-df73-4495-b2b0-1ebbceb67e8e'), AIMessage(content='a_a', id='59c026c4-dd08-4ca4-8187-038f31e89854')]}
b_llm
{'name': 'xxx', 'age': None, 'messages': [AIMessage(content='a_llm', id='914842fa-df73-4495-b2b0-1ebbceb67e8e'), AIMessage(content='a_a', id='59c026c4-dd08-4ca4-8187-038f31e89854'), HumanMessage(content='A', id='fb15b763-a436-4b7a-afda-b503e7127d26')]}
a_b
{'name': 'xxx', 'age': None, 'messages': [AIMessage(content='a_llm', id='914842fa-df73-4495-b2b0-1ebbceb67e8e'), AIMessage(content='a_a', id='59c026c4-dd08-4ca4-8187-038f31e89854'), HumanMessage(content='A', id='fb15b763-a436-4b7a-afda-b503e7127d26'), AIMessage(content='b_llm', id='6d5919fe-9730-4442-b5a9-7cc6e0752ae8')]}
None
Description
My subflow has human nodes that collect manual feedback. After I add the human message to the main flow's state, the subflow can't resume. I can't invoke the subflow directly to resume it, because I don't know which node the graph has stepped to; if I have many subflows, I would have to record which node each one was interrupted at and keep every compiled subflow in a global variable.
System Info
Python 3.12, langgraph 0.1.14