vcrpy icon indicating copy to clipboard operation
vcrpy copied to clipboard

Weird Behaviour when using OpenAI with Aiohttp

Open baniasbaabe opened this issue 3 months ago • 4 comments

Hi guys,

I noticed something very weird when using vcrpy together with Async OpenAI and Aiohttp (instead of the default httpx).

import json
import logging
import typing as ty
from pathlib import Path

import tiktoken
from aiohttp import ClientSession
from httpx_aiohttp import AiohttpTransport
from langfuse.decorators import observe
from langfuse.openai import AsyncAzureOpenAI
from openai import BadRequestError, DefaultAsyncHttpxClient, RateLimitError
from openai.types.audio import TranscriptionVerbose
from openai.types.chat import ChatCompletionMessageParam
from pydantic import BaseModel
from tenacity import (
    RetryCallState,
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_chain,
    wait_fixed,
)

from .base import AsyncClient

class OpenAIManager(AsyncClient):

    def __init__(self, settings: AppSettings, configs: AppConfig):
        self.configs = configs
        self.settings = settings
        aiohttp_transport = AiohttpTransport(client=ClientSession())
        httpx_client = DefaultAsyncHttpxClient(transport=aiohttp_transport)

        self.openai_client = AsyncAzureOpenAI(
            azure_endpoint=settings.AZURE_OPENAI_ENDPOINT,
            api_version=settings.AZURE_OPENAI_API_VERSION,
            api_key=settings.AZURE_OPENAI_API_KEY,
            timeout=300,
            http_client=httpx_client,
        )

Now, when I use OpenAI with AiohttpTransport, my records contain multiple calls to OpenAI (while just one should be there). And also, my tests fail.

When I comment out the http_client parameter, it works fine (since OpenAI uses plain HTTPX by default)

My config:

@pytest.fixture(scope="module")
def vcr_config():
    cassette_dir = Path(__file__).parent / "cassettes"
    cassette_dir.mkdir(exist_ok=True)

    return vcr.VCR(
        cassette_library_dir=str(cassette_dir),
        filter_headers=["authorization", "api-key"],
        ignore_hosts=[
            "localhost",
            "unix",
            "docker",
            "langfuse.genai-netz-nele-dev.enbw-az.cloud",
        ],
        ignore_localhost=True,
        record_mode="new_episodes",
        match_on=["uri", "method", "body"],
        decode_compressed_response=True,
        record_on_exception=False
    )

@pytest.mark.asyncio
async def test_vector_search_knowledge_file_success(test_client_tmr_user, vcr_config):
    with vcr_config.use_cassette("test_vector_search_knowledge_file_success.yaml", allow_playback_repeats=True):
        response = await test_client_tmr_user.get(
            "/vector_search",
            params={
                ...
            },
        )
        
        assert response.status_code == 200
        data = response.json()
        assert isinstance(data, list)
        assert len(data) <= 5 

baniasbaabe avatar Sep 08 '25 12:09 baniasbaabe

I was hitting this too yesterday when adopting https://github.com/karpetrosyan/httpx-aiohttp. Basically there's several issues happening concurrently here:

  • https://github.com/kevin1024/vcrpy/issues/944
  • https://github.com/karpetrosyan/httpx-aiohttp/issues/23

You can fix via this in a pytest conftest.py:

from collections.abc import AsyncIterator

import httpx_aiohttp
import litellm.llms.custom_httpx.aiohttp_transport
import vcr.stubs.httpx_stubs


class PreReadCompatibleAiohttpResponseStream(
    httpx_aiohttp.transport.AiohttpResponseStream
):
    """aiohttp-backed response stream that works if the response was pre-read."""

    async def __aiter__(self) -> AsyncIterator[bytes]:
        with httpx_aiohttp.transport.map_aiohttp_exceptions():
            if self._aiohttp_response._body is not None:
                # Happens if some intermediary called `await _aiohttp_response.read()`
                # TODO: take into account chunk size
                yield self._aiohttp_response._body
            else:
                async for chunk in self._aiohttp_response.content.iter_chunked(
                    self.CHUNK_SIZE
                ):
                    yield chunk


async def _async_vcr_send(cassette, real_send, *args, **kwargs):  # noqa: ARG001
    """VCR send that only sends, not possibly recording or playing back responses."""
    return await real_send(*args, **kwargs)


# Permanently patch the original response stream,
# to work around https://github.com/karpetrosyan/httpx-aiohttp/issues/23
# and https://github.com/BerriAI/litellm/issues/11724
httpx_aiohttp.transport.AiohttpResponseStream = (  # type: ignore[misc]
    litellm.llms.custom_httpx.aiohttp_transport.AiohttpResponseStream  # type: ignore[misc]
) = PreReadCompatibleAiohttpResponseStream  # type: ignore[assignment]

# Permanently patch vcrpy's async VCR recording functionality,
# to work around https://github.com/kevin1024/vcrpy/issues/944
vcr.stubs.httpx_stubs._async_vcr_send = _async_vcr_send

jamesbraza avatar Sep 23 '25 23:09 jamesbraza

@jamesbraza the suggested fix works! Thanks!

Adding some context from my own tracing. My set up uses litellm.aembedding() (and other place the sync embedding method). When the HTTP call is made, since vcrpy patches both httpx and the underlying aiohttp library, 2 cassettes are captured by vcrpy: one from the aiohttp_stubs.py and another from httpx_stubs.py. The aiohttp capture includes the real payload and the httpx_stub captures an empty cassette.

Since the aiohttp_stubs already reads the request body and saved it to a cassette, when the httpx_stub functions get called again, it see an empty _content. This can be surfaced when using the sync litellm.embedding() call: you see see an assertion error from:

async def _to_serialized_response(resp, aread):
    # The content shouldn't already have been read in by HTTPX.
    assert not hasattr(resp, "_decoder")

When using litellm.aembedding(), it manifests as:

  1. A single capture has 2 identical-looking requests: one with body and another with empty body
  2. The first call fails with a JSON serialization error from the openai SDK (since the body it received from the httpx stub is empty). On subsequent calls succeed because the cassette is found and the aiohttp replay takes priority and replays the originally captured body.

frankgu968 avatar Oct 08 '25 03:10 frankgu968

The httpx implementation was completely rewritten in v8.0.0 (PR #943) - it now patches httpcore instead of httpx directly. The httpx_stubs.py file referenced in the workarounds no longer exists. Could you please test with v8.0.0 and let us know if the double-recording issue is resolved?

kevin1024 avatar Dec 08 '25 13:12 kevin1024

This issue is not resolved yet. My issue at #944 is a possible resolution of one aspect of this issue

jamesbraza avatar Dec 08 '25 17:12 jamesbraza