[Live] Multiple responses after agent transfer and repeat response on session resumption
Issues
Noticed two issues when running Vertex AI Live agents with both audio and text input (audio output only).
1.) Using an architecture with a parent agent and a sub-agent, where both have a callback-context tool: whenever the user transfers to an agent and then calls a tool, the agent responds N + 1 times, where N is the number of transfers that have happened in the current conversation (e.g., root_agent -> sub_agent -> root_agent -> tool_call will respond 3 times). Depending on latency, the audio either plays in full or the later responses cut off the audio of the earlier ones. The transcription always outputs all of the responses in full.
2.) When reconnecting using SessionResumption, if an agent transfer has taken place, only the conversation history up to the agent transfer is sent. Since the last entry is a user turn, the agent then responds, answering the query that caused the agent transfer. This only happens with SessionResumption and does not happen when manually resetting the session. It still happens after transferring back to the root agent.
I'm not sure whether these are Gemini model issues or ADK issues.
Code
Basic code for replication is below; it would need to be hooked up to a front end. It runs through routes that have been added to ADK's get_fast_api_app(), all built off the SSE example from the ADK docs website (which looks to have been taken down now).
Agents
from google.adk.agents import LlmAgent

def get_root_agent():
    return LlmAgent(
        name="root_agent",
        model="gemini-live-2.5-flash",  # also: "gemini-live-2.5-flash-preview-native-audio-09-2025"
        description="The root agent",
        instruction="default instruction",
        tools=[get_math_tool()],
    )

def get_sub_agent():
    return LlmAgent(
        name="sub_agent",
        model="gemini-live-2.5-flash",  # also: "gemini-live-2.5-flash-preview-native-audio-09-2025"
        description="Used whenever the user says they want to talk to the sub agent.",
        instruction="default instruction",
        tools=[get_math_tool()],
    )

def get_agent():
    root_agent = get_root_agent()
    sub_agent = get_sub_agent()
    root_agent.sub_agents = [sub_agent]
    sub_agent.parent_agent = root_agent
    return root_agent
Tool
from typing import Optional

from google.adk.tools import FunctionTool, ToolContext

def get_math_tool():
    return FunctionTool(func=solve_math)

def solve_math(
    tool_context: Optional[ToolContext] = None,
) -> int:
    return 2 + 2

def get_description():
    return "This tool is used to get the answer to the math problem"

# Set the docstring for the function (required by ADK)
solve_math.__doc__ = get_description()
NAME = solve_math.__name__
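For reference, the description could also live as an inline docstring on the function, which FunctionTool reads directly. A minimal, equivalent sketch (solve_math_inline is just an illustrative name, not part of the reproduction code):

from typing import Optional

from google.adk.tools import FunctionTool, ToolContext

def solve_math_inline(
    tool_context: Optional[ToolContext] = None,
) -> int:
    """This tool is used to get the answer to the math problem."""
    return 2 + 2

# Equivalent to assigning __doc__ after the fact, as above.
inline_math_tool = FunctionTool(func=solve_math_inline)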
Setup
import base64
import json
import logging
from typing import Any, AsyncGenerator, Dict, Tuple

from google.adk.agents import LiveRequestQueue
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.adk.sessions import DatabaseSessionService
from google.genai import types

logger = logging.getLogger(__name__)

# BiDiRunner and VOICE_NAME are defined/imported elsewhere in the app.
async def start_agent_session(
    app_name: str, user_id: str, session_id: str,
) -> Tuple[AsyncGenerator, LiveRequestQueue]:
    agent = get_agent()
    session_service = DatabaseSessionService("sqlite:///./sessions.db")
    session = await session_service.get_session(
        app_name=app_name, user_id=user_id, session_id=session_id
    )
    runner = BiDiRunner(
        app_name=app_name,
        agent=agent,
        session_service=session_service,
    )
    run_config = RunConfig(
        streaming_mode=StreamingMode.BIDI,
        response_modalities=[types.Modality.AUDIO],
        speech_config=types.SpeechConfig(
            voice_config=types.VoiceConfig(
                prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name=VOICE_NAME)
            )
        ),
        output_audio_transcription=types.AudioTranscriptionConfig(),
        input_audio_transcription=types.AudioTranscriptionConfig(),
        session_resumption=types.SessionResumptionConfig(),
        save_live_audio=False,
    )
    live_request_queue = LiveRequestQueue()
    live_events = runner.run_live(
        session=session,
        live_request_queue=live_request_queue,
        run_config=run_config,
    )
    return live_events, live_request_queue
Communication
# Module-level accumulators for transcription text (assumed; referenced below).
output_texts = []
input_texts = []

async def agent_to_client_sse(
    live_events: AsyncGenerator, current_session_id: str
) -> AsyncGenerator[str, None]:
    try:
        async for event in live_events:
            # Check for turn completion or interruption
            if event.turn_complete or event.interrupted:
                if event.interrupted:
                    message = {
                        "type": "interrupted",
                        "data": "Response interrupted by user input",
                    }
                    yield f"data: {json.dumps(message)}\n\n"
                if event.turn_complete:
                    message = {
                        "type": "turn_complete",
                        "session_id": current_session_id,
                    }
                    yield f"data: {json.dumps(message)}\n\n"
                continue
            if (
                hasattr(event, "session_resumption_update")
                and event.session_resumption_update
            ):
                update = event.session_resumption_update
                if update.resumable and update.new_handle:
                    current_session_id = update.new_handle
                    message = {"type": "session_id", "data": current_session_id}
                    yield f"data: {json.dumps(message)}\n\n"
            # Handle content
            if event.content and event.content.parts:
                for part in event.content.parts:
                    if hasattr(part, "inline_data") and part.inline_data:
                        if part.inline_data.mime_type.startswith("audio/pcm"):
                            audio_data = part.inline_data.data
                            if audio_data:
                                message = {
                                    "type": "audio",
                                    "data": base64.b64encode(audio_data).decode(
                                        "ascii"
                                    ),
                                }
                                yield f"data: {json.dumps(message)}\n\n"
                continue
            if event.output_transcription:
                output_texts.append(event.output_transcription.text)
                message = {
                    "type": "text",
                    "role": "model",
                    "data": event.output_transcription.text,
                }
                yield f"data: {json.dumps(message)}\n\n"
            if event.input_transcription:
                message = {
                    "type": "text",
                    "role": "user",
                    "data": event.input_transcription.text,
                }
                yield f"data: {json.dumps(message)}\n\n"
                input_texts.append(event.input_transcription.text)
    except Exception as e:
        import traceback

        traceback.print_exc()
        error_message = {"type": "error", "data": f"Stream error: {str(e)}"}
        yield f"data: {json.dumps(error_message)}\n\n"
async def process_client_message(
    message_data: Dict[str, Any], live_request_queue: LiveRequestQueue, session_id: str
) -> bool:
    try:
        msg_type = message_data.get("type", "")
        if msg_type == "audio":
            data_b64 = message_data.get("data", "")
            if not data_b64:
                logger.warning("Empty audio payload; dropping")
                return False
            try:
                audio_bytes = base64.b64decode(data_b64, validate=True)
            except Exception as de:
                logger.warning(f"Invalid base64 audio payload: {de}; dropping")
                return False
            # Drop empty or odd-length buffers (16-bit PCM must be an even number of bytes).
            if len(audio_bytes) == 0 or (len(audio_bytes) % 2) != 0:
                return False
            live_request_queue.send_realtime(
                types.Blob(
                    data=audio_bytes,
                    mime_type="audio/pcm;rate=16000",
                )
            )
            return True
        elif msg_type == "text" or message_data.get("mime_type") == "text/plain":
            text_content = message_data.get("data", "").strip()
            if text_content:
                content = types.Content(
                    role="user",
                    parts=[types.Part.from_text(text=text_content)],
                )
                live_request_queue.send_content(content=content)
                return True
            else:
                return False
        return False
    except Exception as e:
        import traceback

        traceback.print_exc()
        return False
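For reference, these are the message shapes the handler above accepts (taken directly from the code; the values are illustrative):

# Example payloads for process_client_message / the /send route below.
text_message = {"type": "text", "data": "Please solve the math problem."}
audio_message = {"type": "audio", "data": "<base64-encoded 16 kHz, 16-bit PCM>"}
end_message = {"type": "end"}  # handled by the /send route: closes the LiveRequestQueue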
Routes
# `app` is the FastAPI app returned by ADK's get_fast_api_app();
# active_sessions maps user_id + session_id to its LiveRequestQueue.
from fastapi import HTTPException, Request
from fastapi.responses import StreamingResponse

active_sessions: Dict[str, LiveRequestQueue] = {}

@app.get(
    path="/apps/{app_name}/users/{user_id}/sessions/{session_id}/events",
)
async def sse_endpoint(
    app_name: str, user_id: str, session_id: str, is_audio: str = "false"
):
    try:
        live_events, live_request_queue = await start_agent_session(
            app_name, user_id, session_id
        )
        active_sessions[user_id + session_id] = live_request_queue
        logger.info(f"Client #{user_id} connected via SSE, audio mode: {is_audio}")

        def cleanup():
            active_id = user_id + session_id
            try:
                live_request_queue.close()
                if active_id in active_sessions:
                    del active_sessions[active_id]
                logger.info(f"Client #{active_id} disconnected from SSE")
            except Exception as e:
                logger.error(f"Error cleaning up session for {active_id}: {e}")

        async def event_generator():
            session_info = {
                "type": "session_id",
                "data": session_id,
            }
            session_id_message = json.dumps(session_info)
            yield f"data: {session_id_message}\n\n"
            try:
                async for data in agent_to_client_sse(live_events, session_id):
                    yield data
            except Exception as e:
                logger.error(
                    f"Error in SSE stream for userId: {user_id}, session_id: {session_id} : {e}"
                )
            finally:
                cleanup()

        return StreamingResponse(
            event_generator(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",
                "Access-Control-Allow-Origin": "*",
                "Access-Control-Allow-Headers": "Cache-Control",
            },
        )
    except Exception as e:
        logger.error(f"Failed to create SSE session for {user_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to create session")

@app.post(
    path="/apps/{app_name}/users/{user_id}/sessions/{session_id}/send",
)
async def send_message_endpoint(
    app_name: str, user_id: str, session_id: str, request: Request
):
    try:
        live_request_queue = active_sessions.get(user_id + session_id)
        if not live_request_queue:
            raise HTTPException(status_code=404, detail="Session not found")
        message = await request.json()
        if message.get("type") == "end":
            live_request_queue.close()
            return {"status": "session ended"}
        success = await process_client_message(
            message, live_request_queue, session_id
        )
        if not success:
            raise HTTPException(status_code=400, detail="Failed to process message")
        return {"status": "sent"}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error processing message for {user_id}: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")
To Reproduce
Issue 1:
- Transfer between two bidi agents using the gemini models above through Vertex AI API
- Get the agent to call a function
- See multiple texts returned.
Issue 2:
- Transfer between two bidi agents using the gemini models above through Vertex AI API
- Send other messages to the sub agent if you like.
- Wait for session resumption
- Hear the agent respond without a prompt.
- Print the history passed to llm_connection.send_history() (line 135 of base_llm_flow.py) during session resumption to see that the last entry has role "user" and is an agent transfer.
Desktop (please complete the following information):
- OS: Windows (WSL Dev Container)
- Python version(python -V): 3.13.7
- ADK version(pip show google-adk): 1.17.0
Model Information:
- Which model is being used: "gemini-live-2.5-flash" or "gemini-live-2.5-flash-preview-native-audio-09-2025"
For #1, could you provide the expected behaviour with an example?
For #2, could you explain the expected behaviour?
@hangfei sure!
For expected behavior:
- When making tool calls in patterns like so:
- root_agent -> tool_call
- root_agent -> sub_agent -> tool_call
- root_agent -> sub_agent -> root_agent -> tool_call
I expect them all to return a single response on the tool-call turn instead of 1, 2, and 3 responses respectively. In my code example with the very simple math problem, when I ask the agent to "solve the math problem" I get responses to the examples above like:
- "The answer is 4."
- "The answer is 4. The solution is 4."
- "The answer is 4. The solution is 4. The solution to the math problem is 4."
The wording is almost always different, indicating that the model is returning multiple responses for a single query.
- I expect the full history to be sent to the agent on session resumption, not just the history up to the most recent agent transfer. I also expect the agent not to respond automatically after every session resumption.
e.g., the current issue I am seeing:
[Agent is starting as the root agent]
User: "Transfer me to the Sub agent"
Sub_agent: "This is the sub agent, what would you like to talk about?"
... many more turns take place while talking only to the sub agent. The agent has responded every time and is currently waiting for user input to continue...
[Session resumption takes place due to reaching 10 min]
Sub_agent: "This is the sub agent, how can I help you?"
In the above example, the agent is responding unprompted on session resumption to a state held much earlier in the conversation.
For 1, you can't do this: `root_agent.sub_agents = [sub_agent]` and `sub_agent.parent_agent = root_agent`.
You can try this similar example and it should work: https://github.com/google/adk-python/tree/main/contributing/samples/live_bidi_streaming_multi_agent
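A minimal sketch of that pattern, reusing the agent and tool factories from the reproduction code above (ADK derives the parent/child relationship itself when sub_agents is passed to the constructor, so nothing is assigned afterwards):

from google.adk.agents import LlmAgent

# Sketch only: pass sub_agents at construction instead of assigning
# root_agent.sub_agents / sub_agent.parent_agent after the fact.
def get_agent():
    sub_agent = get_sub_agent()
    return LlmAgent(
        name="root_agent",
        model="gemini-live-2.5-flash",
        description="The root agent",
        instruction="default instruction",
        tools=[get_math_tool()],
        sub_agents=[sub_agent],  # parent links are derived from this
    )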
For 2, which API variant (GenAI or Vertex) are you using?
Do you mean the model doesn't have context on the previous conversation, or that the model responds on its own right after session resumption?
For 1, by not being able to do this, do you mean the parent relation is already defined when the sub agent is set? Or do you mean it should throw an error? I will try removing it and see if that fixes the issue I am seeing.
For 2, I am using Vertex AI. What I mean is that upon session resumption, the model's history is refreshed only to the point of the last agent transfer. Following this, it responds again without prompting.
If it helps, I can print out an example session history before and after session resumption.
For 1, you don't/can't set the parent. Check out the example I provided and modify it to your use case.
For 2, it seems to be a model issue. But let's see your history data first. I will try to reproduce it on my end.
Ok, here are my results:
For problem 1: I removed the explicit setting of the parent relation, but unfortunately it did not change anything except make it harder to route back to the parent agent. I tested the example you sent both in adk web and through get_fast_api_app(), and the model is still responding multiple times (or at least producing multiple transcriptions). For example, when asking to roll a die and printing out the transcriptions, I see responses like:
"You rolled a 5. I rolled a 6 sided die and rolled a 5."
Then if I transfer back and ask for the weather in London I see responses like:
"It is raining in London. It is raining in London. It's raining in London"
Is it possible the location where I am applying the transcription is capturing some partial thought from the model? It seems odd that the number of transcription responses increases after every agent transfer.
For part 2: To start, I added the following code in base_llm_flow.py to get the input and output transcriptions back in the SQLite DB.
async def _update_transcription_contents(self, event: Event):
    if event.input_transcription:
        event.content = types.Content(
            parts=[types.Part(text=event.input_transcription.text)], role="user"
        )
    elif event.output_transcription:
        event.content = types.Content(
            parts=[types.Part(text=event.output_transcription.text)], role="model"
        )
    return event
I call this at line 159 of run_live() in base_llm_flow.py. I added print statements in the call loop as well as in the llm_connection.send_history(llm_request.contents) call in gemini_llm_connection.py. From this, I found that llm_request.contents is only ever updated on agent transfer. When printing it, it is always an empty list until an agent transfer is called, after which it is updated up to the last point of agent transfer. For my example below, this is what it looks like when printed out after agent transfer and after session resumption:
[Content(
parts=[
Part(
text='Hello'
),
],
role='user'
), Content(
parts=[
Part(
text='.'
),
],
role='user'
), Content(
parts=[
Part(
text='For context:'
),
Part(
text='[root_agent] said: Hi there. How can I'
),
],
role='user'
), Content(
parts=[
Part(
text='For context:'
),
Part(
text='[root_agent] said: help you today.'
),
],
role='user'
), Content(
parts=[
Part(
text='Take'
),
],
role='user'
), Content(
parts=[
Part(
text=' me to the sub agent.'
),
],
role='user'
), Content(
parts=[
Part(
text='For context:'
),
Part(
text="[root_agent] called tool `transfer_to_agent` with parameters: {'agent_name': 'sub_agent'}"
),
],
role='user'
), Content(
parts=[
Part(
text='For context:'
),
Part(
text="[root_agent] `transfer_to_agent` tool returned result: {'result': None}"
),
],
role='user'
)]
Because the last role is user, the model responds again on every reconnect, as outlined in the description of GeminiLlmConnection's send_history() method:
Sends the conversation history to the gemini model.
You call this method right after setting up the model connection. The model will respond if the last content is from user, otherwise it will wait for new user input before responding.
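That condition is easy to state as a check. A minimal sketch (ends_with_user_turn is just an illustrative helper, not ADK code), showing when the model will speak unprompted on reconnect:

from google.genai import types

def ends_with_user_turn(contents: list[types.Content]) -> bool:
    """True when the last history entry is a user turn.

    Per the send_history() description quoted above, the model responds
    immediately after reconnect in this case -- which is exactly what happens
    when the history is cut off at the transfer_to_agent function_response,
    since that entry carries role "user".
    """
    return bool(contents) and contents[-1].role == "user"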
The entire conversation history that should be sent on reconnect has been recreated from the session events:
user: {"parts": [{"text": "Hello"}], "role": "user"}
user: {"parts": [{"text": "."}], "role": "user"}
root_agent: {"parts": [{"text": "Hi there. How can I"}], "role": "model"}
root_agent: {"parts": [{"text": " help you today."}], "role": "model"}
user: {"parts": [{"text": "Take"}], "role": "user"}
user: {"parts": [{"text": " me to the sub agent."}], "role": "user"}
root_agent: {"parts": [{"function_call": {"id": "adk-6d24fd32-4bc0-498b-a820-966347562e35", "args": {"agent_name": "sub_agent"}, "name": "transfer_to_agent"}}], "role": "model"}
root_agent: {"parts": [{"function_response": {"id": "adk-6d24fd32-4bc0-498b-a820-966347562e35", "name": "transfer_to_agent", "response": {"result": null}}}], "role": "user"}
# From here down is not included in session history
sub_agent: {"parts": [{"text": "I'm the"}], "role": "model"}
sub_agent: {"parts": [{"text": " sub agent. What's on your mind."}], "role": "model"}
user: {"parts": [{"text": "Please"}], "role": "user"}
user: {"parts": [{"text": " solve the math problem."}], "role": "user"}
# Below illustrates how the agent is responding 2x for a single prompt that calls a tool
sub_agent: {"parts": [{"text": "The answer to 2 +"}], "role": "model"}
sub_agent: {"parts": [{"text": " 2 is 4."}], "role": "model"}
sub_agent: {"parts": [{"text": "The answer to 2 +"}], "role": "model"}
sub_agent: {"parts": [{"text": " 2 is 4."}], "role": "model"} # Last messages
# Waited 10 minutes for session resumption
sub_agent: {"parts": [{"text": "I'm the sub agent"}], "role": "model"}
sub_agent: {"parts": [{"text": ". How can I assist you today."}], "role": "model"}
As seen above, a large part of the session history is not being passed to the agent and it is responding out of turn on session resumption.
For "responding multiple times ", it's indeed a bug. It's being fixed here: https://github.com/google/adk-python/pull/2588
For transcription, please try it again after the above fix.
Hi @hangfei, unfortunately because this is on main I am getting a separate error when trying to start my app to test:

Traceback (most recent call last):
  File "/workspace/app/main.py", line 43, in <module>
    app = get_fast_api_app(
        agents_dir=AGENT_DIR,
        ...<4 lines>...
        trace_to_cloud=TRACE_TO_CLOUD,
    )
  File "/usr/local/lib/python3.13/site-packages/google/adk/cli/fast_api.py", line 105, in get_fast_api_app
    session_service = create_session_service_from_options(
        base_dir=agents_dir,
        ...<2 lines>...
        per_agent=True,  # Multi-agent mode
    )
  File "/usr/local/lib/python3.13/site-packages/google/adk/cli/utils/service_factory.py", line 55, in create_session_service_from_options
    service = registry.create_session_service(session_service_uri, **kwargs)
  File "/usr/local/lib/python3.13/site-packages/google/adk/cli/service_registry.py", line 132, in create_session_service
    return self._session_factories[scheme](uri, **kwargs)
  File "/usr/local/lib/python3.13/site-packages/google/adk/cli/service_registry.py", line 255, in sqlite_session_factory
    return SqliteSessionService(db_path=db_path, **kwargs_copy)
TypeError: SqliteSessionService.__init__() got an unexpected keyword argument 'per_agent'

Looks like this is from the commit added 3hrs ago: #06e6fc9
I think I will have to wait for this to be on a stable branch to test. But I will try again over the next few days and let you know if it works and whether the issue is fully resolved.
@GWeale is working on a fix.
Fixed in: https://github.com/google/adk-python/commit/73e5687b9a2014586c0a5d281c6daed2d4e1186f. You can use the dev version (build the wheel yourself) or wait for the next release.
How can I stop receiving these emails? Please help!
Regards,
JLPV
Hello @jesusluisperezvazquez-lab
You need to unsubscribe from this thread using the link provided in the email.
Hi @hangfei, I tried testing on main using pip install git+https://github.com/google/adk-python.git@main but I ran into the same issue about per_agent, so it looks like #73e5687 is not yet applied to main? So I applied all the changes from that commit to my installation manually, but now I'm running into a separate issue where the SQLite database will not populate with tables. Is there some new setting that needs to be defined to do this on main? I'm getting the following output:
2025-11-28 18:10:30,881 - INFO - Successfully connected to database: sqlite:///./sessions.db
2025-11-28 18:10:30,889 - INFO - Found tables in database: []
2025-11-28 18:10:30,889 - INFO - Sessions table not found in table list
2025-11-28 18:10:30,889 - INFO - ADK tables not ready yet (attempt 10/10), waiting 5s...
2025-11-28 18:10:35,890 - ERROR - ADK tables never became available, metadata tables not created
Also, is it possible to keep this issue open until I can test it properly? From reading #2588, I'm not sure how it will solve the second issue I was seeing: the unprompted response on session resumption and the erasure of history back to the last agent transfer.