
STORM Example Failing with MultipleSubgraphsError

deanchanter opened this issue 4 months ago • 3 comments

Checked other resources

  • [X] I added a very descriptive title to this issue.
  • [X] I searched the LangGraph/LangChain documentation with the integrated search.
  • [X] I used the GitHub search to find a similar question and didn't find it.
  • [X] I am sure that this is a bug in LangGraph/LangChain rather than my code.
  • [X] I am sure this is better as an issue rather than a GitHub discussion, since this is a LangGraph bug and not a design question.

Example Code

async def conduct_interviews(state: ResearchState):
    topic = state["topic"]
    initial_states = [
        {
            "editor": editor,
            "messages": [
                AIMessage(
                    content=f"So you said you were writing an article on {topic}?",
                    name="Subject_Matter_Expert",
                )
            ],
        }
        for editor in state["editors"]
    ]
    # We call into the subgraph here to parallelize the interviews
    interview_results = await interview_graph.abatch(initial_states)

    return {
        **state,
        "interview_results": interview_results,
    }
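
For maintainers, here is a self-contained sketch of the pattern that appears to trigger the error: a parent graph compiled with a checkpointer whose node imperatively calls the same compiled subgraph several times via abatch. This is an untested minimal reproduction written against langgraph 0.2.34; the graph, state, and node names are made up for illustration.

import asyncio
from typing import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph

class State(TypedDict):
    value: int

# Subgraph: a single node that increments the value.
sub_builder = StateGraph(State)
sub_builder.add_node("inc", lambda s: {"value": s["value"] + 1})
sub_builder.add_edge(START, "inc")
sub_builder.add_edge("inc", END)
subgraph = sub_builder.compile()

async def fan_out(state: State):
    # Imperatively invoke the same subgraph several times from one node,
    # mirroring the interview_graph.abatch(...) call above.
    results = await subgraph.abatch([{"value": i} for i in range(3)])
    return {"value": sum(r["value"] for r in results)}

builder = StateGraph(State)
builder.add_node("fan_out", fan_out)
builder.add_edge(START, "fan_out")
builder.add_edge("fan_out", END)
# The parent graph has a checkpointer, which the subgraph calls inherit.
graph = builder.compile(checkpointer=MemorySaver())

config = {"configurable": {"thread_id": "repro-thread"}}
# Expected to raise MultipleSubgraphsError on langgraph 0.2.34.
asyncio.run(graph.ainvoke({"value": 0}, config))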

Error Message and Stack Trace (if applicable)

---------------------------------------------------------------------------
MultipleSubgraphsError                    Traceback (most recent call last)
Cell In[39], line 2
      1 config = {"configurable": {"thread_id": "my-thread"}}
----> 2 async for step in storm.astream(
      3     {
      4         "topic": "Groq, NVIDIA, Llamma.cpp and the future of LLM Inference",
      5     },
      6     config,
      7 ):
      8     name = next(iter(step))
      9     print(name)

File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langgraph/pregel/__init__.py:1502, in Pregel.astream(self, input, config, stream_mode, output_keys, interrupt_before, interrupt_after, debug, subgraphs)
   1491 # Similarly to Bulk Synchronous Parallel / Pregel model
   1492 # computation proceeds in steps, while there are channel updates
   1493 # channel updates from step N are only visible in step N+1
   1494 # channels are guaranteed to be immutable for the duration of the step,
   1495 # with channel updates applied only at the transition between steps
   1496 while loop.tick(
   1497     input_keys=self.input_channels,
   1498     interrupt_before=interrupt_before_,
   1499     interrupt_after=interrupt_after_,
   1500     manager=run_manager,
   1501 ):
-> 1502     async for _ in runner.atick(
   1503         loop.tasks.values(),
   1504         timeout=self.step_timeout,
   1505         retry_policy=self.retry_policy,
   1506         get_waiter=get_waiter,
   1507     ):
   1508         # emit output
   1509         for o in output():
   1510             yield o

File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langgraph/pregel/runner.py:130, in PregelRunner.atick(self, tasks, reraise, timeout, retry_policy, get_waiter)
    128 t = tasks[0]
    129 try:
--> 130     await arun_with_retry(t, retry_policy, stream=self.use_astream)
    131     self.commit(t, None)
    132 except Exception as exc:

File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langgraph/pregel/retry.py:102, in arun_with_retry(task, retry_policy, stream)
    100         pass
    101 else:
--> 102     await task.proc.ainvoke(task.input, config)
    103 # if successful, end
    104 break

File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langgraph/utils/runnable.py:452, in RunnableSeq.ainvoke(self, input, config, **kwargs)
    450     coro = step.ainvoke(input, config)
    451 if ASYNCIO_ACCEPTS_CONTEXT:
--> 452     input = await asyncio.create_task(coro, context=context)
    453 else:
    454     input = await asyncio.create_task(coro)

File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langgraph/utils/runnable.py:235, in RunnableCallable.ainvoke(self, input, config, **kwargs)
    233 if ASYNCIO_ACCEPTS_CONTEXT:
    234     coro = cast(Coroutine[None, None, Any], self.afunc(input, **kwargs))
--> 235     ret = await asyncio.create_task(coro, context=context)
    236 else:
    237     ret = await self.afunc(input, **kwargs)

Cell In[36], line 33
     20 initial_states = [
     21     {
     22         "editor": editor,
   (...)
     30     for editor in state["editors"]
     31 ]
     32 # We call in to the sub-graph here to parallelize the interviews
---> 33 interview_results = await interview_graph.abatch(initial_states)
     35 return {
     36     **state,
     37     "interview_results": interview_results,
     38 }

File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langchain_core/runnables/base.py:905, in Runnable.abatch(self, inputs, config, return_exceptions, **kwargs)
    902         return await self.ainvoke(input, config, **kwargs)
    904 coros = map(ainvoke, inputs, configs)
--> 905 return await gather_with_concurrency(configs[0].get("max_concurrency"), *coros)

File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langchain_core/runnables/utils.py:68, in gather_with_concurrency(n, *coros)
     58 """Gather coroutines with a limit on the number of concurrent coroutines.
     59 
     60 Args:
   (...)
     65     The results of the coroutines.
     66 """
     67 if n is None:
---> 68     return await asyncio.gather(*coros)
     70 semaphore = asyncio.Semaphore(n)
     72 return await asyncio.gather(*(gated_coro(semaphore, c) for c in coros))

File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langchain_core/runnables/base.py:902, in Runnable.abatch.<locals>.ainvoke(input, config)
    900         return e
    901 else:
--> 902     return await self.ainvoke(input, config, **kwargs)

File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langgraph/pregel/__init__.py:1613, in Pregel.ainvoke(self, input, config, stream_mode, output_keys, interrupt_before, interrupt_after, debug, **kwargs)
   1611 else:
   1612     chunks = []
-> 1613 async for chunk in self.astream(
   1614     input,
   1615     config,
   1616     stream_mode=stream_mode,
   1617     output_keys=output_keys,
   1618     interrupt_before=interrupt_before,
   1619     interrupt_after=interrupt_after,
   1620     debug=debug,
   1621     **kwargs,
   1622 ):
   1623     if stream_mode == "values":
   1624         latest = chunk

File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langgraph/pregel/__init__.py:1464, in Pregel.astream(self, input, config, stream_mode, output_keys, interrupt_before, interrupt_after, debug, subgraphs)
   1460 if "custom" in stream_modes:
   1461     config[CONF][CONFIG_KEY_STREAM_WRITER] = lambda c: stream.put_nowait(
   1462         ((), "custom", c)
   1463     )
-> 1464 async with AsyncPregelLoop(
   1465     input,
   1466     stream=StreamProtocol(stream.put_nowait, stream_modes),
   1467     config=config,
   1468     store=store,
   1469     checkpointer=checkpointer,
   1470     nodes=self.nodes,
   1471     specs=self.channels,
   1472     output_keys=output_keys,
   1473     stream_keys=self.stream_channels_asis,
   1474 ) as loop:
   1475     # create runner
   1476     runner = PregelRunner(
   1477         submit=loop.submit,
   1478         put_writes=loop.put_writes,
   1479         use_astream=do_stream is not None,
   1480     )
   1481     # enable subgraph streaming

File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langgraph/pregel/loop.py:789, in AsyncPregelLoop.__init__(self, input, stream, config, store, checkpointer, nodes, specs, output_keys, stream_keys, check_subgraphs, debug)
    774 def __init__(
    775     self,
    776     input: Optional[Any],
   (...)
    787     debug: bool = False,
    788 ) -> None:
--> 789     super().__init__(
    790         input,
    791         stream=stream,
    792         config=config,
    793         checkpointer=checkpointer,
    794         store=store,
    795         nodes=nodes,
    796         specs=specs,
    797         output_keys=output_keys,
    798         stream_keys=stream_keys,
    799         check_subgraphs=check_subgraphs,
    800         debug=debug,
    801     )
    802     self.stack = AsyncExitStack()
    803     if checkpointer:

File ~/Library/Caches/pypoetry/virtualenvs/project-mercury-Rog6NQKe-py3.11/lib/python3.11/site-packages/langgraph/pregel/loop.py:230, in PregelLoop.__init__(self, input, stream, config, store, checkpointer, nodes, specs, output_keys, stream_keys, check_subgraphs, debug)
    228 if check_subgraphs and self.is_nested and self.checkpointer is not None:
    229     if self.config[CONF][CONFIG_KEY_CHECKPOINT_NS] in _SEEN_CHECKPOINT_NS:
--> 230         raise MultipleSubgraphsError
    231     else:
    232         _SEEN_CHECKPOINT_NS.add(self.config[CONF][CONFIG_KEY_CHECKPOINT_NS])

MultipleSubgraphsError:

Description

When running the STORM demo on the latest versions of LangGraph, the run now fails during the "interviews" step (the conduct_interviews node shown above).
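
Judging from the last frames of the traceback, PregelLoop treats each nested invocation that inherits a checkpointer as a subgraph run, and raises once the same checkpoint namespace appears twice in _SEEN_CHECKPOINT_NS, which is exactly what abatch over one compiled subgraph produces. A possible workaround, sketched under the assumption that the installed langgraph version supports the checkpointer=False compile option (disabling persistence for just the subgraph), would be:

# Hypothetical workaround sketch: compile the interview subgraph with
# checkpointing disabled so repeated imperative calls from one node do not
# register duplicate checkpoint namespaces. `interview_builder` is the
# StateGraph builder from the STORM notebook; `checkpointer=False` is
# assumed to be supported by the installed langgraph version.
interview_graph = interview_builder.compile(checkpointer=False).with_config(
    run_name="Conduct Interviews"
)

Alternatively, dropping the checkpointer from the parent graph's compile() call sidesteps the check entirely, at the cost of losing persistence.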

System Info

System Information

OS: Darwin
OS Version: Darwin Kernel Version 23.5.0: Wed May 1 20:14:38 PDT 2024; root:xnu-10063.121.3~5/RELEASE_ARM64_T6020
Python Version: 3.11.9 (v3.11.9:de54cf5be3, Apr 2 2024, 07:12:50) [Clang 13.0.0 (clang-1300.0.29.30)]

Package Information

langchain_core: 0.3.8
langchain: 0.3.2
langchain_community: 0.3.1
langsmith: 0.1.131
langchain_chroma: 0.1.2
langchain_fireworks: 0.2.0
langchain_ibm: 0.2.1
langchain_ollama: 0.2.0
langchain_openai: 0.2.1
langchain_text_splitters: 0.3.0
langgraph: 0.2.34

Optional packages not installed

langserve

Other Dependencies

aiohttp: 3.10.8
async-timeout: Installed. No version info available.
chromadb: 0.5.11
dataclasses-json: 0.6.7
fastapi: 0.110.3
fireworks-ai: 0.15.4
httpx: 0.27.2
ibm-watsonx-ai: 1.1.11
jsonpatch: 1.33
langgraph-checkpoint: 2.0.0
numpy: 1.26.4
ollama: 0.3.3
openai: 1.51.0
orjson: 3.10.7
packaging: 24.1
pydantic: 2.9.2
pydantic-settings: 2.5.2
PyYAML: 6.0.2
requests: 2.32.3
requests-toolbelt: 1.0.0
SQLAlchemy: 2.0.35
tenacity: 8.5.0
tiktoken: 0.7.0
typing-extensions: 4.12.2

deanchanter · Oct 04 '24 12:10