langgraph
langgraph copied to clipboard
Storm Example Failing with MultipleSubgraphsError
Checked other resources
- [X] I added a very descriptive title to this issue.
- [X] I searched the LangGraph/LangChain documentation with the integrated search.
- [X] I used the GitHub search to find a similar question and didn't find it.
- [X] I am sure that this is a bug in LangGraph/LangChain rather than my code.
- [X] I am sure this is better as an issue rather than a GitHub discussion, since this is a LangGraph bug and not a design question.
Example Code
async def conduct_interviews(state: ResearchState):
    """Run one interview per editor and attach the results to the state.

    Builds a starting state for each entry in ``state["editors"]`` (the
    editor plus a single opening ``AIMessage`` that mentions
    ``state["topic"]``), submits all of them to ``interview_graph`` in one
    ``abatch`` call, and returns a copy of ``state`` with the batch output
    stored under ``"interview_results"``.
    """
    subject = state["topic"]

    opening_states = []
    for editor in state["editors"]:
        # Every interview starts from the same opening question.
        greeting = AIMessage(
            content=f"So you said you were writing an article on {subject}?",
            name="Subject_Matter_Expert",
        )
        opening_states.append({"editor": editor, "messages": [greeting]})

    # Hand the whole batch to the sub-graph so the interviews are parallelized.
    batch_output = await interview_graph.abatch(opening_states)

    # Keep every existing key of the incoming state and add the results.
    updated_state = {**state}
    updated_state["interview_results"] = batch_output
    return updated_state
Error Message and Stack Trace (if applicable)
---------------------------------------------------------------------------
MultipleSubgraphsError Traceback (most recent call last)
Cell In[39], line 2
1 config = {"configurable": {"thread_id": "my-thread"}}
----> 2 async for step in storm.astream(
3 {
4 "topic": "Groq, NVIDIA, Llamma.cpp and the future of LLM Inference",
5 },
6 config,
7 ):
8 name = next(iter(step))
9 print(name)
File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langgraph/pregel/__init__.py:1502, in Pregel.astream(self, input, config, stream_mode, output_keys, interrupt_before, interrupt_after, debug, subgraphs)
1491 # Similarly to Bulk Synchronous Parallel / Pregel model
1492 # computation proceeds in steps, while there are channel updates
1493 # channel updates from step N are only visible in step N+1
1494 # channels are guaranteed to be immutable for the duration of the step,
1495 # with channel updates applied only at the transition between steps
1496 while loop.tick(
1497 input_keys=self.input_channels,
1498 interrupt_before=interrupt_before_,
1499 interrupt_after=interrupt_after_,
1500 manager=run_manager,
1501 ):
-> 1502 async for _ in runner.atick(
1503 loop.tasks.values(),
1504 timeout=self.step_timeout,
1505 retry_policy=self.retry_policy,
1506 get_waiter=get_waiter,
1507 ):
1508 # emit output
1509 for o in output():
1510 yield o
File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langgraph/pregel/runner.py:130, in PregelRunner.atick(self, tasks, reraise, timeout, retry_policy, get_waiter)
128 t = tasks[0]
129 try:
--> 130 await arun_with_retry(t, retry_policy, stream=self.use_astream)
131 self.commit(t, None)
132 except Exception as exc:
File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langgraph/pregel/retry.py:102, in arun_with_retry(task, retry_policy, stream)
100 pass
101 else:
--> 102 await task.proc.ainvoke(task.input, config)
103 # if successful, end
104 break
File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langgraph/utils/runnable.py:452, in RunnableSeq.ainvoke(self, input, config, **kwargs)
450 coro = step.ainvoke(input, config)
451 if ASYNCIO_ACCEPTS_CONTEXT:
--> 452 input = await asyncio.create_task(coro, context=context)
453 else:
454 input = await asyncio.create_task(coro)
File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langgraph/utils/runnable.py:235, in RunnableCallable.ainvoke(self, input, config, **kwargs)
233 if ASYNCIO_ACCEPTS_CONTEXT:
234 coro = cast(Coroutine[None, None, Any], self.afunc(input, **kwargs))
--> 235 ret = await asyncio.create_task(coro, context=context)
236 else:
237 ret = await self.afunc(input, **kwargs)
Cell In[36], line 33
20 initial_states = [
21 {
22 "editor": editor,
(...)
30 for editor in state["editors"]
31 ]
32 # We call in to the sub-graph here to parallelize the interviews
---> 33 interview_results = await interview_graph.abatch(initial_states)
35 return {
36 **state,
37 "interview_results": interview_results,
38 }
File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langchain_core/runnables/base.py:905, in Runnable.abatch(self, inputs, config, return_exceptions, **kwargs)
902 return await self.ainvoke(input, config, **kwargs)
904 coros = map(ainvoke, inputs, configs)
--> 905 return await gather_with_concurrency(configs[0].get("max_concurrency"), *coros)
File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langchain_core/runnables/utils.py:68, in gather_with_concurrency(n, *coros)
58 """Gather coroutines with a limit on the number of concurrent coroutines.
59
60 Args:
(...)
65 The results of the coroutines.
66 """
67 if n is None:
---> 68 return await asyncio.gather(*coros)
70 semaphore = asyncio.Semaphore(n)
72 return await asyncio.gather(*(gated_coro(semaphore, c) for c in coros))
File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langchain_core/runnables/base.py:902, in Runnable.abatch.<locals>.ainvoke(input, config)
900 return e
901 else:
--> 902 return await self.ainvoke(input, config, **kwargs)
File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langgraph/pregel/__init__.py:1613, in Pregel.ainvoke(self, input, config, stream_mode, output_keys, interrupt_before, interrupt_after, debug, **kwargs)
1611 else:
1612 chunks = []
-> 1613 async for chunk in self.astream(
1614 input,
1615 config,
1616 stream_mode=stream_mode,
1617 output_keys=output_keys,
1618 interrupt_before=interrupt_before,
1619 interrupt_after=interrupt_after,
1620 debug=debug,
1621 **kwargs,
1622 ):
1623 if stream_mode == "values":
1624 latest = chunk
File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langgraph/pregel/__init__.py:1464, in Pregel.astream(self, input, config, stream_mode, output_keys, interrupt_before, interrupt_after, debug, subgraphs)
1460 if "custom" in stream_modes:
1461 config[CONF][CONFIG_KEY_STREAM_WRITER] = lambda c: stream.put_nowait(
1462 ((), "custom", c)
1463 )
-> 1464 async with AsyncPregelLoop(
1465 input,
1466 stream=StreamProtocol(stream.put_nowait, stream_modes),
1467 config=config,
1468 store=store,
1469 checkpointer=checkpointer,
1470 nodes=self.nodes,
1471 specs=self.channels,
1472 output_keys=output_keys,
1473 stream_keys=self.stream_channels_asis,
1474 ) as loop:
1475 # create runner
1476 runner = PregelRunner(
1477 submit=loop.submit,
1478 put_writes=loop.put_writes,
1479 use_astream=do_stream is not None,
1480 )
1481 # enable subgraph streaming
File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langgraph/pregel/loop.py:789, in AsyncPregelLoop.__init__(self, input, stream, config, store, checkpointer, nodes, specs, output_keys, stream_keys, check_subgraphs, debug)
774 def __init__(
775 self,
776 input: Optional[Any],
(...)
787 debug: bool = False,
788 ) -> None:
--> 789 super().__init__(
790 input,
791 stream=stream,
792 config=config,
793 checkpointer=checkpointer,
794 store=store,
795 nodes=nodes,
796 specs=specs,
797 output_keys=output_keys,
798 stream_keys=stream_keys,
799 check_subgraphs=check_subgraphs,
800 debug=debug,
801 )
802 self.stack = AsyncExitStack()
803 if checkpointer:
File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langgraph/pregel/loop.py:230, in PregelLoop.__init__(self, input, stream, config, store, checkpointer, nodes, specs, output_keys, stream_keys, check_subgraphs, debug)
228 if check_subgraphs and self.is_nested and self.checkpointer is not None:
229 if self.config[CONF][CONFIG_KEY_CHECKPOINT_NS] in _SEEN_CHECKPOINT_NS:
--> 230 raise MultipleSubgraphsError
231 else:
232 _SEEN_CHECKPOINT_NS.add(self.config[CONF][CONFIG_KEY_CHECKPOINT_NS])
MultipleSubgraphsError:
Description
When running the STORM demo on the latest version of langgraph, the demo now fails during the "interviews" step.
System Info
System Information
OS: Darwin OS Version: Darwin Kernel Version 23.5.0: Wed May 1 20:14:38 PDT 2024; root:xnu-10063.121.3~5/RELEASE_ARM64_T6020 Python Version: 3.11.9 (v3.11.9:de54cf5be3, Apr 2 2024, 07:12:50) [Clang 13.0.0 (clang-1300.0.29.30)]
Package Information
langchain_core: 0.3.8 langchain: 0.3.2 langchain_community: 0.3.1 langsmith: 0.1.131 langchain_chroma: 0.1.2 langchain_fireworks: 0.2.0 langchain_ibm: 0.2.1 langchain_ollama: 0.2.0 langchain_openai: 0.2.1 langchain_text_splitters: 0.3.0 langgraph: 0.2.34
Optional packages not installed
langserve
Other Dependencies
aiohttp: 3.10.8 async-timeout: Installed. No version info available. chromadb: 0.5.11 dataclasses-json: 0.6.7 fastapi: 0.110.3 fireworks-ai: 0.15.4 httpx: 0.27.2 ibm-watsonx-ai: 1.1.11 jsonpatch: 1.33 langgraph-checkpoint: 2.0.0 numpy: 1.26.4 ollama: 0.3.3 openai: 1.51.0 orjson: 3.10.7 packaging: 24.1 pydantic: 2.9.2 pydantic-settings: 2.5.2 PyYAML: 6.0.2 requests: 2.32.3 requests-toolbelt: 1.0.0 SQLAlchemy: 2.0.35 tenacity: 8.5.0 tiktoken: 0.7.0 typing-extensions: 4.12.2