
I keep getting this error: `Response payload is not completed: <TransferEncodingError: 400, message='Not enough data to satisfy transfer length header.'>`

Open ndrew222 opened this issue 3 months ago • 10 comments

Yeah, I keep getting that error, and it takes a few regenerations to get a response. It doesn't seem to count as an error on the API end, though. I'm using the Google Gemini API with the Gemini Pipeline.

Here are my logs. The weird thing is that the generation starts to show in the WebUI while the error is thrown in the logs. Then, when I refresh, the generated text that had loaded disappears and the error message is shown instead.

2025-09-11 14:01:08.125 | INFO     | uvicorn.protocols.http.httptools_impl:send:476 - 100.78.217.82:0 - "GET /api/v1/chats/?page=1 HTTP/1.1" 200
2025-09-11 14:01:08.329 | INFO     | uvicorn.protocols.http.httptools_impl:send:476 - 100.78.217.82:0 - "GET /api/v1/folders/ HTTP/1.1" 200
2025-09-11 14:01:09.341 | INFO     | uvicorn.protocols.http.httptools_impl:send:476 - 100.78.217.82:0 - "POST /api/chat/completions HTTP/1.1" 200
2025-09-11 14:01:09.519 | INFO     | open_webui.routers.openai:get_all_models:406 - get_all_models()
2025-09-11 14:01:09.894 | INFO     | uvicorn.protocols.http.httptools_impl:send:476 - 100.78.217.82:0 - "GET /api/v1/chats/?page=1 HTTP/1.1" 200
2025-09-11 14:01:10.538 | INFO     | uvicorn.protocols.http.httptools_impl:send:476 - 100.78.217.82:0 - "GET /api/v1/folders/ HTTP/1.1" 200
2025-09-11 14:01:16.293 | ERROR    | open_webui.tasks:cleanup_task:88 - Task exception was never retrieved
future: <Task finished name='Task-2350' coro=<chat_completion.<locals>.process_chat() done, defined at /app/backend/open_webui/main.py:1495> exception=HTTPException(status_code=400, detail="Response payload is not completed: <TransferEncodingError: 400, message='Not enough data to satisfy transfer length header.'>")>
Traceback (most recent call last):

  File "/usr/local/lib/python3.11/site-packages/aiohttp/client_proto.py", line 144, in connection_lost
    uncompleted = self._parser.feed_eof()
                  │    └ None
                  └ <aiohttp.client_proto.ResponseHandler object at 0x7f2ad62d60>
  File "aiohttp/_http_parser.pyx", line 508, in aiohttp._http_parser.HttpParser.feed_eof
    raise TransferEncodingError(
          └ <class 'aiohttp.http_exceptions.TransferEncodingError'>

aiohttp.http_exceptions.TransferEncodingError: 400, message:
  Not enough data to satisfy transfer length header.


The above exception was the direct cause of the following exception:


Traceback (most recent call last):

  File "/app/backend/open_webui/main.py", line 1514, in process_chat
    return await process_chat_response(
                 └ <function process_chat_response at 0x7f7c89f240>

  File "/app/backend/open_webui/utils/middleware.py", line 2636, in process_chat_response
    return await response_handler(response, events)
                 │                │         └ []
                 │                └ <starlette.responses.StreamingResponse object at 0x7f2ad5aa90>
                 └ <function process_chat_response.<locals>.response_handler at 0x7f2adc6200>

  File "/app/backend/open_webui/utils/middleware.py", line 2208, in response_handler
    await stream_body_handler(response, form_data)
          │                   │         └ {'stream': True, 'model': 'google_genai.gemini-2.5-pro-preview-05-06', 'messages': [{'role': 'system', 'content': '**I. Core ...
          │                   └ <starlette.responses.StreamingResponse object at 0x7f2ad5aa90>
          └ <function process_chat_response.<locals>.response_handler.<locals>.stream_body_handler at 0x7f2adc7920>

  File "/app/backend/open_webui/utils/middleware.py", line 1902, in stream_body_handler
    async for line in response.body_iterator:
              │       │        └ <StreamReader e=ClientPayloadError("Response payload is not completed: <TransferEncodingError: 400, message='Not enough data ...
              │       └ <starlette.responses.StreamingResponse object at 0x7f2ad5aa90>
              └ '\n'

  File "/usr/local/lib/python3.11/site-packages/aiohttp/streams.py", line 52, in __anext__
    rv = await self.read_func()
               │    └ <member 'read_func' of 'AsyncStreamIterator' objects>
               └ <aiohttp.streams.AsyncStreamIterator object at 0x7f5a5130a0>
  File "/usr/local/lib/python3.11/site-packages/aiohttp/streams.py", line 352, in readline
    return await self.readuntil()
                 │    └ <function StreamReader.readuntil at 0x7fae8cc040>
                 └ <StreamReader e=ClientPayloadError("Response payload is not completed: <TransferEncodingError: 400, message='Not enough data ...
  File "/usr/local/lib/python3.11/site-packages/aiohttp/streams.py", line 386, in readuntil
    await self._wait("readuntil")
          │    └ <function StreamReader._wait at 0x7fae8b3ec0>
          └ <StreamReader e=ClientPayloadError("Response payload is not completed: <TransferEncodingError: 400, message='Not enough data ...
  File "/usr/local/lib/python3.11/site-packages/aiohttp/streams.py", line 347, in _wait
    await waiter
          └ <Future finished exception=ClientPayloadError("Response payload is not completed: <TransferEncodingError: 400, message='Not e...

aiohttp.client_exceptions.ClientPayloadError: Response payload is not completed: <TransferEncodingError: 400, message='Not enough data to satisfy transfer length header.'>


During handling of the above exception, another exception occurred:


Traceback (most recent call last):

> File "/app/backend/open_webui/main.py", line 1541, in process_chat
    raise HTTPException(
          └ <class 'fastapi.exceptions.HTTPException'>

fastapi.exceptions.HTTPException: 400: Response payload is not completed: <TransferEncodingError: 400, message='Not enough data to satisfy transfer length header.'>
2025-09-11 14:01:53.119 | INFO     | uvicorn.protocols.http.httptools_impl:send:476 - 100.78.217.82:0 - "GET /_app/version.json HTTP/1.1" 304
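
For reference, this looks like the generic aiohttp failure you get when a chunked streaming response is cut off before the terminating chunk arrives: the WebUI iterates the streamed body line by line and the upstream connection closes early. A minimal sketch of that code path (the URL and payload below are made up, not the actual Open WebUI internals):

import asyncio
import aiohttp

async def consume_stream():
    # Iterate a chunked streaming response line by line, like the middleware in the traceback above.
    # If the upstream server closes the connection before sending the final chunk, aiohttp raises
    # ClientPayloadError wrapping a TransferEncodingError ("Not enough data to satisfy transfer length header").
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "http://pipelines:9099/chat/completions",  # hypothetical upstream endpoint
            json={"stream": True},                     # hypothetical request body
        ) as resp:
            try:
                async for line in resp.content:
                    print(line)
            except aiohttp.ClientPayloadError as e:
                print(f"Stream cut off early: {e}")

asyncio.run(consume_stream())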

ndrew222 avatar Sep 11 '25 14:09 ndrew222

@ndrew222 Can you share your pipeline code so we can analyze the cause of the error?

kim-seokjin avatar Sep 18 '25 06:09 kim-seokjin

Oh yeah. Sorry.

"""
title: Google Gemini Pipeline
author: owndev, olivier-lacroix
author_url: https://github.com/owndev/
project_url: https://github.com/owndev/Open-WebUI-Functions
funding_url: https://github.com/sponsors/owndev
version: 1.3.3
license: Apache License 2.0
description: A manifold pipeline for interacting with Google Gemini models, including dynamic model specification, streaming responses, and flexible error handling.
features:
  - Asynchronous API calls for better performance
  - Model caching to reduce API calls
  - Dynamic model specification with automatic prefix stripping
  - Streaming response handling with safety checks
  - Support for multimodal input (text and images)
  - Flexible error handling and logging
  - Integration with Google Generative AI or Vertex AI API for content generation
  - Support for various generation parameters (temperature, max tokens, etc.)
  - Customizable safety settings based on environment variables
  - Encrypted storage of sensitive API keys
  - Grounding with Google search
  - Native tool calling support
"""

import os
import inspect
from functools import update_wrapper
import re
import time
import asyncio
import base64
import hashlib
import logging
from google import genai
from google.genai import types
from google.genai.errors import ClientError, ServerError, APIError
from typing import List, Union, Optional, Dict, Any, Tuple, AsyncIterator, Callable
from pydantic_core import core_schema
from pydantic import BaseModel, Field, GetCoreSchemaHandler
from cryptography.fernet import Fernet, InvalidToken
from open_webui.env import SRC_LOG_LEVELS


# Simplified encryption implementation with automatic handling


class EncryptedStr(str):
    """A string type that automatically handles encryption/decryption"""

    @classmethod
    def _get_encryption_key(cls) -> Optional[bytes]:
        """
        Generate encryption key from WEBUI_SECRET_KEY if available
        Returns None if no key is configured
        """
        secret = os.getenv("WEBUI_SECRET_KEY")
        if not secret:
            return None
        hashed_key = hashlib.sha256(secret.encode()).digest()
        return base64.urlsafe_b64encode(hashed_key)

    @classmethod
    def encrypt(cls, value: str) -> str:
        """
        Encrypt a string value if a key is available
        Returns the original value if no key is available
        """
        if not value or value.startswith("encrypted:"):
            return value
        key = cls._get_encryption_key()
        if not key:  # No encryption if no key
            return value
        f = Fernet(key)
        encrypted = f.encrypt(value.encode())
        return f"encrypted:{encrypted.decode()}"

    @classmethod
    def decrypt(cls, value: str) -> str:
        """
        Decrypt an encrypted string value if a key is available
        Returns the original value if no key is available or decryption fails
        """
        if not value or not value.startswith("encrypted:"):
            return value
        key = cls._get_encryption_key()
        if not key:  # No decryption if no key
            return value[len("encrypted:") :]  # Return without prefix
        try:
            encrypted_part = value[len("encrypted:") :]
            f = Fernet(key)
            decrypted = f.decrypt(encrypted_part.encode())
            return decrypted.decode()
        except (InvalidToken, Exception):
            return value

    # Pydantic integration

    @classmethod
    def __get_pydantic_core_schema__(
        cls, _source_type: Any, _handler: GetCoreSchemaHandler
    ) -> core_schema.CoreSchema:
        return core_schema.union_schema(
            [
                core_schema.is_instance_schema(cls),
                core_schema.chain_schema(
                    [
                        core_schema.str_schema(),
                        core_schema.no_info_plain_validator_function(
                            lambda value: cls(cls.encrypt(value) if value else value)
                        ),
                    ]
                ),
            ],
            serialization=core_schema.plain_serializer_function_ser_schema(
                lambda instance: str(instance)
            ),
        )

    def get_decrypted(self) -> str:
        """Get the decrypted value"""
        return self.decrypt(self)


class Pipe:
    """
    Pipeline for interacting with Google Gemini models.
    """

    # Configuration valves for the pipeline

    class Valves(BaseModel):
        GOOGLE_API_KEY: EncryptedStr = Field(
            default=os.getenv("GOOGLE_API_KEY", ""),
            description="API key for Google Generative AI (used if USE_VERTEX_AI is false).",
        )
        USE_VERTEX_AI: bool = Field(
            default=os.getenv("GOOGLE_GENAI_USE_VERTEXAI", "false").lower() == "true",
            description="Whether to use Google Cloud Vertex AI instead of the Google Generative AI API.",
        )
        VERTEX_PROJECT: str | None = Field(
            default=os.getenv("GOOGLE_CLOUD_PROJECT"),
            description="The Google Cloud project ID to use with Vertex AI.",
        )
        VERTEX_LOCATION: str = Field(
            default=os.getenv("GOOGLE_CLOUD_LOCATION", "global"),
            description="The Google Cloud region to use with Vertex AI.",
        )
        USE_PERMISSIVE_SAFETY: bool = Field(
            default=os.getenv("USE_PERMISSIVE_SAFETY", "false").lower() == "true",
            description="Use permissive safety settings for content generation.",
        )
        MODEL_CACHE_TTL: int = Field(
            default=int(os.getenv("GOOGLE_MODEL_CACHE_TTL", "600")),
            description="Time in seconds to cache the model list before refreshing",
        )
        RETRY_COUNT: int = Field(
            default=int(os.getenv("GOOGLE_RETRY_COUNT", "2")),
            description="Number of times to retry API calls on temporary failures",
        )

    def __init__(self):
        """Initializes the Pipe instance and configures the genai library."""
        self.valves = self.Valves()
        self.name: str = "Google Gemini: "

        # Setup logging

        self.log = logging.getLogger("google_ai.pipe")
        self.log.setLevel(SRC_LOG_LEVELS.get("OPENAI", logging.INFO))

        # Model cache

        self._model_cache: Optional[List[Dict[str, str]]] = None
        self._model_cache_time: float = 0

    def _get_client(self) -> genai.Client:
        """
        Validates API credentials and returns a genai.Client instance.
        """
        self._validate_api_key()

        if self.valves.USE_VERTEX_AI:
            self.log.debug(
                f"Initializing Vertex AI client (Project: {self.valves.VERTEX_PROJECT}, Location: {self.valves.VERTEX_LOCATION})"
            )
            return genai.Client(
                vertexai=True,
                project=self.valves.VERTEX_PROJECT,
                location=self.valves.VERTEX_LOCATION,
            )
        else:
            self.log.debug("Initializing Google Generative AI client with API Key")
            return genai.Client(api_key=self.valves.GOOGLE_API_KEY.get_decrypted())

    def _validate_api_key(self) -> None:
        """
        Validates that the necessary Google API credentials are set.

        Raises:
            ValueError: If the required credentials are not set.
        """
        if self.valves.USE_VERTEX_AI:
            if not self.valves.VERTEX_PROJECT:
                self.log.error("USE_VERTEX_AI is true, but VERTEX_PROJECT is not set.")
                raise ValueError(
                    "VERTEX_PROJECT is not set. Please provide the Google Cloud project ID."
                )
            # For Vertex AI, location has a default, so project is the main thing to check.
            # Actual authentication will be handled by ADC or environment.

            self.log.debug(
                "Using Vertex AI. Ensure ADC or service account is configured."
            )
        else:
            if not self.valves.GOOGLE_API_KEY:
                self.log.error("GOOGLE_API_KEY is not set (and not using Vertex AI).")
                raise ValueError(
                    "GOOGLE_API_KEY is not set. Please provide the API key in the environment variables or valves."
                )
            self.log.debug("Using Google Generative AI API with API Key.")

    def strip_prefix(self, model_name: str) -> str:
        """
        Extract the model identifier using regex, handling various naming conventions.
        e.g., "google_gemini_pipeline.gemini-2.5-flash-preview-04-17" -> "gemini-2.5-flash-preview-04-17"
        e.g., "models/gemini-1.5-flash-001" -> "gemini-1.5-flash-001"
        e.g., "publishers/google/models/gemini-1.5-pro" -> "gemini-1.5-pro"
        """
        # Use regex to remove everything up to and including the last '/' or the first '.'

        stripped = re.sub(r"^(?:.*/|[^.]*\.)", "", model_name)
        return stripped

    def get_google_models(self, force_refresh: bool = False) -> List[Dict[str, str]]:
        """
        Retrieve available Google models suitable for content generation.
        Uses caching to reduce API calls.

        Args:
            force_refresh: Whether to force refreshing the model cache

        Returns:
            List of dictionaries containing model id and name.
        """
        # Check cache first

        current_time = time.time()
        if (
            not force_refresh
            and self._model_cache is not None
            and (current_time - self._model_cache_time) < self.valves.MODEL_CACHE_TTL
        ):
            self.log.debug("Using cached model list")
            return self._model_cache
        try:
            client = self._get_client()
            self.log.debug("Fetching models from Google API")
            models = client.models.list()
            available_models = []
            for model in models:
                actions = model.supported_actions
                if actions is None or "generateContent" in actions:
                    available_models.append(
                        {
                            "id": self.strip_prefix(model.name),
                            "name": model.display_name or self.strip_prefix(model.name),
                        }
                    )
            model_map = {model["id"]: model for model in available_models}

            # Filter map to only include models starting with 'gemini-'

            filtered_models = {
                k: v for k, v in model_map.items() if k.startswith("gemini-")
            }

            # Update cache

            self._model_cache = list(filtered_models.values())
            self._model_cache_time = current_time
            self.log.debug(f"Found {len(self._model_cache)} Gemini models")
            return self._model_cache
        except Exception as e:
            self.log.exception(f"Could not fetch models from Google: {str(e)}")
            # Return a specific error entry for the UI

            return [{"id": "error", "name": f"Could not fetch models: {str(e)}"}]

    def pipes(self) -> List[Dict[str, str]]:
        """
        Returns a list of available Google Gemini models for the UI.

        Returns:
            List of dictionaries containing model id and name.
        """
        try:
            self.name = "Google Gemini: "
            return self.get_google_models()
        except ValueError as e:
            # Handle the case where API key is missing during pipe listing

            self.log.error(f"Error during pipes listing (validation): {e}")
            return [{"id": "error", "name": str(e)}]
        except Exception as e:
            # Handle other potential errors during model fetching

            self.log.exception(
                f"An unexpected error occurred during pipes listing: {str(e)}"
            )
            return [{"id": "error", "name": f"An unexpected error occurred: {str(e)}"}]

    def _prepare_model_id(self, model_id: str) -> str:
        """
        Prepare and validate the model ID for use with the API.

        Args:
            model_id: The original model ID from the user

        Returns:
            Properly formatted model ID

        Raises:
            ValueError: If the model ID is invalid or unsupported
        """
        original_model_id = model_id
        model_id = self.strip_prefix(model_id)

        # If the model ID doesn't look like a Gemini model, try to find it by name

        if not model_id.startswith("gemini-"):
            models_list = self.get_google_models()
            found_model = next(
                (m["id"] for m in models_list if m["name"] == original_model_id), None
            )
            if found_model and found_model.startswith("gemini-"):
                model_id = found_model
                self.log.debug(
                    f"Mapped model name '{original_model_id}' to model ID '{model_id}'"
                )
            else:
                # If we still don't have a valid ID, raise an error

                if not model_id.startswith("gemini-"):
                    self.log.error(
                        f"Invalid or unsupported model ID: '{original_model_id}'"
                    )
                    raise ValueError(
                        f"Invalid or unsupported Google model ID or name: '{original_model_id}'"
                    )
        return model_id

    def _prepare_content(
        self, messages: List[Dict[str, Any]]
    ) -> Tuple[List[Dict[str, Any]], Optional[str]]:
        """
        Prepare messages content for the API and extract system message if present.

        Args:
            messages: List of message objects from the request

        Returns:
            Tuple of (prepared content list, system message string or None)
        """
        # Extract system message

        system_message = next(
            (msg["content"] for msg in messages if msg.get("role") == "system"),
            None,
        )

        # Prepare contents for the API

        contents = []
        for message in messages:
            role = message.get("role")
            if role == "system":
                continue  # Skip system messages, handled separately
            content = message.get("content", "")
            parts = []

            # Handle different content types

            if isinstance(content, list):  # Multimodal content
                parts.extend(self._process_multimodal_content(content))
            elif isinstance(content, str):  # Plain text content
                parts.append({"text": content})
            else:
                self.log.warning(f"Unsupported message content type: {type(content)}")
                continue  # Skip unsupported content
            # Map roles: 'assistant' -> 'model', 'user' -> 'user'

            api_role = "model" if role == "assistant" else "user"
            if parts:  # Only add if there are parts
                contents.append({"role": api_role, "parts": parts})
        return contents, system_message

    def _process_multimodal_content(
        self, content_list: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Process multimodal content (text and images).

        Args:
            content_list: List of content items

        Returns:
            List of processed parts for the Gemini API
        """
        parts = []

        for item in content_list:
            if item.get("type") == "text":
                parts.append({"text": item.get("text", "")})
            elif item.get("type") == "image_url":
                image_url = item.get("image_url", {}).get("url", "")

                if image_url.startswith("data:image"):
                    # Handle base64 encoded image data

                    try:
                        header, encoded = image_url.split(",", 1)
                        mime_type = header.split(":")[1].split(";")[0]

                        # Basic validation for image types

                        if mime_type not in [
                            "image/jpeg",
                            "image/png",
                            "image/webp",
                            "image/heic",
                            "image/heif",
                        ]:
                            self.log.warning(
                                f"Unsupported image mime type: {mime_type}"
                            )
                            parts.append(
                                {"text": f"[Image type {mime_type} not supported]"}
                            )
                            continue
                        parts.append(
                            {
                                "inline_data": {
                                    "mime_type": mime_type,
                                    "data": encoded,
                                }
                            }
                        )
                    except Exception as img_ex:
                        self.log.exception(f"Could not parse image data URL: {img_ex}")
                        parts.append({"text": "[Image data could not be processed]"})
                else:
                    # Gemini API doesn't directly support image URLs

                    self.log.warning(f"Direct image URLs not supported: {image_url}")
                    parts.append({"text": f"[Image URL not processed: {image_url}]"})
        return parts

    @staticmethod
    def _create_tool(tool_def):
        """OpenwebUI tool is a functools.partial coroutine, which genai does not support directly.
        See https://github.com/googleapis/python-genai/issues/907

        This function wraps the tool into a callable that can be used with genai.
        In particular, it sets the signature of the function properly,
        removing any frozen keyword arguments (extra_params).
        """
        bound_callable = tool_def["callable"]

        # Create a wrapper for bound_callable, which is always async

        async def wrapper(*args, **kwargs):
            return await bound_callable(*args, **kwargs)

        # Remove 'frozen' keyword arguments (extra_params) from the signature

        original_sig = inspect.signature(bound_callable)
        frozen_kwargs = {
            "__event_emitter__",
            "__event_call__",
            "__user__",
            "__metadata__",
            "__request__",
            "__model__",
        }
        new_parameters = []

        for name, parameter in original_sig.parameters.items():
            # Exclude keyword arguments that are frozen

            if name in frozen_kwargs and parameter.kind in (
                inspect.Parameter.POSITIONAL_OR_KEYWORD,
                inspect.Parameter.KEYWORD_ONLY,
            ):
                continue
            # Keep remaining parameters

            new_parameters.append(parameter)
        new_sig = inspect.Signature(
            parameters=new_parameters, return_annotation=original_sig.return_annotation
        )

        # Ensure name, docstring and signature are properly set

        update_wrapper(wrapper, bound_callable)
        wrapper.__signature__ = new_sig

        return wrapper

    def _configure_generation(
        self,
        body: Dict[str, Any],
        system_instruction: Optional[str],
        __metadata__: Dict[str, Any],
        __tools__: dict[str, Any] | None = None,
    ) -> types.GenerateContentConfig:
        """
        Configure generation parameters and safety settings.

        Args:
            body: The request body containing generation parameters
            system_instruction: Optional system instruction string

        Returns:
            types.GenerateContentConfig
        """
        gen_config_params = {
            "temperature": body.get("temperature"),
            "top_p": body.get("top_p"),
            "top_k": body.get("top_k"),
            "max_output_tokens": body.get("max_tokens"),
            "stop_sequences": body.get("stop") or None,
            "system_instruction": system_instruction,
        }
        # Configure safety settings

        if self.valves.USE_PERMISSIVE_SAFETY:
            safety_settings = [
                types.SafetySetting(
                    category="HARM_CATEGORY_HARASSMENT", threshold="BLOCK_NONE"
                ),
                types.SafetySetting(
                    category="HARM_CATEGORY_HATE_SPEECH", threshold="BLOCK_NONE"
                ),
                types.SafetySetting(
                    category="HARM_CATEGORY_SEXUALLY_EXPLICIT", threshold="BLOCK_NONE"
                ),
                types.SafetySetting(
                    category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="BLOCK_NONE"
                ),
            ]
            gen_config_params |= {"safety_settings": safety_settings}
        features = __metadata__.get("features", {})
        if features.get("google_search_tool", False):
            self.log.debug("Enabling Google search grounding")
            gen_config_params.setdefault("tools", []).append(
                types.Tool(google_search=types.GoogleSearch())
            )
        if __tools__ is not None and __metadata__.get("function_calling") == "native":
            for name, tool_def in __tools__.items():
                tool = self._create_tool(tool_def)
                self.log.debug(
                    f"Adding tool '{name}' with signature {tool.__signature__}"
                )

                gen_config_params.setdefault("tools", []).append(tool)
        # Filter out None values for generation config

        filtered_params = {k: v for k, v in gen_config_params.items() if v is not None}
        return types.GenerateContentConfig(**filtered_params)

    @staticmethod
    def _format_grounding_chunks_as_sources(
        grounding_chunks: list[types.GroundingChunk],
    ):
        formatted_sources = []
        for chunk in grounding_chunks:
            context = chunk.web or chunk.retrieved_context
            if not context:
                continue
            uri = context.uri
            title = context.title or "Source"

            formatted_sources.append(
                {
                    "source": {
                        "name": title,
                        "type": "web_search_results",
                        "url": uri,
                    },
                    "document": ["Click the link to view the content."],
                    "metadata": [{"source": title}],
                }
            )
        return formatted_sources

    async def _process_grounding_metadata(
        self,
        grounding_metadata_list: List[types.GroundingMetadata],
        text: str,
        __event_emitter__: Callable,
    ):
        """Process and emit grounding metadata events."""
        grounding_chunks = []
        web_search_queries = []
        grounding_supports = []

        for metadata in grounding_metadata_list:
            if metadata.grounding_chunks:
                grounding_chunks.extend(metadata.grounding_chunks)
            if metadata.web_search_queries:
                web_search_queries.extend(metadata.web_search_queries)
            if metadata.grounding_supports:
                grounding_supports.extend(metadata.grounding_supports)
        # Add sources to the response

        if grounding_chunks:
            sources = self._format_grounding_chunks_as_sources(grounding_chunks)
            await __event_emitter__(
                {"type": "chat:completion", "data": {"sources": sources}}
            )
        # Add status specifying google queries used for grounding

        if web_search_queries:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "action": "web_search",
                        "description": "This response was grounded with Google Search",
                        "urls": [
                            f"https://www.google.com/search?q={query}"
                            for query in web_search_queries
                        ],
                    },
                }
            )
        # Add citations in the text body

        if grounding_supports:
            # Citation indexes are in bytes

            ENCODING = "utf-8"
            text_bytes = text.encode(ENCODING)
            last_byte_index = 0
            cited_chunks = []

            for support in grounding_supports:
                cited_chunks.append(
                    text_bytes[last_byte_index : support.segment.end_index].decode(
                        ENCODING
                    )
                )

                # Generate and append citations (e.g., "[1][2]")

                footnotes = "".join(
                    [f"[{i + 1}]" for i in support.grounding_chunk_indices]
                )
                cited_chunks.append(f" {footnotes}")

                # Update index for the next segment

                last_byte_index = support.segment.end_index
            # Append any remaining text after the last citation

            if last_byte_index < len(text_bytes):
                cited_chunks.append(text_bytes[last_byte_index:].decode(ENCODING))
            await __event_emitter__(
                {
                    "type": "replace",
                    "data": {"content": "".join(cited_chunks)},
                }
            )

    async def _handle_streaming_response(
        self,
        response_iterator: Any,
        __event_emitter__: Callable,
    ) -> AsyncIterator[str]:
        """
        Handle streaming response from Gemini API.

        Args:
            response_iterator: Iterator from generate_content

        Returns:
            Generator yielding text chunks
        """
        grounding_metadata_list = []
        text_chunks = []
        try:
            async for chunk in response_iterator:
                # Check for safety feedback or empty chunks

                if not chunk.candidates:
                    # Check prompt feedback

                    if (
                        response_iterator.prompt_feedback
                        and response_iterator.prompt_feedback.block_reason
                    ):
                        yield f"[Blocked due to Prompt Safety: {response_iterator.prompt_feedback.block_reason.name}]"
                    else:
                        yield "[Blocked by safety settings]"
                    return  # Stop generation
                if chunk.candidates[0].grounding_metadata:
                    grounding_metadata_list.append(
                        chunk.candidates[0].grounding_metadata
                    )
                if chunk.text:
                    text_chunks.append(chunk.text)
                    await __event_emitter__(
                        {
                            "type": "chat:message:delta",
                            "data": {
                                "content": chunk.text,
                            },
                        }
                    )
            # After processing all chunks, handle grounding data

            if grounding_metadata_list and __event_emitter__:
                await self._process_grounding_metadata(
                    grounding_metadata_list, "".join(text_chunks), __event_emitter__
                )
        except Exception as e:
            self.log.exception(f"Error during streaming: {e}")
            yield f"Error during streaming: {e}"

    def _handle_standard_response(self, response: Any) -> str:
        """
        Handle non-streaming response from Gemini API.

        Args:
            response: Response from generate_content

        Returns:
            Generated text or error message
        """
        # Check for prompt safety blocks

        if response.prompt_feedback and response.prompt_feedback.block_reason:
            return f"[Blocked due to Prompt Safety: {response.prompt_feedback.block_reason.name}]"
        # Check for missing candidates

        if not response.candidates:
            return "[Blocked by safety settings or no candidates generated]"
        # Check candidate finish reason

        candidate = response.candidates[0]
        if candidate.finish_reason == types.FinishReason.SAFETY:
            # Try to get specific safety rating info

            blocking_rating = next(
                (r for r in candidate.safety_ratings if r.blocked), None
            )
            reason = f" ({blocking_rating.category.name})" if blocking_rating else ""
            return f"[Blocked by safety settings{reason}]"
        # Process content parts

        if candidate.content and candidate.content.parts:
            # Combine text from all parts

            return "".join(
                part.text for part in candidate.content.parts if hasattr(part, "text")
            )
        else:
            return "[No content generated or unexpected response structure]"

    async def _retry_with_backoff(self, func, *args, **kwargs) -> Any:
        """
        Retry a function with exponential backoff.

        Args:
            func: Async function to retry
            *args, **kwargs: Arguments to pass to the function

        Returns:
            Result from the function

        Raises:
            The last exception encountered after all retries
        """
        max_retries = self.valves.RETRY_COUNT
        retry_count = 0
        last_exception = None

        while retry_count <= max_retries:
            try:
                return await func(*args, **kwargs)
            except ServerError as e:
                # These errors might be temporary, so retry

                retry_count += 1
                last_exception = e

                if retry_count <= max_retries:
                    # Calculate backoff time (exponential with jitter)

                    wait_time = min(2**retry_count + (0.1 * retry_count), 10)
                    self.log.warning(
                        f"Temporary error from Google API: {e}. Retrying in {wait_time:.1f}s ({retry_count}/{max_retries})"
                    )
                    await asyncio.sleep(wait_time)
                else:
                    raise
            except Exception:
                # Don't retry other exceptions

                raise
        # If we get here, we've exhausted retries

        assert last_exception is not None
        raise last_exception

    async def pipe(
        self,
        body: Dict[str, Any],
        __metadata__: dict[str, Any],
        __event_emitter__: Callable,
        __tools__: dict[str, Any] | None,
    ) -> Union[str, AsyncIterator[str]]:
        """
        Main method for sending requests to the Google Gemini endpoint.

        Args:
            body: The request body containing messages and other parameters.

        Returns:
            Response from Google Gemini API, which could be a string or an iterator for streaming.
        """
        # Setup logging for this request

        request_id = id(body)
        self.log.debug(f"Processing request {request_id}")

        try:
            # Parse and validate model ID

            model_id = body.get("model", "")
            try:
                model_id = self._prepare_model_id(model_id)
                self.log.debug(f"Using model: {model_id}")
            except ValueError as ve:
                return f"Model Error: {ve}"
            # Get stream flag

            stream = body.get("stream", False)
            messages = body.get("messages", [])

            # Prepare content and extract system message

            contents, system_instruction = self._prepare_content(messages)
            if not contents:
                return "Error: No valid message content found"
            # Configure generation parameters and safety settings

            generation_config = self._configure_generation(
                body, system_instruction, __metadata__, __tools__
            )

            # Make the API call

            client = self._get_client()
            if stream:
                try:

                    async def get_streaming_response():
                        return await client.aio.models.generate_content_stream(
                            model=model_id,
                            contents=contents,
                            config=generation_config,
                        )

                    response_iterator = await self._retry_with_backoff(
                        get_streaming_response
                    )
                    self.log.debug(f"Request {request_id}: Got streaming response")
                    return self._handle_streaming_response(
                        response_iterator, __event_emitter__
                    )
                except Exception as e:
                    self.log.exception(f"Error in streaming request {request_id}: {e}")
                    return f"Error during streaming: {e}"
            else:
                try:

                    async def get_response():
                        return await client.aio.models.generate_content(
                            model=model_id,
                            contents=contents,
                            config=generation_config,
                        )

                    response = await self._retry_with_backoff(get_response)
                    self.log.debug(f"Request {request_id}: Got non-streaming response")
                    return self._handle_standard_response(response)
                except Exception as e:
                    self.log.exception(
                        f"Error in non-streaming request {request_id}: {e}"
                    )
                    return f"Error generating content: {e}"
        except ClientError as ce:
            error_msg = f"Client error raised by the GenAI API: {ce}."
            self.log.error(f"Client error: {ce}")
            return error_msg
        except ServerError as se:
            error_msg = f"Server error raised by the GenAI API: {se}"
            self.log.error(f"Server error raised by the GenAI API.: {se}")
            return error_msg
        except APIError as apie:
            error_msg = f"Google API Error: {apie}"
            self.log.error(error_msg)
            return error_msg
        except ValueError as ve:
            error_msg = f"Configuration error: {ve}"
            self.log.error(f"Value error: {ve}")
            return error_msg
        except Exception as e:
            # Log the full error with traceback

            import traceback

            error_trace = traceback.format_exc()
            self.log.exception(f"Unexpected error: {e}\n{error_trace}")

            # Return a user-friendly error message

            return f"An error occurred while processing your request: {e}"

ndrew222 avatar Sep 18 '25 06:09 ndrew222

@ndrew222 What you uploaded looks like the error log from open-webui. Did any errors occur in the pipelines container? If so, can you show me that error log?

kim-seokjin avatar Sep 18 '25 07:09 kim-seokjin

I'm not sure how to get the pipeline error log. Is it from the Docker logs?

ndrew222 avatar Sep 19 '25 15:09 ndrew222

@ndrew222 Sorry for the late reply. When you run in a Docker environment, you usually run both the open-webui and pipelines containers. What I meant was that the docker logs from both containers (e.g. `docker logs <container-name>` for each) would be helpful for analyzing the error, so please share them!

kim-seokjin avatar Sep 23 '25 23:09 kim-seokjin

Ah, I see. I'm currently on vacation, so I haven't touched my Open WebUI instance in a while. I'll get back to you when I encounter the error again.

ndrew222 avatar Sep 26 '25 20:09 ndrew222

I managed to get the error again. Here's the log from the pipelines container:


INFO:     Started server process [7]
INFO:     Waiting for application startup.
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
ollama 0.3.3 requires httpx<0.28.0,>=0.27.0, but you have httpx 0.28.1 which is incompatible.
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv

[notice] A new release of pip is available: 24.0 -> 25.2
[notice] To update, run: pip install --upgrade pip
INFO:root:Updated valves for module: google_manifold_pipeline
INFO:root:Loaded module: google_manifold_pipeline
INFO:httpx:HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/models "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: GET https://generativelanguage.googleapis.com/v1beta/models?pageToken=Ch5tb2RlbHMvaW1hZ2VuLTQuMC1nZW5lcmF0ZS0wMDE%3D "HTTP/1.1 200 OK"
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:9099 (Press CTRL+C to quit)
INFO:root:stream:true:<generator object Pipeline.stream_response at 0x7f77d0cd50>
INFO:google_genai.models:AFC is enabled with max remote calls: 10.
ERROR:    Exception in ASGI application
  + Exception Group Traceback (most recent call last):
  |   File "/usr/local/lib/python3.11/site-packages/starlette/_utils.py", line 87, in collapse_excgroups
  |     yield
  |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 190, in __call__
  |     async with anyio.create_task_group() as task_group:
  |   File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__
  |     raise BaseExceptionGroup(
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Exception Group Traceback (most recent call last):
    |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 192, in __call__
    |     await response(scope, wrapped_receive, send)
    |   File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 258, in __call__
    |     async with anyio.create_task_group() as task_group:
    |   File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__
    |     raise BaseExceptionGroup(
    | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
    +-+---------------- 1 ----------------
      | Exception Group Traceback (most recent call last):
      |   File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 261, in wrap
      |     await func()
      |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 217, in stream_response
      |     return await super().stream_response(send)
      |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 250, in stream_response
      |     async for chunk in self.body_iterator:
      |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 181, in body_stream
      |     raise app_exc
      |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 151, in coro
      |     await self.app(scope, receive_or_disconnect, send_no_error)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/cors.py", line 85, in __call__
      |     await self.app(scope, receive, send)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 65, in __call__
      |     await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
      |     raise exc
      |   File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
      |     await app(scope, receive, sender)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 756, in __call__
      |     await self.middleware_stack(scope, receive, send)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 776, in app
      |     await route.handle(scope, receive, send)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 297, in handle
      |     await self.app(scope, receive, send)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 77, in app
      |     await wrap_app_handling_exceptions(app, request)(scope, receive, send)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
      |     raise exc
      |   File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
      |     await app(scope, receive, sender)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 75, in app
      |     await response(scope, receive, send)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 258, in __call__
      |     async with anyio.create_task_group() as task_group:
      |   File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__
      |     raise BaseExceptionGroup(
      | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
      +-+---------------- 1 ----------------
        | Traceback (most recent call last):
        |   File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 435, in run_asgi
        |     result = await app(  # type: ignore[func-returns-value]
        |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
        |     return await self.app(scope, receive, send)
        |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in __call__
        |     await super().__call__(scope, receive, send)
        |   File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 123, in __call__
        |     await self.middleware_stack(scope, receive, send)
        |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 186, in __call__
        |     raise exc
        |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 164, in __call__
        |     await self.app(scope, receive, _send)
        |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 189, in __call__
        |     with collapse_excgroups():
        |   File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
        |     self.gen.throw(typ, value, traceback)
        |   File "/usr/local/lib/python3.11/site-packages/starlette/_utils.py", line 93, in collapse_excgroups
        |     raise exc
        |   File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 261, in wrap
        |     await func()
        |   File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 250, in stream_response
        |     async for chunk in self.body_iterator:
        |   File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 65, in iterate_in_threadpool
        |     yield await anyio.to_thread.run_sync(_next, as_iterator)
        |           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 56, in run_sync
        |     return await get_async_backend().run_sync_in_worker_thread(
        |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 2476, in run_sync_in_worker_thread
        |     return await future
        |            ^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 967, in run
        |     result = context.run(func, *args)
        |              ^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 54, in _next
        |     return next(iterator)
        |            ^^^^^^^^^^^^^^
        |   File "/app/main.py", line 705, in stream_content
        |     for line in res:
        |   File "/app/./pipelines/google_manifold_pipeline.py", line 196, in stream_response
        |     for chunk in response:
        |   File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 6615, in generate_content_stream
        |     for chunk in response:
        |   File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 5367, in _generate_content_stream
        |     for response in self._api_client.request_streamed(
        |   File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1308, in request_streamed
        |     session_response = self._request(http_request, http_options, stream=True)
        |                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1127, in _request
        |     return self._retry(self._request_once, http_request, stream)  # type: ignore[no-any-return]
        |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 475, in __call__
        |     do = self.iter(retry_state=retry_state)
        |          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
        |     result = action(retry_state)
        |              ^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 418, in exc_check
        |     raise retry_exc.reraise()
        |           ^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 185, in reraise
        |     raise self.last_attempt.result()
        |           ^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
        |     return self.__get_result()
        |            ^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
        |     raise self._exception
        |   File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 478, in __call__
        |     result = fn(*args, **kwargs)
        |              ^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1091, in _request_once
        |     response = self._httpx_client.send(httpx_request, stream=stream)
        |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 901, in send
        |     raise RuntimeError("Cannot send a request, as the client has been closed.")
        | RuntimeError: Cannot send a request, as the client has been closed.
        +------------------------------------

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 435, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 123, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 186, in __call__
    raise exc
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 164, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 189, in __call__
    with collapse_excgroups():
  File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.11/site-packages/starlette/_utils.py", line 93, in collapse_excgroups
    raise exc
  File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 261, in wrap
    await func()
  File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 250, in stream_response
    async for chunk in self.body_iterator:
  File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 65, in iterate_in_threadpool
    yield await anyio.to_thread.run_sync(_next, as_iterator)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 2476, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 967, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 54, in _next
    return next(iterator)
           ^^^^^^^^^^^^^^
  File "/app/main.py", line 705, in stream_content
    for line in res:
  File "/app/./pipelines/google_manifold_pipeline.py", line 196, in stream_response
    for chunk in response:
  File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 6615, in generate_content_stream
    for chunk in response:
  File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 5367, in _generate_content_stream
    for response in self._api_client.request_streamed(
  File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1308, in request_streamed
    session_response = self._request(http_request, http_options, stream=True)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1127, in _request
    return self._retry(self._request_once, http_request, stream)  # type: ignore[no-any-return]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 475, in __call__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 418, in exc_check
    raise retry_exc.reraise()
          ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 185, in reraise
    raise self.last_attempt.result()
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 478, in __call__
    result = fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1091, in _request_once
    response = self._httpx_client.send(httpx_request, stream=stream)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 901, in send
    raise RuntimeError("Cannot send a request, as the client has been closed.")
RuntimeError: Cannot send a request, as the client has been closed.
INFO:root:stream:true:<generator object Pipeline.stream_response at 0x7f77d0e640>
INFO:google_genai.models:AFC is enabled with max remote calls: 10.
ERROR:    Exception in ASGI application
  + Exception Group Traceback (most recent call last):
  |   File "/usr/local/lib/python3.11/site-packages/starlette/_utils.py", line 87, in collapse_excgroups
  |     yield
  |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 190, in __call__
  |     async with anyio.create_task_group() as task_group:
  |   File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__
  |     raise BaseExceptionGroup(
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Exception Group Traceback (most recent call last):
    |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 192, in __call__
    |     await response(scope, wrapped_receive, send)
    |   File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 258, in __call__
    |     async with anyio.create_task_group() as task_group:
    |   File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__
    |     raise BaseExceptionGroup(
    | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
    +-+---------------- 1 ----------------
      | Exception Group Traceback (most recent call last):
      |   File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 261, in wrap
      |     await func()
      |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 217, in stream_response
      |     return await super().stream_response(send)
      |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 250, in stream_response
      |     async for chunk in self.body_iterator:
      |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 181, in body_stream
      |     raise app_exc
      |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 151, in coro
      |     await self.app(scope, receive_or_disconnect, send_no_error)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/cors.py", line 85, in __call__
      |     await self.app(scope, receive, send)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 65, in __call__
      |     await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
      |     raise exc
      |   File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
      |     await app(scope, receive, sender)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 756, in __call__
      |     await self.middleware_stack(scope, receive, send)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 776, in app
      |     await route.handle(scope, receive, send)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 297, in handle
      |     await self.app(scope, receive, send)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 77, in app
      |     await wrap_app_handling_exceptions(app, request)(scope, receive, send)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
      |     raise exc
      |   File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
      |     await app(scope, receive, sender)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 75, in app
      |     await response(scope, receive, send)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 258, in __call__
      |     async with anyio.create_task_group() as task_group:
      |   File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__
      |     raise BaseExceptionGroup(
      | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
      +-+---------------- 1 ----------------
        | Traceback (most recent call last):
        |   File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 435, in run_asgi
        |     result = await app(  # type: ignore[func-returns-value]
        |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
        |     return await self.app(scope, receive, send)
        |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in __call__
        |     await super().__call__(scope, receive, send)
        |   File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 123, in __call__
        |     await self.middleware_stack(scope, receive, send)
        |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 186, in __call__
        |     raise exc
        |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 164, in __call__
        |     await self.app(scope, receive, _send)
        |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 189, in __call__
        |     with collapse_excgroups():
        |   File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
        |     self.gen.throw(typ, value, traceback)
        |   File "/usr/local/lib/python3.11/site-packages/starlette/_utils.py", line 93, in collapse_excgroups
        |     raise exc
        |   File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 261, in wrap
        |     await func()
        |   File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 250, in stream_response
        |     async for chunk in self.body_iterator:
        |   File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 65, in iterate_in_threadpool
        |     yield await anyio.to_thread.run_sync(_next, as_iterator)
        |           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 56, in run_sync
        |     return await get_async_backend().run_sync_in_worker_thread(
        |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 2476, in run_sync_in_worker_thread
        |     return await future
        |            ^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 967, in run
        |     result = context.run(func, *args)
        |              ^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 54, in _next
        |     return next(iterator)
        |            ^^^^^^^^^^^^^^
        |   File "/app/main.py", line 705, in stream_content
        |     for line in res:
        |   File "/app/./pipelines/google_manifold_pipeline.py", line 196, in stream_response
        |     for chunk in response:
        |   File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 6615, in generate_content_stream
        |     for chunk in response:
        |   File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 5367, in _generate_content_stream
        |     for response in self._api_client.request_streamed(
        |   File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1308, in request_streamed
        |     session_response = self._request(http_request, http_options, stream=True)
        |                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1127, in _request
        |     return self._retry(self._request_once, http_request, stream)  # type: ignore[no-any-return]
        |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 475, in __call__
        |     do = self.iter(retry_state=retry_state)
        |          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
        |     result = action(retry_state)
        |              ^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 418, in exc_check
        |     raise retry_exc.reraise()
        |           ^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 185, in reraise
        |     raise self.last_attempt.result()
        |           ^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
        |     return self.__get_result()
        |            ^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
        |     raise self._exception
        |   File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 478, in __call__
        |     result = fn(*args, **kwargs)
        |              ^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1091, in _request_once
        |     response = self._httpx_client.send(httpx_request, stream=stream)
        |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 901, in send
        |     raise RuntimeError("Cannot send a request, as the client has been closed.")
        | RuntimeError: Cannot send a request, as the client has been closed.
        +------------------------------------

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 435, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 123, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 186, in __call__
    raise exc
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 164, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 189, in __call__
    with collapse_excgroups():
  File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.11/site-packages/starlette/_utils.py", line 93, in collapse_excgroups
    raise exc
  File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 261, in wrap
    await func()
  File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 250, in stream_response
    async for chunk in self.body_iterator:
  File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 65, in iterate_in_threadpool
    yield await anyio.to_thread.run_sync(_next, as_iterator)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 2476, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 967, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 54, in _next
    return next(iterator)
           ^^^^^^^^^^^^^^
  File "/app/main.py", line 705, in stream_content
    for line in res:
  File "/app/./pipelines/google_manifold_pipeline.py", line 196, in stream_response
    for chunk in response:
  File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 6615, in generate_content_stream
    for chunk in response:
  File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 5367, in _generate_content_stream
    for response in self._api_client.request_streamed(
  File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1308, in request_streamed
    session_response = self._request(http_request, http_options, stream=True)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1127, in _request
    return self._retry(self._request_once, http_request, stream)  # type: ignore[no-any-return]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 475, in __call__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 418, in exc_check
    raise retry_exc.reraise()
          ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 185, in reraise
    raise self.last_attempt.result()
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 478, in __call__
    result = fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1091, in _request_once
    response = self._httpx_client.send(httpx_request, stream=stream)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 901, in send
    raise RuntimeError("Cannot send a request, as the client has been closed.")
RuntimeError: Cannot send a request, as the client has been closed.
INFO:root:stream:true:<generator object Pipeline.stream_response at 0x7f77d0f350>
INFO:google_genai.models:AFC is enabled with max remote calls: 10.
ERROR:    Exception in ASGI application
  + Exception Group Traceback (most recent call last):
  |   File "/usr/local/lib/python3.11/site-packages/starlette/_utils.py", line 87, in collapse_excgroups
  |     yield
  |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 190, in __call__
  |     async with anyio.create_task_group() as task_group:
  |   File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__
  |     raise BaseExceptionGroup(
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Exception Group Traceback (most recent call last):
    |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 192, in __call__
    |     await response(scope, wrapped_receive, send)
    |   File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 258, in __call__
    |     async with anyio.create_task_group() as task_group:
    |   File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__
    |     raise BaseExceptionGroup(
    | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
    +-+---------------- 1 ----------------
      | Exception Group Traceback (most recent call last):
      |   File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 261, in wrap
      |     await func()
      |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 217, in stream_response
      |     return await super().stream_response(send)
      |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 250, in stream_response
      |     async for chunk in self.body_iterator:
      |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 181, in body_stream
      |     raise app_exc
      |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 151, in coro
      |     await self.app(scope, receive_or_disconnect, send_no_error)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/cors.py", line 85, in __call__
      |     await self.app(scope, receive, send)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 65, in __call__
      |     await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
      |     raise exc
      |   File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
      |     await app(scope, receive, sender)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 756, in __call__
      |     await self.middleware_stack(scope, receive, send)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 776, in app
      |     await route.handle(scope, receive, send)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 297, in handle
      |     await self.app(scope, receive, send)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 77, in app
      |     await wrap_app_handling_exceptions(app, request)(scope, receive, send)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
      |     raise exc
      |   File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
      |     await app(scope, receive, sender)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 75, in app
      |     await response(scope, receive, send)
      |   File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 258, in __call__
      |     async with anyio.create_task_group() as task_group:
      |   File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__
      |     raise BaseExceptionGroup(
      | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
      +-+---------------- 1 ----------------
        | Traceback (most recent call last):
        |   File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 435, in run_asgi
        |     result = await app(  # type: ignore[func-returns-value]
        |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
        |     return await self.app(scope, receive, send)
        |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in __call__
        |     await super().__call__(scope, receive, send)
        |   File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 123, in __call__
        |     await self.middleware_stack(scope, receive, send)
        |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 186, in __call__
        |     raise exc
        |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 164, in __call__
        |     await self.app(scope, receive, _send)
        |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 189, in __call__
        |     with collapse_excgroups():
        |   File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
        |     self.gen.throw(typ, value, traceback)
        |   File "/usr/local/lib/python3.11/site-packages/starlette/_utils.py", line 93, in collapse_excgroups
        |     raise exc
        |   File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 261, in wrap
        |     await func()
        |   File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 250, in stream_response
        |     async for chunk in self.body_iterator:
        |   File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 65, in iterate_in_threadpool
        |     yield await anyio.to_thread.run_sync(_next, as_iterator)
        |           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 56, in run_sync
        |     return await get_async_backend().run_sync_in_worker_thread(
        |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 2476, in run_sync_in_worker_thread
        |     return await future
        |            ^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 967, in run
        |     result = context.run(func, *args)
        |              ^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 54, in _next
        |     return next(iterator)
        |            ^^^^^^^^^^^^^^
        |   File "/app/main.py", line 705, in stream_content
        |     for line in res:
        |   File "/app/./pipelines/google_manifold_pipeline.py", line 196, in stream_response
        |     for chunk in response:
        |   File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 6615, in generate_content_stream
        |     for chunk in response:
        |   File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 5367, in _generate_content_stream
        |     for response in self._api_client.request_streamed(
        |   File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1308, in request_streamed
        |     session_response = self._request(http_request, http_options, stream=True)
        |                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1127, in _request
        |     return self._retry(self._request_once, http_request, stream)  # type: ignore[no-any-return]
        |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 475, in __call__
        |     do = self.iter(retry_state=retry_state)
        |          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
        |     result = action(retry_state)
        |              ^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 418, in exc_check
        |     raise retry_exc.reraise()
        |           ^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 185, in reraise
        |     raise self.last_attempt.result()
        |           ^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
        |     return self.__get_result()
        |            ^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
        |     raise self._exception
        |   File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 478, in __call__
        |     result = fn(*args, **kwargs)
        |              ^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1091, in _request_once
        |     response = self._httpx_client.send(httpx_request, stream=stream)
        |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 901, in send
        |     raise RuntimeError("Cannot send a request, as the client has been closed.")
        | RuntimeError: Cannot send a request, as the client has been closed.
        +------------------------------------

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 435, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 123, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 186, in __call__
    raise exc
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 164, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/base.py", line 189, in __call__
    with collapse_excgroups():
  File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.11/site-packages/starlette/_utils.py", line 93, in collapse_excgroups
    raise exc
  File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 261, in wrap
    await func()
  File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 250, in stream_response
    async for chunk in self.body_iterator:
  File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 65, in iterate_in_threadpool
    yield await anyio.to_thread.run_sync(_next, as_iterator)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 2476, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 967, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/starlette/concurrency.py", line 54, in _next
    return next(iterator)
           ^^^^^^^^^^^^^^
  File "/app/main.py", line 705, in stream_content
    for line in res:
  File "/app/./pipelines/google_manifold_pipeline.py", line 196, in stream_response
    for chunk in response:
  File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 6615, in generate_content_stream
    for chunk in response:
  File "/usr/local/lib/python3.11/site-packages/google/genai/models.py", line 5367, in _generate_content_stream
    for response in self._api_client.request_streamed(
  File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1308, in request_streamed
    session_response = self._request(http_request, http_options, stream=True)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1127, in _request
    return self._retry(self._request_once, http_request, stream)  # type: ignore[no-any-return]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 475, in __call__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 418, in exc_check
    raise retry_exc.reraise()
          ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 185, in reraise
    raise self.last_attempt.result()
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 478, in __call__
    result = fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/genai/_api_client.py", line 1091, in _request_once
    response = self._httpx_client.send(httpx_request, stream=stream)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 901, in send
    raise RuntimeError("Cannot send a request, as the client has been closed.")
RuntimeError: Cannot send a request, as the client has been closed.

ndrew222 avatar Sep 30 '25 22:09 ndrew222

I had the same problem.

I just gave up on the pipeline and instead started using the OpenAI-compatible endpoint for Gemini in the Open WebUI connections panel. It works perfectly now.

Also, make sure to check your Google account setup between AI Studio and Cloud Console, because it's a true clusterf***. My API key belonged to a project that had no billing set up (billing has to be configured in Cloud Console), and there was also a discrepancy between the projects that existed in Google AI Studio and in Google Cloud Console. In all honesty, I had barely used Gemini before.

Here is my recommended order for migrating to the working OpenAI-compatible endpoint for Gemini:

  1. Create a project in Cloud Console
  2. Set up billing for this Cloud Console project
  3. In AI Studio, USE THE IMPORT PROJECT option to pull in the project that was created in Cloud Console
  4. Create an API key in AI Studio
  5. Delete the pipeline in Open WebUI
  6. In Open WebUI create a connection to https://generativelanguage.googleapis.com/v1beta/openai with your key
  7. Check that your model list has been populated with Google's models and test it (a quick sanity check is sketched below)
  • Also check Google's status page, as they seem to have had some issues around the time of posting.
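
If you want to verify the key and endpoint outside Open WebUI first, here is a minimal sketch using the `openai` Python client pointed at the same base URL as step 6. The `GEMINI_API_KEY` environment variable and the `gemini-2.0-flash` model id are just placeholders I picked for illustration; substitute whatever your own setup and model list show.

```python
# Minimal sanity check for the Gemini OpenAI-compatible endpoint.
# Assumes `pip install openai` and an API key exported as GEMINI_API_KEY
# (the env var name and model id below are placeholders, not anything Open WebUI requires).
import os

from openai import OpenAI

client = OpenAI(
    api_key=os.environ["GEMINI_API_KEY"],
    base_url="https://generativelanguage.googleapis.com/v1beta/openai/",
)

# Step 7: confirm the model list is populated before wiring it into Open WebUI.
for model in client.models.list():
    print(model.id)

# One short streamed completion to confirm streaming works end to end.
stream = client.chat.completions.create(
    model="gemini-2.0-flash",  # assumption: use any model id printed above
    messages=[{"role": "user", "content": "Say hello in one short sentence."}],
    stream=True,
)
for chunk in stream:
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
print()
```

If this script lists models and streams a reply without errors, the same base URL and key should work when added as a direct connection in Open WebUI.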

SHOzturk-PV avatar Oct 01 '25 09:10 SHOzturk-PV

I have the same issue.

Hydoxl avatar Oct 09 '25 21:10 Hydoxl

Got the same problem using the official Open WebUI pipeline. Gave up and started using the official Gemini API with OpenAI compatibility.

febryanvaldo avatar Oct 23 '25 03:10 febryanvaldo