Issue with subagent checkpointer
I want to enable a checkpointer for a subagent, but I found that the configurable parameters are not passed to the subagent when it is invoked, so the subagent's checkpointer does not work.
Is there another way to enable a checkpointer for a subagent?
Hi @Leemin0311, may I ask why you need to add a checkpointer inside a SubAgent?
If the main agent already has a checkpointer configured, the SubAgent’s state is saved automatically as part of the main agent’s state. However, if you add a checkpointer to the SubAgent itself, then you also need to provide a separate config field. This effectively turns the SubAgent into an independent, top-level agent with its own isolated state management.
But this makes interrupt recovery significantly more complicated, because now you’re dealing with two parallel checkpoint hierarchies instead of one unified state tree.
Is there a specific reason or use case where putting a checkpointer directly inside a SubAgent is necessary or recommended? Or should SubAgents generally rely on the main agent’s checkpointer for proper state restoration?
Hi @Chenyl-Sai, correct me if I'm wrong. As far as I can tell from testing and from reading the deepagents source code, the main agent only stores the input and output of a subagent. The main agent therefore doesn't know what happens inside the subagent and cannot store memory for it.
As a result, if a subagent does similar jobs repeatedly, it starts each job from the very beginning even when the jobs share common steps. That's why I want to add a separate checkpointer to a subagent. A separate checkpointer gives the subagent its own short-term memory without introducing the context bloat problem (see the deepagents docs).
The main agent only stores the in/output of subagent
The subagent's short-term memory is stored in the messages field of subagent_state. When a new subagent is created, a new state dict is created to avoid mutating the original, which helps reduce context bloat. The main agent then stores this subagent's state, so you don't need to handle the subagent's short-term memory yourself. Is that right?
Excuse me for interrupting the conversation. I agree that subagents should maintain checkpoints. When a subagent terminates due to an error during long-running tasks, all progress made up to that point is lost, requiring restarting from the most recent checkpoint (the point at which the task tool was last used). This is somewhat inefficient.
Hi @mjrs33, welcome to join the discussion :). I’d like to understand how subagent recovery works when an exception occurs. When a subagent throws an error or hits an interrupt, how exactly is the subagent execution restored?
- Does the system resume execution through a HITL interrupt recovery mechanism?
- Or does it restart the subagent using the previously assigned thread_id?
- Is the recovery process triggered after the parent agent re-enters and resumes control?
I’m also unsure about the parent agent’s state during a subagent failure:
- What is the parent agent’s state when a subagent exception occurs?
- Is the parent agent paused at the point where it dispatched the subagent?
- Or is it also rolled back to a previous checkpoint?
Could you tell us more about this?
@Chenyl-Sai You can check the code you quoted again.
In the _validate_and_prepare_state function, runtime.state.items() is the state of the main agent, and only the fields other than messages and todos are added to subagent_state.
In the _return_command_with_state_update function, the update command of the main agent only takes the last message of the subagent.
Therefore, the main agent state contains only the input and output of a subagent. The main agent doesn't maintain the detailed message history of the subagent.
In the _validate_and_prepare_state function, runtime.state.items() is the state of main agent, and only the fields except messages and todos are added to subagent_state.
Yes, the subagent does not save or inherit the short-term memory from the parent agent. This is intentional to reduce token usage.
In the _return_command_with_state_update function, the update command of main agent only takes the last message of subagent.
The _return_command_with_state_update method does more than just return the subagent's final output as the tool result. It also returns a state_update, which represents the subagent's internal state changes except for the _EXCLUDED_STATE_KEYS.
Because of this, you can access any additional state produced by the subagent directly from the parent graph. This allows the main agent to read whatever extra data the subagent updated during its execution.
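The state handoff described in the last few replies can be modelled in a few lines of plain Python. This is a simplified sketch, not the deepagents source: the helper names `prepare_subagent_state` and `merge_subagent_result`, and the exact contents of `EXCLUDED_STATE_KEYS`, are assumptions made for illustration.

```python
# Simplified model of the parent/subagent state handoff (not the deepagents source).
# EXCLUDED_STATE_KEYS and both helper names are assumptions for illustration.
EXCLUDED_STATE_KEYS = {"messages", "todos"}


def prepare_subagent_state(parent_state: dict, description: str) -> dict:
    """Build a fresh state for the subagent without the parent's history."""
    state = {k: v for k, v in parent_state.items() if k not in EXCLUDED_STATE_KEYS}
    state["messages"] = [description]  # the subagent starts from the task text only
    return state


def merge_subagent_result(parent_state: dict, subagent_state: dict) -> dict:
    """Return the parent state after the subagent finishes."""
    update = {k: v for k, v in subagent_state.items() if k not in EXCLUDED_STATE_KEYS}
    merged = {**parent_state, **update}
    # Only the subagent's final message is appended to the parent's history.
    merged["messages"] = parent_state["messages"] + [subagent_state["messages"][-1]]
    return merged


parent = {"messages": ["hi", "ask the math-agent"], "todos": ["t1"], "files": {}}
sub = prepare_subagent_state(parent, "compute 1 + 1")
sub["messages"] += ["thinking...", "calling tool", "the answer is 2"]
sub["files"] = {"result.txt": "2"}
after = merge_subagent_result(parent, sub)
```

In this model both sides of the discussion hold: extra keys such as `files` flow back to the parent, while the subagent's intermediate messages do not.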
@Chenyl-Sai Totally understood. But in some cases the main agent doesn't need to be aware of the subagent's state, while the subagent still needs its own short-term memory.
For example, suppose there is a customer service main agent and a product Q&A agent. The Q&A agent has to query the product details before answering a question. If the user asks two questions about the same product, the Q&A agent queries the product details twice. With its own checkpointer (short-term memory), it would query them only once.
That's why I want to enable a separate checkpointer for a subagent. What do you think? Or do you have another way to solve this problem?
Let me check if I’m understanding your scenario correctly:
- You have a customer-service main agent, which contains several subagents specialized for different types of user inquiries.
- One of these subagents handles product-information queries and is responsible for answering questions about specific products.
- What you want is: when the user is asking about the same product, the subagent should directly return the previously retrieved result, instead of performing the same retrieval or reasoning again.
Is my understanding correct?
I’m also curious how you define the subagent’s thread_id in this setup. Natural language variations can be huge—so how do you ensure that different user questions about the same product map back to the same thread_id?
Another idea: Inside the subagent, you could add a cache node. If the user asks about a product that was already processed before, the subagent can first check the cache and return the stored result immediately. Would this approach work for your use case?
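A minimal sketch of the cache idea, assuming the product can be identified by an ID before the lookup runs. `ProductCache`, `fetch_product_detail`, and the `(thread_id, product_id)` key are hypothetical names, and the dictionary stands in for whatever store the subagent would really use.

```python
from typing import Callable


class ProductCache:
    """Hypothetical per-thread cache placed in front of an expensive lookup."""

    def __init__(self, fetch: Callable[[str], dict]):
        self._fetch = fetch
        self._store: dict[tuple[str, str], dict] = {}

    def get(self, thread_id: str, product_id: str) -> dict:
        key = (thread_id, product_id)
        if key not in self._store:
            # Cache miss: run the real lookup once and remember the result.
            self._store[key] = self._fetch(product_id)
        return self._store[key]


calls = []  # records every real lookup so the effect of the cache is visible


def fetch_product_detail(product_id: str) -> dict:
    """Stand-in for the real product query (assumption for illustration)."""
    calls.append(product_id)
    return {"id": product_id, "name": f"Product {product_id}"}


cache = ProductCache(fetch_product_detail)
first = cache.get("user-1-session-1", "p42")
second = cache.get("user-1-session-1", "p42")  # served from the cache
other = cache.get("user-2-session-1", "p42")   # different thread, new lookup
```

Keying on the thread as well as the product keeps one user's cached results from leaking into another user's session.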
Natural language variations can be huge—so how do you ensure that different user questions about the same product map back to the same thread_id?
only for the same user with same products
one user one session one thread_id
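With one thread_id per user session, a stable subagent thread could be derived from the parent's config. The sketch below assumes a hypothetical helper, `subagent_config`, and a hypothetical subagent name; how such a config would actually reach the subagent is the open question of this thread.

```python
def subagent_config(parent_config: dict, subagent_name: str) -> dict:
    """Derive a per-subagent thread config from the parent's config (hypothetical helper)."""
    parent_thread = parent_config["configurable"]["thread_id"]
    # Same user + same session + same subagent -> same subagent thread.
    return {"configurable": {"thread_id": f"{parent_thread}:{subagent_name}"}}


parent_config = {"configurable": {"thread_id": "user-1-session-1"}}
qa_config = subagent_config(parent_config, "product-qa")
```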
@Chenyl-Sai This is a simple script for verifying error handling behavior.
```python
import operator
import random
from typing import Annotated, NotRequired

from deepagents import create_deep_agent
from dotenv import load_dotenv
from langchain.agents.middleware import AgentMiddleware
from langchain.agents.middleware.types import AgentState
from langchain.tools import tool
from langchain_core.messages import ToolMessage
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command

load_dotenv()


class MyAgentState(AgentState):
    # Incremented by one per tool call; operator.add merges the updates.
    count: Annotated[int, operator.add]
    in_subagent: NotRequired[bool]


@tool
def random_number() -> int:
    """Return a random number between 1 and 100."""
    # raise RuntimeError("Simulated error.")
    return random.randint(1, 100)


@tool
def add_one(x: int) -> int:
    """Add one to the input number."""
    return x + 1


@tool
def blackbox(x: int) -> int:
    """Black box tool"""
    raise RuntimeError("Unresolved error.")


class MyMiddleware(AgentMiddleware):
    """Logs message and tool-call counts on entry, exit, and every tool call."""

    state_schema = MyAgentState

    def __init__(self, is_subagent: bool = False):
        self.is_subagent = is_subagent

    def before_agent(self, state, runtime):
        n_msgs = len(state.get("messages", []))
        count = state.get("count", 0)
        name = "supervisor" if not self.is_subagent else "subagent"
        print(f"Entering {name}. n_msgs: {n_msgs}, count: {count}")
        if self.is_subagent:
            return {"in_subagent": True}
        return None

    def after_agent(self, state, runtime):
        n_msgs = len(state.get("messages", []))
        count = state.get("count", 0)
        name = "supervisor" if not self.is_subagent else "subagent"
        print(f"Exiting {name}. n_msgs: {n_msgs}, count: {count}")
        if self.is_subagent:
            return {"in_subagent": False}
        return None

    def wrap_tool_call(self, request, handler):
        state = request.state
        count = state.get("count", 0)
        print(f"tool: {request.tool.name}, n_msgs: {len(state.get('messages', []))}, count: {count}")
        tool_result = handler(request)
        # Count the call whether the tool returned a ToolMessage or a Command.
        if isinstance(tool_result, ToolMessage):
            return Command(update={"count": 1, "messages": [tool_result]})
        tool_result.update["count"] = 1
        return tool_result


model = ...

subagent = {
    "name": "math-agent",
    "description": "An agent that performs simple math operations.",
    "tools": [random_number, add_one, blackbox],
    "model": model,  # was misspelled as "moodel" in the original post
    "system_prompt": "You are a helpful math assistant.",
    "middleware": [MyMiddleware(is_subagent=True)],
}

agent = create_deep_agent(
    model=model,
    subagents=[subagent],
    middleware=[MyMiddleware()],
    checkpointer=InMemorySaver(),
)

result = agent.invoke(
    {"messages": [
        "Ask the math-agent to perform the following operations in sequence: "
        "1. Execute random_number "
        "2. Pass the result of step 1 to the add_one tool "
        "3. Pass the result of step 2 to the blackbox tool."
    ]},
    config={"configurable": {"thread_id": "test"}},
)
```
Output:
Entering supervisor. n_msgs: 1, count: 0
tool: task, n_msgs: 2, count: 0
Entering subagent. n_msgs: 1, count: 0
tool: random_number, n_msgs: 2, count: 0
tool: add_one, n_msgs: 4, count: 1
tool: blackbox, n_msgs: 6, count: 2
<error message>
After running this, the state is:
last_state = agent.get_state({"configurable": {"thread_id": "test"}})
print("[tool_call]", last_state.values["messages"][-1].tool_calls[0]["args"])
print("[count]", last_state.values["count"])
print("[next]", last_state.next)
[tool_call] {'description': 'Perform the following sequence of math operations: 1. Execute random_number 2. Pass the result of random_number to add_one 3. Pass the result of add_one to blackbox tool. Return the outputs from each step, showing the flow of data from one step to the next.', 'subagent_type': 'math-agent'}
[count] 0
[next] ('tools',)
This means that when a sub-agent terminates with an error, the updates made within the sub-agent will not be reflected in the parent agent. The parent agent and its checkpoints remain stopped at the time of task assignment and are only updated when the sub-agent tool returns values.
For resuming, whether using ToolRetryMiddleware or performing manual resuming, both methods use the same checkpoints described above.
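The behaviour observed above can be reproduced with a toy model in plain Python. Nothing here is deepagents or LangGraph code: `run_subagent` and `task_tool` are hypothetical stand-ins, and the retry loop only imitates the "run the task tool again with the same arguments" behaviour described in this thread.

```python
# Toy model of the observed behaviour; none of this is deepagents or LangGraph code.
executed = []  # records every step execution across attempts


def random_number(state):
    executed.append("random_number")
    state["value"] = 7  # fixed instead of random so the run is reproducible


def add_one(state):
    executed.append("add_one")
    state["value"] += 1


def blackbox(state):
    executed.append("blackbox")
    raise RuntimeError("Unresolved error.")


def run_subagent(steps):
    """Run all steps on a private state and return an update for the parent."""
    local = {"count": 0}
    for step in steps:
        step(local)
        local["count"] += 1
    return {"count": local["count"]}


def task_tool(parent_state, steps, max_attempts=2):
    """Apply the subagent's update only if it returns; otherwise retry from scratch."""
    for _ in range(max_attempts):
        try:
            update = run_subagent(steps)
        except RuntimeError:
            continue  # the subagent's private state is discarded
        parent_state["count"] += update["count"]
        return True
    return False


parent = {"count": 0}
ok = task_tool(parent, [random_number, add_one, blackbox])
```

Because the parent is only updated when `run_subagent` returns, the two successful steps never reach `parent["count"]`, and each retry repeats them.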
@mjrs33
Yes, you are right. Using get_state on the main agent cannot retrieve the state of a subagent that has been wrapped as a tool, even when using the subgraphs parameter.
If I only want to inspect the subagent's internal state, the only reliable method I’ve found is:
- When the subagent is running, interrupted, or hits an exception,
- The tasks field returned by get_state() indicates which subagent was active at the moment of failure,
- And then I can call:
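```python
from langgraph.checkpoint.base import Checkpoint

# Assumes `agent` is built as in the script above and `checkpointer` is the same
# InMemorySaver instance that was passed to create_deep_agent.
config = {"configurable": {"thread_id": "test"}}

try:
    result = agent.invoke(
        {"messages": [
            "Ask the math-agent to perform the following operations in sequence: "
            "1. Execute random_number "
            "2. Pass the result of step 1 to the add_one tool "
            "3. Pass the result of step 2 to the blackbox tool. "
            "just call the math-agent, it has anything you need for the question."
        ]},
        config=config,
    )
finally:
    snapshot = agent.get_state(config)
    if snapshot.tasks:
        for task in snapshot.tasks:
            # The subagent's namespace is "<task name>:<task id>".
            new_config = {"configurable": {"thread_id": "test", "checkpoint_ns": f"{task.name}:{task.id}"}}
            checkpoint: Checkpoint | None = checkpointer.get(new_config)
            print(f"state of the subagent: {checkpoint}")
```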
This does successfully return the subagent's checkpoint, and inside it, I can access checkpoint["channel_values"], which contains the subagent's state at the moment of the exception.
However, none of this seems useful for resuming execution of the graph.
Whenever the graph throws an exception and I run it again, it always starts from the beginning, so I'm not sure how you are resuming execution. Are you trying to use the checkpoint to do time travel?
I tried manually using checkpoint_ns from the main agent to retrieve or update the subagent state, but it always fails with Subgraph ** not found.
My assumption is that once a subagent is wrapped as a tool, its namespace is no longer accessible in a way that allows direct restoration from the parent agent.
This makes me think that time-traveling to, or resuming from, a subagent checkpoint through the parent graph is actually impossible. It seems we can only use the subagent's own checkpointer to do this.
Have you personally implemented exception recovery inside a subagent using a checkpointer?
If so, could you explain how you managed to do it?
@Chenyl-Sai
I wasn't aware that "checkpoint_ns" and "channel_values" could be used to restore a subagent's state. Thank you for explaining this. The "resume" I was implementing was actually a "retry," which is handled by the langchain.agents.middleware.ToolRetryMiddleware. In other words, when the main agent fails a task tool, ToolRetryMiddleware will execute the task tool again with the same arguments within wrap_tool_call, causing the subagent to rerun from the beginning.
If restoring the state of a sub-agent is possible, couldn't we solve this with a middleware that jumps to the tool node from before_agent? When messages end with a tool call, it would jump to the tool node and resume execution. The sub-agent would get another middleware that restores state in before_agent and then jumps to the appropriate node.
Based on the current implementation, even if we successfully retrieve the state history, there is no practical way to use it. The task method creates a brand-new state every time, so there’s no natural way to reuse the historical state.
Trying to manually pass the historical state back into the subagent through other mechanisms becomes extremely complicated. Given this, adding a checkpointer actually seems to be the cleanest and most reliable solution.
Once a checkpointer is enabled for the subagent, we can inspect the stored state in before_model and use jump_to to skip directly to the ToolNode. This cannot be done in before_agent, because dynamically adjusting the graph there would require deep internal control within Pregel, which is far too complex.
This makes controlled resumption possible and avoids re-executing the entire Task/subagent from scratch.
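The difference a subagent-level checkpointer makes can be sketched with a toy model. This is plain Python under stated assumptions, not the LangGraph checkpointer or the middleware hooks discussed above: the `checkpoints` dictionary, `run_with_checkpoints`, the thread key, and a `flaky_blackbox` that fails only on its first call are all hypothetical.

```python
# Toy model of per-subagent checkpointing; the store and all names are hypothetical.
checkpoints = {}  # thread_id -> {"next_step": int, "state": dict}
executed = []     # steps that completed successfully, across all runs
attempts = {"flaky_blackbox": 0}


def random_number(state):
    state["value"] = 7  # fixed instead of random so the run is reproducible


def add_one(state):
    state["value"] += 1


def flaky_blackbox(state):
    attempts["flaky_blackbox"] += 1
    if attempts["flaky_blackbox"] == 1:
        raise RuntimeError("Unresolved error.")  # fails on the first call only
    state["done"] = True


def run_with_checkpoints(thread_id, steps):
    """Run steps in order, saving a checkpoint after each one that succeeds."""
    saved = checkpoints.get(thread_id, {"next_step": 0, "state": {}})
    state = dict(saved["state"])
    for index in range(saved["next_step"], len(steps)):
        steps[index](state)  # may raise; the last saved checkpoint survives
        executed.append(steps[index].__name__)
        checkpoints[thread_id] = {"next_step": index + 1, "state": dict(state)}
    return state


steps = [random_number, add_one, flaky_blackbox]
try:
    run_with_checkpoints("test:math-agent", steps)
except RuntimeError:
    pass  # first run fails at the third step
final = run_with_checkpoints("test:math-agent", steps)  # resumes at the third step
```

In this model the second run starts at the failed step, so `random_number` and `add_one` each execute once.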
Understood. Enabling the checkpointer for subagents seems like the best approach. I agree with that opinion.