I keep getting this error: `Response payload is not completed: <TransferEncodingError: 400, message='Not enough data to satisfy transfer length header.'>`
Yeah, I keep getting that error, and it takes a few regenerations to get past it. It doesn't seem to be counted as an error on the API end, though. I'm using the Google Gemini API with the Google Gemini Pipeline.
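From what I can tell, that aiohttp error means the connection was closed before the full chunked body arrived, so the client bails out partway through the stream. Here's a minimal sketch (with a placeholder URL, not my actual setup) of where that `ClientPayloadError` surfaces when a streamed aiohttp response is consumed:

```python
import asyncio

import aiohttp


async def read_stream() -> None:
    # Placeholder URL for illustration only; the real request goes to the pipelines container.
    url = "https://example.com/"
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url) as resp:
                # aiohttp only notices a truncated body while the stream is being consumed,
                # which is why partial text can render before the error shows up.
                async for line in resp.content:
                    print(line.decode(errors="replace"), end="")
        except aiohttp.ClientPayloadError as exc:
            # Raised when the server closes the connection before the advertised
            # transfer length is satisfied -- the same error that gets wrapped into
            # the HTTPException in the Open WebUI log below.
            print(f"\nStream cut short: {exc}")


asyncio.run(read_stream())
```

That matches the behaviour I'm seeing: the partial text renders while the stream is being read, and the truncation at the end turns the whole request into an error.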
Here are my logs. The weird thing is that the generation starts to render in the WebUI while the error is thrown in the logs; then, when I refresh, the text that had loaded disappears and the error message is shown instead.
2025-09-11 14:01:08.125 | INFO | uvicorn.protocols.http.httptools_impl:send:476 - 100.78.217.82:0 - "GET /api/v1/chats/?page=1 HTTP/1.1" 200
2025-09-11 14:01:08.329 | INFO | uvicorn.protocols.http.httptools_impl:send:476 - 100.78.217.82:0 - "GET /api/v1/folders/ HTTP/1.1" 200
2025-09-11 14:01:09.341 | INFO | uvicorn.protocols.http.httptools_impl:send:476 - 100.78.217.82:0 - "POST /api/chat/completions HTTP/1.1" 200
2025-09-11 14:01:09.519 | INFO | open_webui.routers.openai:get_all_models:406 - get_all_models()
2025-09-11 14:01:09.894 | INFO | uvicorn.protocols.http.httptools_impl:send:476 - 100.78.217.82:0 - "GET /api/v1/chats/?page=1 HTTP/1.1" 200
2025-09-11 14:01:10.538 | INFO | uvicorn.protocols.http.httptools_impl:send:476 - 100.78.217.82:0 - "GET /api/v1/folders/ HTTP/1.1" 200
2025-09-11 14:01:16.293 | ERROR | open_webui.tasks:cleanup_task:88 - Task exception was never retrieved
future: <Task finished name='Task-2350' coro=<chat_completion.<locals>.process_chat() done, defined at /app/backend/open_webui/main.py:1495> exception=HTTPException(status_code=400, detail="Response payload is not completed: <TransferEncodingError: 400, message='Not enough data to satisfy transfer length header.'>")>
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/aiohttp/client_proto.py", line 144, in connection_lost
uncompleted = self._parser.feed_eof()
│ └ None
└ <aiohttp.client_proto.ResponseHandler object at 0x7f2ad62d60>
File "aiohttp/_http_parser.pyx", line 508, in aiohttp._http_parser.HttpParser.feed_eof
raise TransferEncodingError(
└ <class 'aiohttp.http_exceptions.TransferEncodingError'>
aiohttp.http_exceptions.TransferEncodingError: 400, message:
Not enough data to satisfy transfer length header.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/app/backend/open_webui/main.py", line 1514, in process_chat
return await process_chat_response(
└ <function process_chat_response at 0x7f7c89f240>
File "/app/backend/open_webui/utils/middleware.py", line 2636, in process_chat_response
return await response_handler(response, events)
│ │ └ []
│ └ <starlette.responses.StreamingResponse object at 0x7f2ad5aa90>
└ <function process_chat_response.<locals>.response_handler at 0x7f2adc6200>
File "/app/backend/open_webui/utils/middleware.py", line 2208, in response_handler
await stream_body_handler(response, form_data)
│ │ └ {'stream': True, 'model': 'google_genai.gemini-2.5-pro-preview-05-06', 'messages': [{'role': 'system', 'content': '**I. Core ...
│ └ <starlette.responses.StreamingResponse object at 0x7f2ad5aa90>
└ <function process_chat_response.<locals>.response_handler.<locals>.stream_body_handler at 0x7f2adc7920>
File "/app/backend/open_webui/utils/middleware.py", line 1902, in stream_body_handler
async for line in response.body_iterator:
│ │ └ <StreamReader e=ClientPayloadError("Response payload is not completed: <TransferEncodingError: 400, message='Not enough data ...
│ └ <starlette.responses.StreamingResponse object at 0x7f2ad5aa90>
└ '\n'
File "/usr/local/lib/python3.11/site-packages/aiohttp/streams.py", line 52, in __anext__
rv = await self.read_func()
│ └ <member 'read_func' of 'AsyncStreamIterator' objects>
└ <aiohttp.streams.AsyncStreamIterator object at 0x7f5a5130a0>
File "/usr/local/lib/python3.11/site-packages/aiohttp/streams.py", line 352, in readline
return await self.readuntil()
│ └ <function StreamReader.readuntil at 0x7fae8cc040>
└ <StreamReader e=ClientPayloadError("Response payload is not completed: <TransferEncodingError: 400, message='Not enough data ...
File "/usr/local/lib/python3.11/site-packages/aiohttp/streams.py", line 386, in readuntil
await self._wait("readuntil")
│ └ <function StreamReader._wait at 0x7fae8b3ec0>
└ <StreamReader e=ClientPayloadError("Response payload is not completed: <TransferEncodingError: 400, message='Not enough data ...
File "/usr/local/lib/python3.11/site-packages/aiohttp/streams.py", line 347, in _wait
await waiter
└ <Future finished exception=ClientPayloadError("Response payload is not completed: <TransferEncodingError: 400, message='Not e...
aiohttp.client_exceptions.ClientPayloadError: Response payload is not completed: <TransferEncodingError: 400, message='Not enough data to satisfy transfer length header.'>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
> File "/app/backend/open_webui/main.py", line 1541, in process_chat
raise HTTPException(
└ <class 'fastapi.exceptions.HTTPException'>
fastapi.exceptions.HTTPException: 400: Response payload is not completed: <TransferEncodingError: 400, message='Not enough data to satisfy transfer length header.'>
2025-09-11 14:01:53.119 | INFO | uvicorn.protocols.http.httptools_impl:send:476 - 100.78.217.82:0 - "GET /_app/version.json HTTP/1.1" 304
@ndrew222 Can you share your pipeline code so we can analyze the cause of the error?
Oh yeah. Sorry.
"""
title: Google Gemini Pipeline
author: owndev, olivier-lacroix
author_url: https://github.com/owndev/
project_url: https://github.com/owndev/Open-WebUI-Functions
funding_url: https://github.com/sponsors/owndev
version: 1.3.3
license: Apache License 2.0
description: A manifold pipeline for interacting with Google Gemini models, including dynamic model specification, streaming responses, and flexible error handling.
features:
- Asynchronous API calls for better performance
- Model caching to reduce API calls
- Dynamic model specification with automatic prefix stripping
- Streaming response handling with safety checks
- Support for multimodal input (text and images)
- Flexible error handling and logging
- Integration with Google Generative AI or Vertex AI API for content generation
- Support for various generation parameters (temperature, max tokens, etc.)
- Customizable safety settings based on environment variables
- Encrypted storage of sensitive API keys
- Grounding with Google search
- Native tool calling support
"""
import os
import inspect
from functools import update_wrapper
import re
import time
import asyncio
import base64
import hashlib
import logging
from google import genai
from google.genai import types
from google.genai.errors import ClientError, ServerError, APIError
from typing import List, Union, Optional, Dict, Any, Tuple, AsyncIterator, Callable
from pydantic_core import core_schema
from pydantic import BaseModel, Field, GetCoreSchemaHandler
from cryptography.fernet import Fernet, InvalidToken
from open_webui.env import SRC_LOG_LEVELS
# Simplified encryption implementation with automatic handling
class EncryptedStr(str):
"""A string type that automatically handles encryption/decryption"""
@classmethod
def _get_encryption_key(cls) -> Optional[bytes]:
"""
Generate encryption key from WEBUI_SECRET_KEY if available
Returns None if no key is configured
"""
secret = os.getenv("WEBUI_SECRET_KEY")
if not secret:
return None
hashed_key = hashlib.sha256(secret.encode()).digest()
return base64.urlsafe_b64encode(hashed_key)
@classmethod
def encrypt(cls, value: str) -> str:
"""
Encrypt a string value if a key is available
Returns the original value if no key is available
"""
if not value or value.startswith("encrypted:"):
return value
key = cls._get_encryption_key()
if not key: # No encryption if no key
return value
f = Fernet(key)
encrypted = f.encrypt(value.encode())
return f"encrypted:{encrypted.decode()}"
@classmethod
def decrypt(cls, value: str) -> str:
"""
Decrypt an encrypted string value if a key is available
Returns the original value if no key is available or decryption fails
"""
if not value or not value.startswith("encrypted:"):
return value
key = cls._get_encryption_key()
if not key: # No decryption if no key
return value[len("encrypted:") :] # Return without prefix
try:
encrypted_part = value[len("encrypted:") :]
f = Fernet(key)
decrypted = f.decrypt(encrypted_part.encode())
return decrypted.decode()
except (InvalidToken, Exception):
return value
# Pydantic integration
@classmethod
def __get_pydantic_core_schema__(
cls, _source_type: Any, _handler: GetCoreSchemaHandler
) -> core_schema.CoreSchema:
return core_schema.union_schema(
[
core_schema.is_instance_schema(cls),
core_schema.chain_schema(
[
core_schema.str_schema(),
core_schema.no_info_plain_validator_function(
lambda value: cls(cls.encrypt(value) if value else value)
),
]
),
],
serialization=core_schema.plain_serializer_function_ser_schema(
lambda instance: str(instance)
),
)
def get_decrypted(self) -> str:
"""Get the decrypted value"""
return self.decrypt(self)
class Pipe:
"""
Pipeline for interacting with Google Gemini models.
"""
# Configuration valves for the pipeline
class Valves(BaseModel):
GOOGLE_API_KEY: EncryptedStr = Field(
default=os.getenv("GOOGLE_API_KEY", ""),
description="API key for Google Generative AI (used if USE_VERTEX_AI is false).",
)
USE_VERTEX_AI: bool = Field(
default=os.getenv("GOOGLE_GENAI_USE_VERTEXAI", "false").lower() == "true",
description="Whether to use Google Cloud Vertex AI instead of the Google Generative AI API.",
)
VERTEX_PROJECT: str | None = Field(
default=os.getenv("GOOGLE_CLOUD_PROJECT"),
description="The Google Cloud project ID to use with Vertex AI.",
)
VERTEX_LOCATION: str = Field(
default=os.getenv("GOOGLE_CLOUD_LOCATION", "global"),
description="The Google Cloud region to use with Vertex AI.",
)
USE_PERMISSIVE_SAFETY: bool = Field(
default=os.getenv("USE_PERMISSIVE_SAFETY", "false").lower() == "true",
description="Use permissive safety settings for content generation.",
)
MODEL_CACHE_TTL: int = Field(
default=int(os.getenv("GOOGLE_MODEL_CACHE_TTL", "600")),
description="Time in seconds to cache the model list before refreshing",
)
RETRY_COUNT: int = Field(
default=int(os.getenv("GOOGLE_RETRY_COUNT", "2")),
description="Number of times to retry API calls on temporary failures",
)
def __init__(self):
"""Initializes the Pipe instance and configures the genai library."""
self.valves = self.Valves()
self.name: str = "Google Gemini: "
# Setup logging
self.log = logging.getLogger("google_ai.pipe")
self.log.setLevel(SRC_LOG_LEVELS.get("OPENAI", logging.INFO))
# Model cache
self._model_cache: Optional[List[Dict[str, str]]] = None
self._model_cache_time: float = 0
def _get_client(self) -> genai.Client:
"""
Validates API credentials and returns a genai.Client instance.
"""
self._validate_api_key()
if self.valves.USE_VERTEX_AI:
self.log.debug(
f"Initializing Vertex AI client (Project: {self.valves.VERTEX_PROJECT}, Location: {self.valves.VERTEX_LOCATION})"
)
return genai.Client(
vertexai=True,
project=self.valves.VERTEX_PROJECT,
location=self.valves.VERTEX_LOCATION,
)
else:
self.log.debug("Initializing Google Generative AI client with API Key")
return genai.Client(api_key=self.valves.GOOGLE_API_KEY.get_decrypted())
def _validate_api_key(self) -> None:
"""
Validates that the necessary Google API credentials are set.
Raises:
ValueError: If the required credentials are not set.
"""
if self.valves.USE_VERTEX_AI:
if not self.valves.VERTEX_PROJECT:
self.log.error("USE_VERTEX_AI is true, but VERTEX_PROJECT is not set.")
raise ValueError(
"VERTEX_PROJECT is not set. Please provide the Google Cloud project ID."
)
# For Vertex AI, location has a default, so project is the main thing to check.
# Actual authentication will be handled by ADC or environment.
self.log.debug(
"Using Vertex AI. Ensure ADC or service account is configured."
)
else:
if not self.valves.GOOGLE_API_KEY:
self.log.error("GOOGLE_API_KEY is not set (and not using Vertex AI).")
raise ValueError(
"GOOGLE_API_KEY is not set. Please provide the API key in the environment variables or valves."
)
self.log.debug("Using Google Generative AI API with API Key.")
def strip_prefix(self, model_name: str) -> str:
"""
Extract the model identifier using regex, handling various naming conventions.
e.g., "google_gemini_pipeline.gemini-2.5-flash-preview-04-17" -> "gemini-2.5-flash-preview-04-17"
e.g., "models/gemini-1.5-flash-001" -> "gemini-1.5-flash-001"
e.g., "publishers/google/models/gemini-1.5-pro" -> "gemini-1.5-pro"
"""
# Use regex to remove everything up to and including the last '/' or the first '.'
stripped = re.sub(r"^(?:.*/|[^.]*\.)", "", model_name)
return stripped
def get_google_models(self, force_refresh: bool = False) -> List[Dict[str, str]]:
"""
Retrieve available Google models suitable for content generation.
Uses caching to reduce API calls.
Args:
force_refresh: Whether to force refreshing the model cache
Returns:
List of dictionaries containing model id and name.
"""
# Check cache first
current_time = time.time()
if (
not force_refresh
and self._model_cache is not None
and (current_time - self._model_cache_time) < self.valves.MODEL_CACHE_TTL
):
self.log.debug("Using cached model list")
return self._model_cache
try:
client = self._get_client()
self.log.debug("Fetching models from Google API")
models = client.models.list()
available_models = []
for model in models:
actions = model.supported_actions
if actions is None or "generateContent" in actions:
available_models.append(
{
"id": self.strip_prefix(model.name),
"name": model.display_name or self.strip_prefix(model.name),
}
)
model_map = {model["id"]: model for model in available_models}
# Filter map to only include models starting with 'gemini-'
filtered_models = {
k: v for k, v in model_map.items() if k.startswith("gemini-")
}
# Update cache
self._model_cache = list(filtered_models.values())
self._model_cache_time = current_time
self.log.debug(f"Found {len(self._model_cache)} Gemini models")
return self._model_cache
except Exception as e:
self.log.exception(f"Could not fetch models from Google: {str(e)}")
# Return a specific error entry for the UI
return [{"id": "error", "name": f"Could not fetch models: {str(e)}"}]
def pipes(self) -> List[Dict[str, str]]:
"""
Returns a list of available Google Gemini models for the UI.
Returns:
List of dictionaries containing model id and name.
"""
try:
self.name = "Google Gemini: "
return self.get_google_models()
except ValueError as e:
# Handle the case where API key is missing during pipe listing
self.log.error(f"Error during pipes listing (validation): {e}")
return [{"id": "error", "name": str(e)}]
except Exception as e:
# Handle other potential errors during model fetching
self.log.exception(
f"An unexpected error occurred during pipes listing: {str(e)}"
)
return [{"id": "error", "name": f"An unexpected error occurred: {str(e)}"}]
def _prepare_model_id(self, model_id: str) -> str:
"""
Prepare and validate the model ID for use with the API.
Args:
model_id: The original model ID from the user
Returns:
Properly formatted model ID
Raises:
ValueError: If the model ID is invalid or unsupported
"""
original_model_id = model_id
model_id = self.strip_prefix(model_id)
# If the model ID doesn't look like a Gemini model, try to find it by name
if not model_id.startswith("gemini-"):
models_list = self.get_google_models()
found_model = next(
(m["id"] for m in models_list if m["name"] == original_model_id), None
)
if found_model and found_model.startswith("gemini-"):
model_id = found_model
self.log.debug(
f"Mapped model name '{original_model_id}' to model ID '{model_id}'"
)
else:
# If we still don't have a valid ID, raise an error
if not model_id.startswith("gemini-"):
self.log.error(
f"Invalid or unsupported model ID: '{original_model_id}'"
)
raise ValueError(
f"Invalid or unsupported Google model ID or name: '{original_model_id}'"
)
return model_id
def _prepare_content(
self, messages: List[Dict[str, Any]]
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
"""
Prepare messages content for the API and extract system message if present.
Args:
messages: List of message objects from the request
Returns:
Tuple of (prepared content list, system message string or None)
"""
# Extract system message
system_message = next(
(msg["content"] for msg in messages if msg.get("role") == "system"),
None,
)
# Prepare contents for the API
contents = []
for message in messages:
role = message.get("role")
if role == "system":
continue # Skip system messages, handled separately
content = message.get("content", "")
parts = []
# Handle different content types
if isinstance(content, list): # Multimodal content
parts.extend(self._process_multimodal_content(content))
elif isinstance(content, str): # Plain text content
parts.append({"text": content})
else:
self.log.warning(f"Unsupported message content type: {type(content)}")
continue # Skip unsupported content
# Map roles: 'assistant' -> 'model', 'user' -> 'user'
api_role = "model" if role == "assistant" else "user"
if parts: # Only add if there are parts
contents.append({"role": api_role, "parts": parts})
return contents, system_message
def _process_multimodal_content(
self, content_list: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""
Process multimodal content (text and images).
Args:
content_list: List of content items
Returns:
List of processed parts for the Gemini API
"""
parts = []
for item in content_list:
if item.get("type") == "text":
parts.append({"text": item.get("text", "")})
elif item.get("type") == "image_url":
image_url = item.get("image_url", {}).get("url", "")
if image_url.startswith("data:image"):
# Handle base64 encoded image data
try:
header, encoded = image_url.split(",", 1)
mime_type = header.split(":")[1].split(";")[0]
# Basic validation for image types
if mime_type not in [
"image/jpeg",
"image/png",
"image/webp",
"image/heic",
"image/heif",
]:
self.log.warning(
f"Unsupported image mime type: {mime_type}"
)
parts.append(
{"text": f"[Image type {mime_type} not supported]"}
)
continue
parts.append(
{
"inline_data": {
"mime_type": mime_type,
"data": encoded,
}
}
)
except Exception as img_ex:
self.log.exception(f"Could not parse image data URL: {img_ex}")
parts.append({"text": "[Image data could not be processed]"})
else:
# Gemini API doesn't directly support image URLs
self.log.warning(f"Direct image URLs not supported: {image_url}")
parts.append({"text": f"[Image URL not processed: {image_url}]"})
return parts
@staticmethod
def _create_tool(tool_def):
"""OpenwebUI tool is a functools.partial coroutine, which genai does not support directly.
See https://github.com/googleapis/python-genai/issues/907
This function wraps the tool into a callable that can be used with genai.
In particular, it sets the signature of the function properly,
removing any frozen keyword arguments (extra_params).
"""
bound_callable = tool_def["callable"]
# Create a wrapper for bound_callable, which is always async
async def wrapper(*args, **kwargs):
return await bound_callable(*args, **kwargs)
# Remove 'frozen' keyword arguments (extra_params) from the signature
original_sig = inspect.signature(bound_callable)
frozen_kwargs = {
"__event_emitter__",
"__event_call__",
"__user__",
"__metadata__",
"__request__",
"__model__",
}
new_parameters = []
for name, parameter in original_sig.parameters.items():
# Exclude keyword arguments that are frozen
if name in frozen_kwargs and parameter.kind in (
inspect.Parameter.POSITIONAL_OR_KEYWORD,
inspect.Parameter.KEYWORD_ONLY,
):
continue
# Keep remaining parameters
new_parameters.append(parameter)
new_sig = inspect.Signature(
parameters=new_parameters, return_annotation=original_sig.return_annotation
)
# Ensure name, docstring and signature are properly set
update_wrapper(wrapper, bound_callable)
wrapper.__signature__ = new_sig
return wrapper
def _configure_generation(
self,
body: Dict[str, Any],
system_instruction: Optional[str],
__metadata__: Dict[str, Any],
__tools__: dict[str, Any] | None = None,
) -> types.GenerateContentConfig:
"""
Configure generation parameters and safety settings.
Args:
body: The request body containing generation parameters
system_instruction: Optional system instruction string
Returns:
types.GenerateContentConfig
"""
gen_config_params = {
"temperature": body.get("temperature"),
"top_p": body.get("top_p"),
"top_k": body.get("top_k"),
"max_output_tokens": body.get("max_tokens"),
"stop_sequences": body.get("stop") or None,
"system_instruction": system_instruction,
}
# Configure safety settings
if self.valves.USE_PERMISSIVE_SAFETY:
safety_settings = [
types.SafetySetting(
category="HARM_CATEGORY_HARASSMENT", threshold="BLOCK_NONE"
),
types.SafetySetting(
category="HARM_CATEGORY_HATE_SPEECH", threshold="BLOCK_NONE"
),
types.SafetySetting(
category="HARM_CATEGORY_SEXUALLY_EXPLICIT", threshold="BLOCK_NONE"
),
types.SafetySetting(
category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="BLOCK_NONE"
),
]
gen_config_params |= {"safety_settings": safety_settings}
features = __metadata__.get("features", {})
if features.get("google_search_tool", False):
self.log.debug("Enabling Google search grounding")
gen_config_params.setdefault("tools", []).append(
types.Tool(google_search=types.GoogleSearch())
)
if __tools__ is not None and __metadata__.get("function_calling") == "native":
for name, tool_def in __tools__.items():
tool = self._create_tool(tool_def)
self.log.debug(
f"Adding tool '{name}' with signature {tool.__signature__}"
)
gen_config_params.setdefault("tools", []).append(tool)
# Filter out None values for generation config
filtered_params = {k: v for k, v in gen_config_params.items() if v is not None}
return types.GenerateContentConfig(**filtered_params)
@staticmethod
def _format_grounding_chunks_as_sources(
grounding_chunks: list[types.GroundingChunk],
):
formatted_sources = []
for chunk in grounding_chunks:
context = chunk.web or chunk.retrieved_context
if not context:
continue
uri = context.uri
title = context.title or "Source"
formatted_sources.append(
{
"source": {
"name": title,
"type": "web_search_results",
"url": uri,
},
"document": ["Click the link to view the content."],
"metadata": [{"source": title}],
}
)
return formatted_sources
async def _process_grounding_metadata(
self,
grounding_metadata_list: List[types.GroundingMetadata],
text: str,
__event_emitter__: Callable,
):
"""Process and emit grounding metadata events."""
grounding_chunks = []
web_search_queries = []
grounding_supports = []
for metadata in grounding_metadata_list:
if metadata.grounding_chunks:
grounding_chunks.extend(metadata.grounding_chunks)
if metadata.web_search_queries:
web_search_queries.extend(metadata.web_search_queries)
if metadata.grounding_supports:
grounding_supports.extend(metadata.grounding_supports)
# Add sources to the response
if grounding_chunks:
sources = self._format_grounding_chunks_as_sources(grounding_chunks)
await __event_emitter__(
{"type": "chat:completion", "data": {"sources": sources}}
)
# Add status specifying google queries used for grounding
if web_search_queries:
await __event_emitter__(
{
"type": "status",
"data": {
"action": "web_search",
"description": "This response was grounded with Google Search",
"urls": [
f"https://www.google.com/search?q={query}"
for query in web_search_queries
],
},
}
)
# Add citations in the text body
if grounding_supports:
# Citation indexes are in bytes
ENCODING = "utf-8"
text_bytes = text.encode(ENCODING)
last_byte_index = 0
cited_chunks = []
for support in grounding_supports:
cited_chunks.append(
text_bytes[last_byte_index : support.segment.end_index].decode(
ENCODING
)
)
# Generate and append citations (e.g., "[1][2]")
footnotes = "".join(
[f"[{i + 1}]" for i in support.grounding_chunk_indices]
)
cited_chunks.append(f" {footnotes}")
# Update index for the next segment
last_byte_index = support.segment.end_index
# Append any remaining text after the last citation
if last_byte_index < len(text_bytes):
cited_chunks.append(text_bytes[last_byte_index:].decode(ENCODING))
await __event_emitter__(
{
"type": "replace",
"data": {"content": "".join(cited_chunks)},
}
)
async def _handle_streaming_response(
self,
response_iterator: Any,
__event_emitter__: Callable,
) -> AsyncIterator[str]:
"""
Handle streaming response from Gemini API.
Args:
response_iterator: Iterator from generate_content
Returns:
Generator yielding text chunks
"""
grounding_metadata_list = []
text_chunks = []
try:
async for chunk in response_iterator:
# Check for safety feedback or empty chunks
if not chunk.candidates:
# Check prompt feedback
if (
response_iterator.prompt_feedback
and response_iterator.prompt_feedback.block_reason
):
yield f"[Blocked due to Prompt Safety: {response_iterator.prompt_feedback.block_reason.name}]"
else:
yield "[Blocked by safety settings]"
return # Stop generation
if chunk.candidates[0].grounding_metadata:
grounding_metadata_list.append(
chunk.candidates[0].grounding_metadata
)
if chunk.text:
text_chunks.append(chunk.text)
await __event_emitter__(
{
"type": "chat:message:delta",
"data": {
"content": chunk.text,
},
}
)
# After processing all chunks, handle grounding data
if grounding_metadata_list and __event_emitter__:
await self._process_grounding_metadata(
grounding_metadata_list, "".join(text_chunks), __event_emitter__
)
except Exception as e:
self.log.exception(f"Error during streaming: {e}")
yield f"Error during streaming: {e}"
def _handle_standard_response(self, response: Any) -> str:
"""
Handle non-streaming response from Gemini API.
Args:
response: Response from generate_content
Returns:
Generated text or error message
"""
# Check for prompt safety blocks
if response.prompt_feedback and response.prompt_feedback.block_reason:
return f"[Blocked due to Prompt Safety: {response.prompt_feedback.block_reason.name}]"
# Check for missing candidates
if not response.candidates:
return "[Blocked by safety settings or no candidates generated]"
# Check candidate finish reason
candidate = response.candidates[0]
if candidate.finish_reason == types.FinishReason.SAFETY:
# Try to get specific safety rating info
blocking_rating = next(
(r for r in candidate.safety_ratings if r.blocked), None
)
reason = f" ({blocking_rating.category.name})" if blocking_rating else ""
return f"[Blocked by safety settings{reason}]"
# Process content parts
if candidate.content and candidate.content.parts:
# Combine text from all parts
return "".join(
part.text for part in candidate.content.parts if hasattr(part, "text")
)
else:
return "[No content generated or unexpected response structure]"
async def _retry_with_backoff(self, func, *args, **kwargs) -> Any:
"""
Retry a function with exponential backoff.
Args:
func: Async function to retry
*args, **kwargs: Arguments to pass to the function
Returns:
Result from the function
Raises:
The last exception encountered after all retries
"""
max_retries = self.valves.RETRY_COUNT
retry_count = 0
last_exception = None
while retry_count <= max_retries:
try:
return await func(*args, **kwargs)
except ServerError as e:
# These errors might be temporary, so retry
retry_count += 1
last_exception = e
if retry_count <= max_retries:
# Calculate backoff time (exponential with jitter)
wait_time = min(2**retry_count + (0.1 * retry_count), 10)
self.log.warning(
f"Temporary error from Google API: {e}. Retrying in {wait_time:.1f}s ({retry_count}/{max_retries})"
)
await asyncio.sleep(wait_time)
else:
raise
except Exception:
# Don't retry other exceptions
raise
# If we get here, we've exhausted retries
assert last_exception is not None
raise last_exception
async def pipe(
self,
body: Dict[str, Any],
__metadata__: dict[str, Any],
__event_emitter__: Callable,
__tools__: dict[str, Any] | None,
) -> Union[str, AsyncIterator[str]]:
"""
Main method for sending requests to the Google Gemini endpoint.
Args:
body: The request body containing messages and other parameters.
Returns:
Response from Google Gemini API, which could be a string or an iterator for streaming.
"""
# Setup logging for this request
request_id = id(body)
self.log.debug(f"Processing request {request_id}")
try:
# Parse and validate model ID
model_id = body.get("model", "")
try:
model_id = self._prepare_model_id(model_id)
self.log.debug(f"Using model: {model_id}")
except ValueError as ve:
return f"Model Error: {ve}"
# Get stream flag
stream = body.get("stream", False)
messages = body.get("messages", [])
# Prepare content and extract system message
contents, system_instruction = self._prepare_content(messages)
if not contents:
return "Error: No valid message content found"
# Configure generation parameters and safety settings
generation_config = self._configure_generation(
body, system_instruction, __metadata__, __tools__
)
# Make the API call
client = self._get_client()
if stream:
try:
async def get_streaming_response():
return await client.aio.models.generate_content_stream(
model=model_id,
contents=contents,
config=generation_config,
)
response_iterator = await self._retry_with_backoff(
get_streaming_response
)
self.log.debug(f"Request {request_id}: Got streaming response")
return self._handle_streaming_response(
response_iterator, __event_emitter__
)
except Exception as e:
self.log.exception(f"Error in streaming request {request_id}: {e}")
return f"Error during streaming: {e}"
else:
try:
async def get_response():
return await client.aio.models.generate_content(
model=model_id,
contents=contents,
config=generation_config,
)
response = await self._retry_with_backoff(get_response)
self.log.debug(f"Request {request_id}: Got non-streaming response")
return self._handle_standard_response(response)
except Exception as e:
self.log.exception(
f"Error in non-streaming request {request_id}: {e}"
)
return f"Error generating content: {e}"
except ClientError as ce:
error_msg = f"Client error raised by the GenAI API: {ce}."
self.log.error(f"Client error: {ce}")
return error_msg
except ServerError as se:
error_msg = f"Server error raised by the GenAI API: {se}"
self.log.error(f"Server error raised by the GenAI API.: {se}")
return error_msg
except APIError as apie:
error_msg = f"Google API Error: {apie}"
self.log.error(error_msg)
return error_msg
except ValueError as ve:
error_msg = f"Configuration error: {ve}"
self.log.error(f"Value error: {ve}")
return error_msg
except Exception as e:
# Log the full error with traceback
import traceback
error_trace = traceback.format_exc()
self.log.exception(f"Unexpected error: {e}\n{error_trace}")
# Return a user-friendly error message
return f"An error occurred while processing your request: {e}"
@ndrew222 What you uploaded looks like the open-webui error log. Did any errors occur in the pipelines container? If so, can you show me that error log?
I'm not sure how to get the pipeline error log. Is it from the Docker logs?
@ndrew222 Sorry for the late reply. In a Docker setup you usually run both the open-webui and pipelines containers; what I meant is that the docker logs from both containers would help with the error analysis, so please share them!
Ah, I see. I'm currently on vacation, so I haven't touched my Open WebUI instance in a while. I'll get back to you when I encounter the error again.
Managed to get the error again.
INFO: Started server process [7]
INFO: Waiting for application startup.
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
ollama 0.3.3 requires httpx<0.28.0,>=0.27.0, but you have httpx 0.28.1 which is incompatible.
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv
[notice] A new release of pip is available: 24.0 -> 25.2
[notice] To update, run: pip install --upgrade pip
INFO:root:Updated valves for module: google_manifold_pipeline
INFO:root:Loaded module: google_manifold_pipeline
INFO:httpx:HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/models "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/models?pageToken=Ch5tb2RlbHMvaW1hZ2VuLTQuMC1nZW5lcmF0ZS0wMDE%3D "HTTP/1.1 200 OK"
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:9099 (Press CTRL+C to quit)
INFO:root:stream:true:<generator object Pipeline.stream_response at 0x7f77d0cd50>
INFO:google_genai.models:AFC is enabled with max remote calls: 10.
ERROR: Exception in ASGI application
+ Exception Group Traceback (most recent call last):
| File "/usr/local/lib/python3.11/site-packages/starlette/_utils.py", line 87, in collapse_excgroups
| yield
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 190, in __call__
| async with anyio.create_task_group() as task_group:
| File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Exception Group Traceback (most recent call last):
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 192, in __call__
| await response(scope, wrapped_receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 258, in __call__
| async with anyio.create_task_group() as task_group:
| File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Exception Group Traceback (most recent call last):
| File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 261, in wrap
| await func()
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 217, in stream_response
| return await super().stream_response(send)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 250, in stream_response
| async for chunk in self.body_iterator:
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 181, in body_stream
| raise app_exc
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 151, in coro
| await self.app(scope, receive_or_disconnect, send_no_error)
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/cors.py", line 85, in __call__
| await self.app(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 65, in __call__
| await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
| raise exc
| File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
| await app(scope, receive, sender)
| File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 756, in __call__
| await self.middleware_stack(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 776, in app
| await route.handle(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 297, in handle
| await self.app(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 77, in app
| await wrap_app_handling_exceptions(app, request)(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
| raise exc
| File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
| await app(scope, receive, sender)
| File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 75, in app
| await response(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 258, in __call__
| async with anyio.create_task_group() as task_group:
| File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 435, in run_asgi
| result = await app( # type: ignore[func-returns-value]
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
| return await self.app(scope, receive, send)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in __call__
| await super().__call__(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 123, in __call__
| await self.middleware_stack(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 186, in __call__
| raise exc
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 164, in __call__
| await self.app(scope, receive, _send)
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 189, in __call__
| with collapse_excgroups():
| File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
| self.gen.throw(typ, value, traceback)
| File "/usr/local/lib/python3.11/site-packages/starlette/_utils.py", line 93, in collapse_excgroups
| raise exc
| File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 261, in wrap
| await func()
| File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 250, in stream_response
| async for chunk in self.body_iterator:
| File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 65, in iterate_in_threadpool
| yield await anyio.to_thread.run_sync(_next, as_iterator)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 56, in run_sync
| return await get_async_backend().run_sync_in_worker_thread(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 2476, in run_sync_in_worker_thread
| return await future
| ^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 967, in run
| result = context.run(func, *args)
| ^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 54, in _next
| return next(iterator)
| ^^^^^^^^^^^^^^
| File "/app/main.py", line 705, in stream_content
| for line in res:
| File "/app/./pipelines/google_manifold_pipeline.py", line 196, in stream_response
| for chunk in response:
| File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 6615, in generate_content_stream
| for chunk in response:
| File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 5367, in _generate_content_stream
| for response in self._api_client.request_streamed(
| File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1308, in request_streamed
| session_response = self._request(http_request, http_options, stream=True)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1127, in _request
| return self._retry(self._request_once, http_request, stream) # type: ignore[no-any-return]
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 475, in __call__
| do = self.iter(retry_state=retry_state)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
| result = action(retry_state)
| ^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 418, in exc_check
| raise retry_exc.reraise()
| ^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 185, in reraise
| raise self.last_attempt.result()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
| return self.__get_result()
| ^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
| raise self._exception
| File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 478, in __call__
| result = fn(*args, **kwargs)
| ^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1091, in _request_once
| response = self._httpx_client.send(httpx_request, stream=stream)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 901, in send
| raise RuntimeError("Cannot send a request, as the client has been closed.")
| RuntimeError: Cannot send a request, as the client has been closed.
+------------------------------------
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 435, in run_asgi
result = await app( # type: ignore[func-returns-value]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
return await self.app(scope, receive, send)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in __call__
await super().__call__(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 123, in __call__
await self.middleware_stack(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 186, in __call__
raise exc
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 164, in __call__
await self.app(scope, receive, _send)
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 189, in __call__
with collapse_excgroups():
File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
self.gen.throw(typ, value, traceback)
File "/usr/local/lib/python3.11/site-packages/starlette/_utils.py", line 93, in collapse_excgroups
raise exc
File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 261, in wrap
await func()
File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 250, in stream_response
async for chunk in self.body_iterator:
File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 65, in iterate_in_threadpool
yield await anyio.to_thread.run_sync(_next, as_iterator)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 56, in run_sync
return await get_async_backend().run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 2476, in run_sync_in_worker_thread
return await future
^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 967, in run
result = context.run(func, *args)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 54, in _next
return next(iterator)
^^^^^^^^^^^^^^
File "/app/main.py", line 705, in stream_content
for line in res:
File "/app/./pipelines/google_manifold_pipeline.py", line 196, in stream_response
for chunk in response:
File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 6615, in generate_content_stream
for chunk in response:
File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 5367, in _generate_content_stream
for response in self._api_client.request_streamed(
File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1308, in request_streamed
session_response = self._request(http_request, http_options, stream=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1127, in _request
return self._retry(self._request_once, http_request, stream) # type: ignore[no-any-return]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 475, in __call__
do = self.iter(retry_state=retry_state)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
result = action(retry_state)
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 418, in exc_check
raise retry_exc.reraise()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 185, in reraise
raise self.last_attempt.result()
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 478, in __call__
result = fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1091, in _request_once
response = self._httpx_client.send(httpx_request, stream=stream)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 901, in send
raise RuntimeError("Cannot send a request, as the client has been closed.")
RuntimeError: Cannot send a request, as the client has been closed.
INFO:root:stream:true:<generator object Pipeline.stream_response at 0x7f77d0e640>
INFO:google_genai.models:AFC is enabled with max remote calls: 10.
ERROR: Exception in ASGI application
+ Exception Group Traceback (most recent call last):
| File "/usr/local/lib/python3.11/site-packages/starlette/_utils.py", line 87, in collapse_excgroups
| yield
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 190, in __call__
| async with anyio.create_task_group() as task_group:
| File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Exception Group Traceback (most recent call last):
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 192, in __call__
| await response(scope, wrapped_receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 258, in __call__
| async with anyio.create_task_group() as task_group:
| File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Exception Group Traceback (most recent call last):
| File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 261, in wrap
| await func()
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 217, in stream_response
| return await super().stream_response(send)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 250, in stream_response
| async for chunk in self.body_iterator:
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 181, in body_stream
| raise app_exc
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 151, in coro
| await self.app(scope, receive_or_disconnect, send_no_error)
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/cors.py", line 85, in __call__
| await self.app(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 65, in __call__
| await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
| raise exc
| File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
| await app(scope, receive, sender)
| File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 756, in __call__
| await self.middleware_stack(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 776, in app
| await route.handle(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 297, in handle
| await self.app(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 77, in app
| await wrap_app_handling_exceptions(app, request)(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
| raise exc
| File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
| await app(scope, receive, sender)
| File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 75, in app
| await response(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 258, in __call__
| async with anyio.create_task_group() as task_group:
| File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 435, in run_asgi
| result = await app( # type: ignore[func-returns-value]
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
| return await self.app(scope, receive, send)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in __call__
| await super().__call__(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 123, in __call__
| await self.middleware_stack(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 186, in __call__
| raise exc
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 164, in __call__
| await self.app(scope, receive, _send)
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 189, in __call__
| with collapse_excgroups():
| File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
| self.gen.throw(typ, value, traceback)
| File "/usr/local/lib/python3.11/site-packages/starlette/_utils.py", line 93, in collapse_excgroups
| raise exc
| File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 261, in wrap
| await func()
| File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 250, in stream_response
| async for chunk in self.body_iterator:
| File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 65, in iterate_in_threadpool
| yield await anyio.to_thread.run_sync(_next, as_iterator)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 56, in run_sync
| return await get_async_backend().run_sync_in_worker_thread(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 2476, in run_sync_in_worker_thread
| return await future
| ^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 967, in run
| result = context.run(func, *args)
| ^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 54, in _next
| return next(iterator)
| ^^^^^^^^^^^^^^
| File "/app/main.py", line 705, in stream_content
| for line in res:
| File "/app/./pipelines/google_manifold_pipeline.py", line 196, in stream_response
| for chunk in response:
| File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 6615, in generate_content_stream
| for chunk in response:
| File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 5367, in _generate_content_stream
| for response in self._api_client.request_streamed(
| File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1308, in request_streamed
| session_response = self._request(http_request, http_options, stream=True)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1127, in _request
| return self._retry(self._request_once, http_request, stream) # type: ignore[no-any-return]
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 475, in __call__
| do = self.iter(retry_state=retry_state)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
| result = action(retry_state)
| ^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 418, in exc_check
| raise retry_exc.reraise()
| ^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 185, in reraise
| raise self.last_attempt.result()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
| return self.__get_result()
| ^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
| raise self._exception
| File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 478, in __call__
| result = fn(*args, **kwargs)
| ^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1091, in _request_once
| response = self._httpx_client.send(httpx_request, stream=stream)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 901, in send
| raise RuntimeError("Cannot send a request, as the client has been closed.")
| RuntimeError: Cannot send a request, as the client has been closed.
+------------------------------------
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 435, in run_asgi
result = await app( # type: ignore[func-returns-value]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
return await self.app(scope, receive, send)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in __call__
await super().__call__(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 123, in __call__
await self.middleware_stack(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 186, in __call__
raise exc
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 164, in __call__
await self.app(scope, receive, _send)
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 189, in __call__
with collapse_excgroups():
File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
self.gen.throw(typ, value, traceback)
File "/usr/local/lib/python3.11/site-packages/starlette/_utils.py", line 93, in collapse_excgroups
raise exc
File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 261, in wrap
await func()
File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 250, in stream_response
async for chunk in self.body_iterator:
File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 65, in iterate_in_threadpool
yield await anyio.to_thread.run_sync(_next, as_iterator)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 56, in run_sync
return await get_async_backend().run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 2476, in run_sync_in_worker_thread
return await future
^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 967, in run
result = context.run(func, *args)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 54, in _next
return next(iterator)
^^^^^^^^^^^^^^
File "/app/main.py", line 705, in stream_content
for line in res:
File "/app/./pipelines/google_manifold_pipeline.py", line 196, in stream_response
for chunk in response:
File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 6615, in generate_content_stream
for chunk in response:
File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 5367, in _generate_content_stream
for response in self._api_client.request_streamed(
File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1308, in request_streamed
session_response = self._request(http_request, http_options, stream=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1127, in _request
return self._retry(self._request_once, http_request, stream) # type: ignore[no-any-return]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 475, in __call__
do = self.iter(retry_state=retry_state)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
result = action(retry_state)
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 418, in exc_check
raise retry_exc.reraise()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 185, in reraise
raise self.last_attempt.result()
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 478, in __call__
result = fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1091, in _request_once
response = self._httpx_client.send(httpx_request, stream=stream)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 901, in send
raise RuntimeError("Cannot send a request, as the client has been closed.")
RuntimeError: Cannot send a request, as the client has been closed.
INFO:root:stream:true:<generator object Pipeline.stream_response at 0x7f77d0f350>
INFO:google_genai.models:AFC is enabled with max remote calls: 10.
ERROR: Exception in ASGI application
+ Exception Group Traceback (most recent call last):
| File "/usr/local/lib/python3.11/site-packages/starlette/_utils.py", line 87, in collapse_excgroups
| yield
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 190, in __call__
| async with anyio.create_task_group() as task_group:
| File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Exception Group Traceback (most recent call last):
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 192, in __call__
| await response(scope, wrapped_receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 258, in __call__
| async with anyio.create_task_group() as task_group:
| File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Exception Group Traceback (most recent call last):
| File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 261, in wrap
| await func()
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 217, in stream_response
| return await super().stream_response(send)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 250, in stream_response
| async for chunk in self.body_iterator:
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 181, in body_stream
| raise app_exc
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 151, in coro
| await self.app(scope, receive_or_disconnect, send_no_error)
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/cors.py", line 85, in __call__
| await self.app(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 65, in __call__
| await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
| raise exc
| File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
| await app(scope, receive, sender)
| File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 756, in __call__
| await self.middleware_stack(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 776, in app
| await route.handle(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 297, in handle
| await self.app(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 77, in app
| await wrap_app_handling_exceptions(app, request)(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
| raise exc
| File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
| await app(scope, receive, sender)
| File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 75, in app
| await response(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 258, in __call__
| async with anyio.create_task_group() as task_group:
| File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 435, in run_asgi
| result = await app( # type: ignore[func-returns-value]
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
| return await self.app(scope, receive, send)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in __call__
| await super().__call__(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 123, in __call__
| await self.middleware_stack(scope, receive, send)
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 186, in __call__
| raise exc
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 164, in __call__
| await self.app(scope, receive, _send)
| File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 189, in __call__
| with collapse_excgroups():
| File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
| self.gen.throw(typ, value, traceback)
| File "/usr/local/lib/python3.11/site-packages/starlette/_utils.py", line 93, in collapse_excgroups
| raise exc
| File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 261, in wrap
| await func()
| File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 250, in stream_response
| async for chunk in self.body_iterator:
| File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 65, in iterate_in_threadpool
| yield await anyio.to_thread.run_sync(_next, as_iterator)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 56, in run_sync
| return await get_async_backend().run_sync_in_worker_thread(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 2476, in run_sync_in_worker_thread
| return await future
| ^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 967, in run
| result = context.run(func, *args)
| ^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 54, in _next
| return next(iterator)
| ^^^^^^^^^^^^^^
| File "/app/main.py", line 705, in stream_content
| for line in res:
| File "/app/./pipelines/google_manifold_pipeline.py", line 196, in stream_response
| for chunk in response:
| File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 6615, in generate_content_stream
| for chunk in response:
| File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 5367, in _generate_content_stream
| for response in self._api_client.request_streamed(
| File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1308, in request_streamed
| session_response = self._request(http_request, http_options, stream=True)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1127, in _request
| return self._retry(self._request_once, http_request, stream) # type: ignore[no-any-return]
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 475, in __call__
| do = self.iter(retry_state=retry_state)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
| result = action(retry_state)
| ^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 418, in exc_check
| raise retry_exc.reraise()
| ^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 185, in reraise
| raise self.last_attempt.result()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
| return self.__get_result()
| ^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
| raise self._exception
| File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 478, in __call__
| result = fn(*args, **kwargs)
| ^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1091, in _request_once
| response = self._httpx_client.send(httpx_request, stream=stream)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 901, in send
| raise RuntimeError("Cannot send a request, as the client has been closed.")
| RuntimeError: Cannot send a request, as the client has been closed.
+------------------------------------
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 435, in run_asgi
result = await app( # type: ignore[func-returns-value]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
return await self.app(scope, receive, send)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in __call__
await super().__call__(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 123, in __call__
await self.middleware_stack(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 186, in __call__
raise exc
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 164, in __call__
await self.app(scope, receive, _send)
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 189, in __call__
with collapse_excgroups():
File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
self.gen.throw(typ, value, traceback)
File "/usr/local/lib/python3.11/site-packages/starlette/_utils.py", line 93, in collapse_excgroups
raise exc
File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 261, in wrap
await func()
File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 250, in stream_response
async for chunk in self.body_iterator:
File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 65, in iterate_in_threadpool
yield await anyio.to_thread.run_sync(_next, as_iterator)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 56, in run_sync
return await get_async_backend().run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 2476, in run_sync_in_worker_thread
return await future
^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 967, in run
result = context.run(func, *args)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 54, in _next
return next(iterator)
^^^^^^^^^^^^^^
File "/app/main.py", line 705, in stream_content
for line in res:
File "/app/./pipelines/google_manifold_pipeline.py", line 196, in stream_response
for chunk in response:
File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 6615, in generate_content_stream
for chunk in response:
File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 5367, in _generate_content_stream
for response in self._api_client.request_streamed(
File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1308, in request_streamed
session_response = self._request(http_request, http_options, stream=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1127, in _request
return self._retry(self._request_once, http_request, stream) # type: ignore[no-any-return]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 475, in __call__
do = self.iter(retry_state=retry_state)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
result = action(retry_state)
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 418, in exc_check
raise retry_exc.reraise()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 185, in reraise
raise self.last_attempt.result()
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 478, in __call__
result = fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1091, in _request_once
response = self._httpx_client.send(httpx_request, stream=stream)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 901, in send
raise RuntimeError("Cannot send a request, as the client has been closed.")
RuntimeError: Cannot send a request, as the client has been closed.
I had the same problem.
I just gave up on the pipeline and instead started using the OpenAI-compatible endpoint for Gemini in the OpenWebUI connections panel. It works perfectly now.
Also make sure to check how your Google account is set up across AI Studio and Cloud Console, because it's a true clusterf***... My API key belonged to a project that had no billing set up (billing has to be configured in Cloud Console), and there was also a discrepancy between the projects listed in Google AI Studio and Google Cloud Console. In all honesty, I had barely used Gemini before.
Here is my recommended order of steps to migrate to the working OpenAI-compatible endpoint for Gemini:
- Create a project in Cloud Console
- Set up billing for that Cloud Console project
- In AI Studio, USE THE IMPORT PROJECT option to pull in the project you created in Cloud Console
- Create an API key in AI Studio
- Delete the pipeline in OpenWebUI
- In OpenWebUI, create a connection to https://generativelanguage.googleapis.com/v1beta/openai with your key
- Check that your model list has been populated with Google's models and test it (there's a quick way to verify outside the UI in the sketch below)
- Also check Google's status page, as they seem to have had some issues around the time of posting.
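To sanity-check the key and endpoint outside of OpenWebUI, here is a minimal sketch using the openai Python package (the GEMINI_API_KEY env var and the trailing slash on the base URL are my assumptions; it just lists the models the key can see):

import os
from openai import OpenAI

# Point the standard OpenAI client at Google's OpenAI-compatible endpoint.
# GEMINI_API_KEY is assumed to hold the key created in AI Studio.
client = OpenAI(
    api_key=os.environ["GEMINI_API_KEY"],
    base_url="https://generativelanguage.googleapis.com/v1beta/openai/",
)

# List the models the key can see -- the same list OpenWebUI should populate
# once the connection is added.
for model in client.models.list():
    print(model.id)

If the listing works here but the OpenWebUI model list stays empty, the problem is with the connection configuration rather than the key or billing.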
I have the same issue.
Got the same problem using the official Open WebUI pipeline. I gave up and started using the official Gemini API with its OpenAI compatibility instead.
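For anyone going the same route, this is roughly what the OpenAI-compatibility layer looks like from Python; a minimal sketch, not the exact OpenWebUI setup, and the model id is an assumption (use whatever your model listing shows). Since the original error only appeared while streaming, the stream=True path is the relevant test:

import os
from openai import OpenAI

client = OpenAI(
    api_key=os.environ["GEMINI_API_KEY"],  # key from AI Studio (assumed env var)
    base_url="https://generativelanguage.googleapis.com/v1beta/openai/",
)

# Stream a short completion the same way the WebUI would, to confirm chunks
# arrive cleanly instead of dying with the payload/transfer errors above.
stream = client.chat.completions.create(
    model="gemini-2.5-pro-preview-05-06",  # assumed id; pick one from your model list
    messages=[{"role": "user", "content": "Say hello in one short sentence."}],
    stream=True,
)

for chunk in stream:
    # delta.content can be None on some chunks, so only print real text
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
print()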