Master issue: [Streaming Tools] support streaming intermediate results for tools for non-streaming case
** Please make sure you read the contribution guide and file the issues in the right place. ** Contribution guide.
Is your feature request related to a problem? Please describe. A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
Describe the solution you'd like A clear and concise description of what you want to happen.
Describe alternatives you've considered A clear and concise description of any alternative solutions or features you've considered.
Additional context Add any other context or screenshots about the feature request here.
I have the following use case:
- I have a single tool that runs for 2 minutes which does a series of steps (each step can take 2 to 30 seconds)
- The user interface waits on the tool for 2 minutes and the user does not know if the tool is frozen
- I would like to provide progress updates during the long running execution
Example output from tool:
- Creating image prompt
- Creating the image
- Scanning the image for policy violations... (maybe this takes 30 seconds)
- Done scanning image: No policy violations found
- Adding watermark to image
- Checking image into source control
- Updating readme
- { returns }
The user would see the outputted status and the final LLM output.
Pseudo code for tool. "raise_status_message" would send back statuses, to the UI via the REST SSE, during the tool execution.
def image_curation_process(prompt: str)-> dict:
"""
Creates an image, scans it and checks it into source control.
Returns:
dict: A dictionary containing the status and a boolean result.
{
"status": "success" or "failed",
"tool_name": "image_curation_process",
"generated_image_url" : "generated_image_url"
}
}
"""
generate_enhanced_prompt_result = generate_enhanced_prompt(prompt)
raise_status_message("Enhanced prompt generated")
generate_image_result = generate_image(generate_enhanced_prompt_result["enhanced_prompt"]"])
raise_status_message("Image generated")
raise_status_message("Starting image scan...")
scan_image_result = scan_image(generate_image_result["generated_image_url"])
raise_status_message("Image scanned")
if scan_image_result == False:
return { "status": "failed", "tool_name" : "image_curation_process", "messages": ["Image voliated policy 123."] }
add_watermark_result = add_watermark(generate_image_result["generated_image_url"])
raise_status_message("Watermark added")
check_into_source_control_result = check_into_source_control(add_watermark_result["generated_image_url"])
raise_status_message("Image checked into source control")
update_readme_result = update_readme(check_into_source_control_result["generated_image_url"])
raise_status_message("Readme updated")
return { "status": "success", "tool_name" : "create_image_tool", "generated_image_url" : generate_image_result["generated_image_url"] }
image_agent = LlmAgent(name="ImageAgent",
description="Provide the abilty to generate image, verify corporate polcies on the image and check into source control.",
tools=[ image_curation_process],
model="gemini-2.5-flash")
Any updates on the issue?
Also very interested in this functionality. Have a requirement to support streaming intermediate tool results for non-Live models in my current application. Any word on whether the proposed PRs are viable approaches to enable this functionality?
@ChristopherEeles In our application my PR function is running well. Is there any things I can help you?
I ended up writing this where a tool can yield events to the UI: https://github.com/GoogleCloudPlatform/data-analytics-golden-demo/blob/main/data-analytics-demos/data-analytics-agent/agent-code/data_analytics_agent/dataplex/data_profile_workflow/data_profile_tool_workflow.py
Hi, I hope this is the right place.
I'm using streaming tools to make an Agent Tool asynchronous, since otherwise the voice agent will freeze while the agent tool is doing its thing.
This is how I implemented it:
def make_agent_tool_async(agent_tool: AgentTool):
"""Create a streaming tool that wraps an LlmAgent via AgentTool.
The returned function:
- yields exactly once with {"status": "started"}
- then waits for the agent to finish
- then yields {"status": "done", "reply": <agent reply>}
"""
async def execute_async_agent_tool(
request: str,
tool_context: ToolContext,
) -> AsyncGenerator[dict[str, Any], None]:
pending = tool_context.state.get("pending_jobs", {})
if agent_tool.agent.name in pending:
yield {
"status": "already running - wait for completion - do not call again until done",
}
log.debug(
f"Agent tool {agent_tool.agent.name} already running"
)
return
pending[agent_tool.agent.name] = "started"
tool_context.state["pending_jobs"] = pending
yield {
"status": "started",
}
log.debug(
f"Started long-running agent tool for agent_name={agent_tool.agent.name}"
)
reply = await agent_tool.run_async(
args={"request": request},
tool_context=tool_context,
)
yield {
"status": "done",
"reply": reply,
}
tool_context.state["pending_jobs"].pop(agent_tool.agent.name, None)
log.debug(f"Completed long-running agent tool for agent_name={agent_tool.agent.name}")
execute_async_agent_tool.__name__ = agent_tool.name
# TODO maybe use _get_declaration to copy over function signature?
base_desc = agent_tool.agent.description or ""
execute_async_agent_tool.__doc__ = base_desc + (
"\n\nNOTE: This is a long-running streaming version of this tool. "
"It first returns status='started', then later returns status='done' with the reply. "
"If called while already running, returns status='already running'. "
"Wait for status='done' before calling this tool again."
)
return execute_async_agent_tool
voice_agent= LlmAgent(
name="voice_agent",
model=voice_model,
instruction=....,
tools=[
make_agent_tool_async(
AgentTool(
text_agent,
skip_summarization=True,
)
),
],
)
Is there a better way to do this?
With some prompting on the voice agent side this sort of works.
I also have a question: I do not see the reply in the events, but the model sees it. How are those streaming results treated? Why don’t they generate a function response event?