Make the API Modular From the Start – Avoid Monolithic Patterns (Use FastAPI's APIRouter/Modular Structure)
Problem
Currently, the API in this project appears to follow a monolithic pattern, placing all logic and endpoints in a single file or tightly-coupled structure. This approach becomes unsustainable as the codebase grows, making it harder to maintain, extend, and test.
Proposal
Adopt FastAPI's recommended modular structure for organizing API endpoints and dependencies. FastAPI provides an easy and scalable way to split your application into multiple modules using APIRouter and Python packages. This helps keep code organized by logical domain, encourages re-use, and makes it easier for contributors to navigate the codebase.
Recommended Structure Example
Reference: FastAPI - Bigger Applications
.
├── app
│   ├── __init__.py
│   ├── main.py
│   ├── dependencies.py
│   ├── routers
│   │   ├── __init__.py
│   │   ├── items.py
│   │   └── users.py
│   └── internal
│       ├── __init__.py
│       └── admin.py
- `main.py` creates the FastAPI app and includes routers.
- `routers/` contains modules for each logical endpoint group (e.g., `items.py`, `users.py`).
- `dependencies.py` holds shared dependencies.
- `internal/` can hold submodules for internal/admin APIs.
Example Code Snippets
Routers (e.g., app/routers/items.py):
from fastapi import APIRouter, Depends, HTTPException

from ..dependencies import get_token_header

# Router for the "/items" endpoint group: the prefix, tag, token-header
# dependency and documented 404 response are declared once here.
router = APIRouter(
    prefix="/items",
    tags=["items"],
    dependencies=[Depends(get_token_header)],
    responses={404: {"description": "Not found"}},
)


@router.get("/")
async def read_items():
    # Endpoint body elided in this example.
    ...
Main app (app/main.py):
from fastapi import FastAPI, Depends

from .dependencies import get_query_token
from .routers import items, users

# The application is created with get_query_token as an app-level dependency.
app = FastAPI(dependencies=[Depends(get_query_token)])
# Attach the `router` object exposed by each router module.
app.include_router(users.router)
app.include_router(items.router)
Rationale
- Maintainability: Logical separation makes it easier to find and update code.
- Scalability: New features can be added as new routers/modules without cluttering the main app.
- Testing: Easier to write targeted tests for specific routers or dependencies.
- Community Best Practice: Aligns with FastAPI and modern Python best practices.
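References
- FastAPI documentation: Bigger Applications - Multiple Files (https://fastapi.tiangolo.com/tutorial/bigger-applications/)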
Suggested Next Steps
- Refactor the existing API codebase into a modular structure using routers and dependencies as described above.
- Update the README or developer documentation to reflect the new structure.
- Ensure future endpoints are added following this modular pattern.
This issue proposes a structural refactor to align with FastAPI and Python best practices for maintainability and scalability.
Hi! Are you referring to the samples using LangGraph / Live API? Because for the rest of the samples we leverage the prebuilt FastAPI app offered by ADK.
@eliasecchig `uvx agent-starter-pack create my-agent -a live_api -d cloud_run`
The `server.py` in `live_api` was what I was referring to.
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import json
import logging
from collections.abc import Callable
from pathlib import Path
from typing import Any, Literal
import backoff
from fastapi import FastAPI, HTTPException, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from google.cloud import logging as google_cloud_logging
from google.genai import types
from google.genai.types import LiveServerToolCall
from pydantic import BaseModel
from websockets.exceptions import ConnectionClosedError
from .agent import MODEL_ID, genai_client, live_connect_config, tool_functions
# Application instance; the websocket, feedback and frontend routes in this
# module are registered on it.
app = FastAPI()
# CORS is configured fully open: any origin, method and header is allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# Get the path to the frontend build directory
current_dir = Path(__file__).parent
frontend_build_dir = current_dir.parent / "frontend" / "build"
frontend_static_dir = frontend_build_dir / "static"

# Mount static files only if the directory that is actually mounted exists.
# Checking just the parent build directory is not enough: StaticFiles
# validates its `directory` on construction and raises if it is missing, which
# would abort the import of this module for a build without a "static" folder.
if frontend_static_dir.is_dir():
    app.mount(
        "/static",
        StaticFiles(directory=str(frontend_static_dir)),
        name="static",
    )
# Google Cloud Logging client and a logger named after this module; the
# logger's log_struct() is used below for "setup" and feedback records.
logging_client = google_cloud_logging.Client()
logger = logging_client.logger(__name__)
# Standard-library logging is configured at INFO level and is used for the
# diagnostic messages (logging.info / warning / error / debug) in this module.
logging.basicConfig(level=logging.INFO)
class GeminiSession:
    """Manages bidirectional communication between a client and the Gemini model.

    One instance relays traffic for a single client websocket:

    * ``receive_from_client`` forwards client JSON messages to Gemini and
      records the identifiers sent in the client's ``setup`` message.
    * ``receive_from_gemini`` forwards raw Gemini messages to the client and
      schedules a background task for every tool call it sees.
    """

    def __init__(
        self, session: Any, websocket: WebSocket, tool_functions: dict[str, Callable]
    ) -> None:
        """Initialize the Gemini session.

        Args:
            session: The Gemini session
            websocket: The client websocket connection
            tool_functions: Dictionary of available tool functions
        """
        self.session = session
        self.websocket = websocket
        # Placeholders until the client sends its "setup" message.
        self.run_id = "n/a"
        self.user_id = "n/a"
        self.tool_functions = tool_functions
        # Tool-call tasks that are still running; finished tasks remove
        # themselves in _on_tool_task_done.
        self._tool_tasks: list[asyncio.Task] = []

    async def receive_from_client(self) -> None:
        """Listen for and process messages from the client.

        Messages containing ``realtimeInput`` or ``clientContent`` are forwarded
        to Gemini unchanged. A ``setup`` message updates ``run_id`` / ``user_id``
        and is written to the structured logger. Anything else, including
        payloads that are not JSON objects or a ``setup`` value that is not an
        object, is logged as unexpected and skipped instead of ending the loop.

        The loop ends when receiving or forwarding raises.
        """
        while True:
            try:
                data = await self.websocket.receive_json()
                if not isinstance(data, dict):
                    # Valid JSON but not an object (list, string, number...):
                    # nothing to route, so warn and keep the connection alive.
                    logging.warning(f"Received unexpected input from client: {data}")
                    continue
                if "realtimeInput" in data or "clientContent" in data:
                    await self.session._ws.send(json.dumps(data))
                elif isinstance(data.get("setup"), dict):
                    setup = data["setup"]
                    # Keep the current value when a key is absent rather than
                    # raising KeyError and terminating the receive loop.
                    self.run_id = setup.get("run_id", self.run_id)
                    self.user_id = setup.get("user_id", self.user_id)
                    logger.log_struct({**setup, "type": "setup"}, severity="INFO")
                else:
                    logging.warning(f"Received unexpected input from client: {data}")
            except ConnectionClosedError as e:
                # NOTE(review): ConnectionClosedError comes from the `websockets`
                # package; a disconnect of the FastAPI client socket presumably
                # reaches the generic handler below instead - confirm.
                logging.warning(f"Client {self.user_id} closed connection: {e}")
                break
            except Exception as e:
                logging.error(f"Error receiving from client {self.user_id}: {e!s}")
                break

    def _get_func(self, action_label: str | None) -> Callable | None:
        """Get the tool function for a given action label.

        Returns:
            The registered callable, or None when the label is empty, None or
            not present in ``tool_functions``.
        """
        if action_label is None or action_label == "":
            return None
        return self.tool_functions.get(action_label)

    async def _handle_tool_call(
        self, session: Any, tool_call: LiveServerToolCall
    ) -> None:
        """Process tool calls from Gemini and send back responses.

        Every function call receives exactly one response. When the tool is
        unknown or raises, an ``{"error": ...}`` payload is sent so the model is
        not left waiting for a reply that never arrives.

        Args:
            session: Session used to send the tool responses.
            tool_call: The tool call message received from Gemini.
        """
        if tool_call.function_calls is None:
            logging.debug("No function calls in tool_call")
            return

        for fc in tool_call.function_calls:
            logging.debug(f"Calling tool function: {fc.name} with args: {fc.args}")
            func = self._get_func(fc.name)
            response: Any
            if func is None:
                logging.error(f"Function {fc.name} not found")
                response = {"error": f"Function {fc.name} not found"}
            else:
                args = fc.args if fc.args is not None else {}
                try:
                    # Handle both async and sync functions appropriately
                    if asyncio.iscoroutinefunction(func):
                        # Function is already async
                        response = await func(**args)
                    else:
                        # Run sync function in a thread pool to avoid blocking
                        response = await asyncio.to_thread(func, **args)
                except Exception as e:
                    # A failing tool must not abort the remaining calls of this
                    # tool_call; report the failure to the model instead.
                    logging.error(f"Tool function {fc.name} failed: {e!s}")
                    response = {"error": f"Tool function {fc.name} failed: {e!s}"}

            tool_response = types.LiveClientToolResponse(
                function_responses=[
                    types.FunctionResponse(name=fc.name, id=fc.id, response=response)
                ]
            )
            logging.debug(f"Tool response: {tool_response}")
            await session.send(input=tool_response)

    def _on_tool_task_done(self, task: asyncio.Task) -> None:
        """Drop a finished tool task and log the exception it ended with, if any."""
        if task in self._tool_tasks:
            self._tool_tasks.remove(task)
        if task.cancelled():
            return
        # Retrieving the exception also prevents asyncio's "Task exception was
        # never retrieved" warning.
        error = task.exception()
        if error is not None:
            logging.error(f"Tool call task failed: {error!s}")

    async def receive_from_gemini(self) -> None:
        """Listen for and process messages from Gemini without blocking.

        Each raw message is forwarded to the client first. Messages containing
        a ``toolCall`` are additionally validated and handled in a background
        task so that receiving continues while the tool runs. The loop ends when
        ``recv`` returns a falsy value.
        """
        while result := await self.session._ws.recv(decode=False):
            await self.websocket.send_bytes(result)
            raw_message = json.loads(result)
            if "toolCall" in raw_message:
                message = types.LiveServerMessage.model_validate(raw_message)
                tool_call = LiveServerToolCall.model_validate(message.tool_call)
                # Create a separate task to handle the tool call without blocking
                task = asyncio.create_task(
                    self._handle_tool_call(self.session, tool_call)
                )
                # Keep a reference while the task runs; the callback removes it
                # again so the list does not grow for the whole session.
                self._tool_tasks.append(task)
                task.add_done_callback(self._on_tool_task_done)
def get_connect_and_run_callable(websocket: WebSocket) -> Callable:
    """Create a callable that handles Gemini connection with retry logic.

    Args:
        websocket: The client websocket connection

    Returns:
        Callable: An async function that establishes and manages the Gemini connection
    """

    async def on_backoff(details: backoff._typing.Details) -> None:
        """Tell the client that the model connection is being retried."""
        await websocket.send_json(
            {
                "status": f"Model connection error, retrying in {details['wait']} seconds..."
            }
        )

    @backoff.on_exception(
        backoff.expo, ConnectionClosedError, max_tries=10, on_backoff=on_backoff
    )
    async def connect_and_run() -> None:
        """Open a Gemini live session and relay traffic until either side stops.

        A ConnectionClosedError raised by either relay loop propagates out of
        this function so that the backoff decorator can retry.
        """
        async with genai_client.aio.live.connect(
            model=MODEL_ID, config=live_connect_config
        ) as session:
            await websocket.send_json({"status": "Backend is ready for conversation"})
            gemini_session = GeminiSession(
                session=session, websocket=websocket, tool_functions=tool_functions
            )
            logging.info("Starting bidirectional communication")
            relay_tasks = [
                asyncio.create_task(gemini_session.receive_from_client()),
                asyncio.create_task(gemini_session.receive_from_gemini()),
            ]
            try:
                # Stop as soon as one direction ends. asyncio.gather would leave
                # the other loop running: after a failure it keeps reading the
                # client websocket while a retry starts a second reader, and
                # after a client disconnect the Gemini session stays open.
                finished, _ = await asyncio.wait(
                    relay_tasks, return_when=asyncio.FIRST_COMPLETED
                )
                for task in finished:
                    # Re-raise the loop's exception, if any, to trigger a retry.
                    task.result()
            finally:
                for task in relay_tasks:
                    task.cancel()  # no-op for tasks that already finished
                # Wait for the cancellations to complete and mark every
                # outcome as retrieved before the session is closed.
                await asyncio.gather(*relay_tasks, return_exceptions=True)

    return connect_and_run
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket) -> None:
    """Accept an incoming websocket and run the Gemini relay for it.

    Args:
        websocket: The client connection to accept and serve.
    """
    await websocket.accept()
    run_session = get_connect_and_run_callable(websocket)
    await run_session()
class Feedback(BaseModel):
    """Represents feedback for a conversation.

    Instances are received as the body of ``POST /feedback`` and written to the
    structured logger via ``model_dump()``.
    """

    # Rating value; both integers and floats are accepted.
    score: int | float
    # Optional free-text comment; defaults to an empty string.
    text: str | None = ""
    # presumably the run_id the client also sends in its "setup" message - confirm
    run_id: str
    # May be None; note that no default value is declared for this field.
    user_id: str | None
    # Fixed marker whose only allowed value is "feedback".
    log_type: Literal["feedback"] = "feedback"
@app.post("/feedback")
def collect_feedback(feedback: Feedback) -> dict[str, str]:
    """Write a feedback record to the structured logger.

    Args:
        feedback: The validated feedback payload from the request body.

    Returns:
        A fixed ``{"status": "success"}`` acknowledgement.
    """
    record = feedback.model_dump()
    logger.log_struct(record, severity="INFO")
    acknowledgement = {"status": "success"}
    return acknowledgement
@app.get("/")
async def serve_frontend_root() -> FileResponse:
    """Return the built frontend's index.html for the root path.

    Raises:
        HTTPException: 404 when index.html is not present in the build directory.
    """
    landing_page = frontend_build_dir / "index.html"
    if not landing_page.exists():
        raise HTTPException(
            status_code=404,
            detail="Frontend not built. Run 'npm run build' in the frontend directory.",
        )
    return FileResponse(str(landing_page))
@app.get("/{full_path:path}")
async def serve_frontend_spa(full_path: str) -> FileResponse:
    """Catch-all route to serve the frontend for SPA routing.

    This ensures that client-side routes are handled by the React app.
    Excludes API routes (ws, feedback) and static assets.

    Args:
        full_path: The requested path without the leading slash.

    Returns:
        The frontend's index.html.

    Raises:
        HTTPException: 404 for reserved paths, or when the frontend has not
            been built.
    """
    # Don't intercept API routes. Compare the first path segment exactly: a
    # plain string-prefix test would also reject unrelated client-side routes
    # that merely begin with the same letters (e.g. "apiary" or "wsx").
    first_segment = full_path.split("/", 1)[0]
    if first_segment in {"ws", "feedback", "static", "api"}:
        raise HTTPException(status_code=404, detail="Not found")

    # Serve index.html for all other routes (SPA routing)
    index_file = frontend_build_dir / "index.html"
    if index_file.exists():
        return FileResponse(str(index_file))
    raise HTTPException(
        status_code=404,
        detail="Frontend not built. Run 'npm run build' in the frontend directory.",
    )
# Script entry point: start a uvicorn server for `app` when run directly.
if __name__ == "__main__":
    from uvicorn import run as run_server

    run_server(app, host="0.0.0.0", port=8000)