Async tools do not currently support 'custom' events streaming via get_stream_writer().

Open bharathiselvan opened this issue 1 month ago • 1 comment

Checked other resources

  • [x] This is a bug, not a usage question. For questions, please use the LangChain Forum (https://forum.langchain.com/).
  • [x] I added a clear and detailed title that summarizes the issue.
  • [x] I read what a minimal reproducible example is (https://stackoverflow.com/help/minimal-reproducible-example).
  • [x] I included a self-contained, minimal example that demonstrates the issue INCLUDING all the relevant imports. The code runs AS IS to reproduce the issue.

Example Code

from langchain_core.tools import tool
from langgraph.config import get_stream_writer

@tool
async def test_async_tool():
    """Emit a custom stream event."""  # @tool requires a docstring (or an explicit description)
    writer = get_stream_writer()
    print("Writer:", writer)
    writer("Hello from async tool")
    return "done"

Error Message and Stack Trace (if applicable)


Description

When running astream in custom stream mode, the emitted message is not captured. This behavior only occurs with async tools; sync tools stream messages correctly.
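One detail worth noting about the symptom: writers of this kind typically fall back to a silent no-op (rather than raising) when no stream context can be found, which matches chunks disappearing without any error. A stdlib-only sketch of that pattern, with illustrative names (not LangGraph internals):

```python
import contextvars

# Illustrative stand-in for the ambient stream-writer slot.
_writer_var = contextvars.ContextVar("stream_writer", default=None)

def get_writer():
    """Return the ambient writer, or a silent no-op when none is set."""
    return _writer_var.get() or (lambda chunk: None)

def tool_body():
    # If the lookup fails, this call is a no-op and the chunk vanishes.
    get_writer()("Hello from tool")

captured = []
_writer_var.set(captured.append)

tool_body()                           # writer visible: chunk is captured
contextvars.Context().run(tool_body)  # empty context: chunk silently dropped

print(captured)  # only one chunk was captured
```

If LangGraph resolves the writer in a context the async tool does not share, the tool would hit the no-op branch exactly as described above.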

System Info

System Information

OS: Windows
OS Version: 10.0.26100
Python Version: 3.13.7 (tags/v3.13.7:bcee1c3, Aug 14 2025, 14:15:11) [MSC v.1944 64 bit (AMD64)]

Package Information

langchain_core: 1.0.3
langchain: 1.0.3
langsmith: 0.4.41
langchain_openai: 1.0.2
langgraph_sdk: 0.2.9

Optional packages not installed

langserve

Other Dependencies

claude-agent-sdk: Installed. No version info available.
httpx: 0.28.1
jsonpatch: 1.33
langchain-anthropic: Installed. No version info available.
langchain-aws: Installed. No version info available.
langchain-community: Installed. No version info available.
langchain-deepseek: Installed. No version info available.
langchain-fireworks: Installed. No version info available.
langchain-google-genai: Installed. No version info available.
langchain-google-vertexai: Installed. No version info available.
langchain-groq: Installed. No version info available.
langchain-huggingface: Installed. No version info available.
langchain-mistralai: Installed. No version info available.
langchain-ollama: Installed. No version info available.
langchain-perplexity: Installed. No version info available.
langchain-together: Installed. No version info available.
langchain-xai: Installed. No version info available.
langgraph: 1.0.2
langsmith-pyo3: Installed. No version info available.
openai: 2.7.1
openai-agents: Installed. No version info available.
opentelemetry-api: 1.38.0
opentelemetry-exporter-otlp-proto-http: 1.38.0
opentelemetry-sdk: 1.38.0
orjson: 3.11.4
packaging: 25.0
pydantic: 2.12.4
pytest: Installed. No version info available.
pyyaml: 6.0.3
requests: 2.32.5
requests-toolbelt: 1.0.0
rich: Installed. No version info available.
tenacity: 9.1.2
tiktoken: 0.12.0
typing-extensions: 4.15.0
vcrpy: Installed. No version info available.
zstandard: 0.25.0

bharathiselvan avatar Nov 14 '25 13:11 bharathiselvan

Hi @bharathiselvan,

I tested your example code and it appears to be working correctly on my setup. Here's what I found:

import asyncio
from langchain_core.tools import tool
from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, END
from typing import TypedDict


class GraphState(TypedDict):
    result: str


@tool
async def test_async_tool():
    """say hello"""
    writer = get_stream_writer()
    print("Writer:", writer)
    writer("Hello from async tool")
    return "done"


async def test_node(state: GraphState):
    result = await test_async_tool.ainvoke({})
    return {"result": result}


graph_builder = StateGraph(GraphState)

graph_builder.add_node("test_node", test_node)

graph_builder.set_entry_point("test_node")

graph_builder.add_edge("test_node", END)

graph = graph_builder.compile()


async def main():
    async for chunk in graph.astream({"result": ""}, stream_mode="custom"):
        print("Stream chunk", chunk)


if __name__ == "__main__":
    asyncio.run(main())

Georgedyatmsk1405 avatar Nov 14 '25 18:11 Georgedyatmsk1405

Hi @bharathiselvan,

I investigated this issue. Like @Georgedyatmsk1405, I wasn't able to reproduce the failure in a clean environment (async tools seemed to stream correctly for me).

However, while digging through the Pregel.astream implementation, I noticed a shadowed stream_writer definition that was potentially blocking correct context propagation in certain edge cases.

I have opened a PR (linked above) that:

  1. Removes that shadowed/redundant code.
  2. Adds a dedicated test case for ToolNode + get_stream_writer to ensure this path is covered.

If you have a moment, could you test if my branch resolves the issue in your specific environment?
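For background on why context propagation is the suspect here: get_stream_writer reads from ambient context, and in asyncio that context is snapshotted per task at creation time. A stdlib-only sketch of that snapshot behavior (no LangGraph code; the variable name is illustrative):

```python
import asyncio
import contextvars

# Illustrative context slot; LangGraph's real one is internal.
writer_var = contextvars.ContextVar("writer", default="missing")

async def child() -> str:
    # Reads from the context snapshot taken when this task was created.
    return writer_var.get()

async def main():
    writer_var.set("set-before")
    t1 = asyncio.create_task(child())  # snapshots the context now
    writer_var.set("set-after")        # invisible to t1's snapshot
    t2 = asyncio.create_task(child())  # snapshots the updated context
    print(await t1, await t2)  # set-before set-after

asyncio.run(main())
```

So depending on where a task boundary falls relative to where the writer is bound, an async tool can see a stale (or empty) context even though a sync tool running inline sees the right one.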

dumko2001 avatar Nov 27 '25 06:11 dumko2001

Thank you. I will check and let you know.

bharathiselvan avatar Nov 27 '25 09:11 bharathiselvan

Hi @Georgedyatmsk1405, I encountered the same issue.

    "langchain-core ~=1.1.0",
    "langchain-community ~=0.4.1",
    "langchain-ollama ~=1.0.0",
    "langchain-openai ~=1.0.2",
    "langgraph~=1.0.4",
    "langgraph-checkpoint-postgres ~= 3.0.1",
    "langgraph-prebuilt ~= 1.0.5",

This is my test code.

import asyncio
from typing import TypedDict

from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.config import get_stream_writer
from langgraph.graph import END, StateGraph

BASE_URL = "YOUR_BASE_URL"
MODEL_NAME = "YOUR_MODEL_NAME"
API_KEY = "YOUR_KEY"


class GraphState(TypedDict):
    query: str
    answer: str


async def llm_node(state: GraphState):
    writer = get_stream_writer()
    query = state["query"]

    llm = ChatOpenAI(
        model=MODEL_NAME,
        temperature=0.7,
        streaming=True,
        base_url=BASE_URL,
        api_key=API_KEY,
    )

    print(f"--- Node: LLM answer: '{query}' ---")

    writer({"type": "status", "data": "connecting_to_llm"})

    full_content = ""

    async for chunk in llm.astream([HumanMessage(content=query)]):
        content = chunk.content
        if content:
            full_content += content
            writer({"type": "token", "data": content})

    return {"answer": full_content}


workflow = StateGraph(GraphState)
workflow.add_node("chatbot", llm_node)
workflow.set_entry_point("chatbot")
workflow.add_edge("chatbot", END)
graph = workflow.compile()


async def main():
    inputs = {"query": "tell a joke."}
    async for stream_chunk in graph.astream(inputs, stream_mode="custom"):
        if stream_chunk.get("type") == "token":
            print(stream_chunk["data"], end="|", flush=True)

        elif stream_chunk.get("type") == "status":
            print(f"\n[status]: {stream_chunk['data']}\n")

    print(f"\n{'-' * 30}\nfinished")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as e:
        print(f"\nerror: {e}")

knysfh avatar Nov 28 '25 09:11 knysfh