Thread in pending state while storing memory in the background
Checked other resources
- [x] This is a bug, not a usage question. For questions, please use GitHub Discussions.
- [x] I added a clear and detailed title that summarizes the issue.
- [x] I read what a minimal reproducible example is (https://stackoverflow.com/help/minimal-reproducible-example).
- [x] I included a self-contained, minimal example that demonstrates the issue INCLUDING all the relevant imports. The code runs AS IS to reproduce the issue.
Example Code
I'm referring to this template (https://github.com/langchain-ai/memory-template) to store the memories in the background while connecting to a remote graph.
The application logic graph runs fine, but when it calls schedule_memories to store memories in the background, I see the message "Thread is in a pending state" in the studio and the memories are not getting stored.
async def schedule_memories(state: MessagesState, config: RunnableConfig) -> None:
    """Schedule a delayed background run of the memory graph for this thread."""
    client = get_client()
    # Config forwarded to the memory assistant so it knows whose memories to update.
    run_config = {
        "configurable": {
            "user_id": config["configurable"]["user_id"],
            "memory_types": config["configurable"]["memory_types"],
        },
    }
    # Enqueue a follow-up run on the same thread after a short delay; "enqueue"
    # queues this run behind any in-flight run instead of interrupting it.
    await client.runs.create(
        thread_id=config["configurable"]["thread_id"],
        multitask_strategy="enqueue",
        after_seconds=3,
        assistant_id='appointment_memory_bot',
        input={"messages": []},
        config=run_config,
    )
Error Message and Stack Trace (if applicable)
Description
-------------------------Application logic Agent/Graph------------------------------ def call_model(state: BookingAppointmentState, store: BaseStore, config: RunnableConfig) -> Command[Literal['tool_node', 'schedule_memories']]: model = ChatOpenAI(model="gpt-4o", openai_api_key=os.getenv("OPEN_AI_API_KEY")).bind_tools(tools)
appointment_namespace = ('appointments', )
apntment_details = store.get(appointment_namespace, config['configurable']['user_id'])
apntment_details = apntment_details.dict()['value'] if apntment_details else None
conversations_namespace = ('conversations', config['configurable'] ['user_id'])
conversations = store.search(conversations_namespace)
conversations = [(conversation.key, conversation.value) for conversation in conversations] if conversations else []
messages = [SystemMessage(content=system_prompt.format(apntment_details = apntment_details, conversations = conversations ))] + state['messages']
result = model.invoke(messages)
if len(result.tool_calls) > 0:
return Command(goto='tool_node', update={'messages': [result]})
return {'messages': [result]}
def tool_node(
    state: BookingAppointmentState, store: BaseStore, config: RunnableConfig
) -> Command[Literal['ask_human', 'call_model']]:
    """Run every tool call from the last message, injecting graph state where a tool accepts it."""
    registry = {tool.name: tool for tool in tools}
    results = []
    for call in state['messages'][-1].tool_calls:
        selected = registry[call["name"]]
        # Tools whose input schema declares a "state" field get the graph state injected.
        schema_props = selected.get_input_schema().model_json_schema()["properties"]
        if "state" in schema_props:
            call = {**call, "args": {**call["args"], "state": state}}
        print(selected, call)  # debug trace of the tool about to be invoked
        results.append(selected.invoke(call))
    # NOTE(review): when any tool ran, the raw responses are returned directly, so
    # the Command fallback below only fires for an empty tool_calls list — confirm intended.
    if results:
        return results
    return Command(goto='call_model', update={'messages': [AIMessage(content=str(results))]})
def ask_human(
    state: BookingAppointmentState, store: BaseStore, config: RunnableConfig
) -> Command[Literal['call_model']]:
    """Pause the graph via interrupt() and wait for the patient's reply."""
    answer = interrupt(state['question_to_patient'])
    # An empty/falsy reply falls through to an implicit None, as before.
    if not answer:
        return None
    return Command(
        goto='call_model',
        update={
            'messages': [HumanMessage(content=answer)],
            "question_to_patient": '',
        },
    )
async def schedule_memories(state: MessagesState, config: RunnableConfig) -> None:
    """Schedule a delayed background run of the memory graph for the current thread.

    Bug fix: get_client() returns the *async* SDK client (the working example
    elsewhere in this issue awaits it), so calling runs.create(...) from a sync
    function produced a coroutine that was never awaited — the run was never
    actually created. The node is now async and awaits the call.
    """
    memory_client = get_client()
    await memory_client.runs.create(
        thread_id=config["configurable"]["thread_id"],
        # Queue behind any in-flight run on this thread instead of interrupting it.
        multitask_strategy="enqueue",
        after_seconds=3,
        assistant_id='appointment_memory_bot',
        input={"messages": []},
        config={
            "configurable": {
                "user_id": config['configurable']['user_id'],
                "memory_types": config['configurable']['memory_types'],
            },
        },
    )
-------------------------------------- Memory Agent / Graph --------------------------------------------- def scatter_schemas(state: MessagesState, config: RunnableConfig) -> list[Send]: memory_types = config['configurable']['memory_types'] sends = []
for type in memory_types:
match type:
case "appointments":
target = "update_appointments"
case "conversations":
target = "update_conversations"
case _:
raise ValueError(f"Unknown update mode: {type}")
sends.append(Send(target, state))
return sends
def update_appointments(state: MessagesState, store: BaseStore, config: RunnableConfig):
    """Extract appointment details from the conversation and persist them for the user."""
    user_id = config['configurable']['user_id']
    namespace = ('appointments', )
    # Start from whatever is already stored, falling back to the empty template.
    record = store.get(namespace, user_id)
    existing = record.dict()['value'] if record else init_apntmnt_details
    system_prompt = '''Observe the ongoing conversation and extract relevant appointment details. If no appointment details are found, set them as None. Use the provided tools to retain any necessary information about the appointment.'''
    extractor = create_extractor(model, tools=[AppointmentDetails], tool_choice='AppointmentDetails')
    result = extractor.invoke({
        'messages': [SystemMessage(content=system_prompt)] + state['messages'],
        'existing': {'AppointmentDetails': existing},
    })
    # Overwrite the stored record with the freshly extracted details.
    store.put(namespace, user_id, result['responses'][0].model_dump())
def update_conversations(state: MessagesState, store: BaseStore, config: RunnableConfig):
    """Persist each non-tool message of the turn under the user's conversation namespace."""
    namespace = ('conversations', config['configurable']['user_id'])
    # Keys are 1-based message positions, matching the original str(index + 1).
    for position, message in enumerate(state['messages'], start=1):
        if isinstance(message, ToolMessage):
            continue
        # NOTE(review): AI messages are stored with role 'system' — confirm intended
        # (most chat schemas use 'assistant'/'ai' for model output).
        payload = {
            'role': 'system' if isinstance(message, AIMessage) else 'human',
            'content': message.content if message.content else message.tool_calls[0]['args']['reason'],
        }
        store.put(namespace, str(position), payload)
# Wire up the memory graph: START fans out via scatter_schemas to the two updaters.
builder = StateGraph(MessagesState)
builder.add_node('scatter_schemas', scatter_schemas)
builder.add_node('update_appointments', update_appointments)
builder.add_node('update_conversations', update_conversations)
builder.add_conditional_edges(
    START,
    scatter_schemas,
    ["update_appointments", "update_conversations"],
)
System Info
using the latest versions
What's happening is that it's scheduling the ingestion for the future, but on the same thread; as long as the scheduled run hasn't completed, the thread is still considered "pending".
@hinthornw in that case, after 3 seconds I should see the memory created, right? Also, if I click on "Join run", it creates a new fork.
It's been pending for a long time, and if I proceed with another question/new turn, a new fork gets created for my question. Now there will be 2 forks: 1. the current ongoing conversation, and 2. the one produced once the thread moves from pending to complete.
I don't know what's happening. Could you please help me out with this? I have provided the complete code.
Is there an error in the LangSmith trace?
There is no error in the LangSmith trace either.
Could you please try running my example?