Error when reusing asyncio connection pool - multiple event loops

Open ohnoah opened this issue 1 year ago • 3 comments

I'm using Django and the asyncio Redis client. I want to share a connection pool across requests, so I initialize the pool centrally in my Django settings. I'm using Redis as a distributed semaphore for one of my routes. However, when two concurrent requests are running, I get the error "got Future <Future pending> attached to a different loop". This does not happen if I initialize a new Redis client for each call to the semaphore, so it seems to be an issue with reusing the connection pool across event loops. How should I go about sharing the pool? I don't want to create one connection per call, since this distributed semaphore gets called a lot.

Version: 5.0.6

Platform: macOS, Python 3.11

Description:

Traceback (most recent call last):
...
  File "/Users//answer-grid//endpoints_app/answers/redis_semaphore.py", line 123, in __aexit__
    await self.semaphore.release(self.identifier)
  File "/Users//answer-grid//endpoints_app/answers/redis_semaphore.py", line 102, in release
    result = await pipe.execute()
             ^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/client.py", line 1528, in execute
    return await conn.retry.call_with_retry(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/retry.py", line 59, in call_with_retry
    return await do()
           ^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/client.py", line 1371, in _execute_transaction
    await self.parse_response(connection, "_")
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/client.py", line 1464, in parse_response
    result = await super().parse_response(connection, command_name, **options)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/client.py", line 633, in parse_response
    response = await connection.read_response()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/connection.py", line 541, in read_response
    response = await self._parser.read_response(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 82, in read_response
    response = await self._read_response(disable_decoding=disable_decoding)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 90, in _read_response
    raw = await self._readline()
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/_parsers/base.py", line 219, in _readline
    data = await self._stream.readline()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/asyncio/streams.py", line 568, in readline
    line = await self.readuntil(sep)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/asyncio/streams.py", line 660, in readuntil
    await self._wait_for_data('readuntil')
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/asyncio/streams.py", line 545, in _wait_for_data
    await self._waiter
RuntimeError: Task <Task pending name='Task-115' coro=<GeneralWebPageFetcher.async_get_pages.<locals>.get_page_with_semaphore() running at /Users/////answers/myfile.py:313> cb=[gather.<locals>._done_callback() at /opt/miniconda3/envs/cb-be/lib/python3.12/asyncio/tasks.py:767]> got Future <Future pending> attached to a different loop
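
For reference, the final RuntimeError can be reproduced with the standard library alone, which suggests the message comes from asyncio rather than from Redis itself. The following is a minimal sketch, not the code from this report: the helper names (make_future, await_future, reproduce) are made up for illustration. It creates a future on one event loop and awaits it from a task running on a second loop.

```python
import asyncio


async def make_future() -> asyncio.Future:
    # A future is bound to the loop that is running when it is created.
    return asyncio.get_running_loop().create_future()


async def await_future(fut: asyncio.Future) -> None:
    # Awaiting a future that belongs to another loop is rejected by the task.
    await fut


def reproduce() -> str:
    loop_a = asyncio.new_event_loop()
    loop_b = asyncio.new_event_loop()
    try:
        # Create the future on loop A ...
        fut = loop_a.run_until_complete(make_future())
        try:
            # ... and await it from a task that runs on loop B.
            loop_b.run_until_complete(await_future(fut))
        except RuntimeError as exc:
            return str(exc)
        return "no error"
    finally:
        loop_a.close()
        loop_b.close()


if __name__ == "__main__":
    print(reproduce())
```

A pooled connection holds stream objects created on the loop where the connection was opened, so reusing it from a different loop would plausibly hit the same check.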

ohnoah • Aug 09 '24 03:08

import os
import uuid
import asyncio
import time
from typing import Any
import random
from django.conf import settings
from redis import asyncio as aioredis

STARTING_BACKOFF_S = 4
MAX_BACKOFF_S = 16


class SemaphoreTimeoutError(Exception):
    """Exception raised when a semaphore acquisition times out."""

    def __init__(self, message: str) -> None:
        super().__init__(message)


class RedisSemaphore:
    def __init__(
        self,
        key: str,
        max_locks: int,
        timeout: int = 30,
        wait_timeout: int = 30,
    ) -> None:
        """
        Initialize the RedisSemaphore.

        :param redis_url: URL of the Redis server.
        :param key: Redis key for the semaphore.
        :param max_locks: Maximum number of concurrent locks.
        :param timeout: How long until the lock should automatically be timed out in seconds.
        :param wait_timeout: How long to wait before aborting attempting to acquire a lock.
        """
        self.redis_url = os.environ["REDIS_URL"]
        self.key = key
        self.max_locks = max_locks
        self.timeout = timeout
        self.wait_timeout = wait_timeout
        self.redis = aioredis.Redis(connection_pool=settings.REDIS_POOL)
        self.identifier = "Only identifier"

    async def acquire(self) -> str:
        """
        Acquire a lock from the semaphore.

        :raises SemaphoreTimeoutError: If the semaphore acquisition times out.
        :return: The identifier for the acquired semaphore.
        """
        czset = f"{self.key}:owner"
        ctr = f"{self.key}:counter"
        identifier = str(uuid.uuid4())
        now = time.time()
        start_time = now
        backoff = STARTING_BACKOFF_S

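        while True:
            # Refresh the timestamp on each attempt; otherwise expired holders
            # are pruned against the time of the first attempt only.
            now = time.time()
            # TODO: Redundant?
            if now - start_time > self.wait_timeout:
                raise SemaphoreTimeoutError("Waited too long to acquire the semaphore.")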

            async with self.redis.pipeline(transaction=True) as pipe:
                pipe.zremrangebyscore(self.key, "-inf", now - self.timeout)
                pipe.zinterstore(czset, {czset: 1, self.key: 0})
                pipe.incr(ctr)
                counter = (await pipe.execute())[-1]

                pipe.zadd(self.key, {identifier: now})
                pipe.zadd(czset, {identifier: counter})
                pipe.zrank(czset, identifier)
                rank = (await pipe.execute())[-1]

                print(rank)
                if rank < self.max_locks:
                    return identifier

                pipe.zrem(self.key, identifier)
                pipe.zrem(czset, identifier)
                await pipe.execute()

            # Exponential backoff with randomness
            sleep_time = backoff * (1 + random.random() * 0.3)
            if (sleep_time + time.time() - start_time) > self.wait_timeout:
                raise SemaphoreTimeoutError("Waited too long to acquire the semaphore.")
            await asyncio.sleep(sleep_time)
            backoff = min(backoff * 2, MAX_BACKOFF_S)

    async def release(self, identifier: str) -> bool:
        """
        Release a lock from the semaphore.

        :param identifier: The identifier for the lock to be released.
        :return: True if the semaphore was properly released, False if it had timed out.
        """
        czset = f"{self.key}:owner"
        async with self.redis.pipeline(transaction=True) as pipe:
            pipe.zrem(self.key, identifier)
            pipe.zrem(czset, identifier)
            result = await pipe.execute()
        return result[0] > 0


class RedisSemaphoreContext:
    def __init__(self, semaphore: RedisSemaphore) -> None:
        """
        Initialize the RedisSemaphoreContext.

        :param semaphore: An instance of RedisSemaphore.
        """
        self.semaphore = semaphore
        self.identifier = None

    async def __aenter__(self) -> "RedisSemaphoreContext":
        """Enter the async context manager."""
        self.identifier = await self.semaphore.acquire()
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Exit the async context manager."""
        await self.semaphore.release(self.identifier)

This is my redis semaphore class
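
One possible workaround, sketched under assumptions rather than taken from redis-py: keep one client (or pool) per event loop instead of one per process, so that a connection is only ever used on the loop that created it. PerLoopCache and FakeClient below are hypothetical names; FakeClient stands in for something like aioredis.Redis(...) so the sketch runs without a server. It also assumes the loop object can be weakly referenced, which holds for the default asyncio loops.

```python
import asyncio
import weakref
from typing import Callable, Generic, Tuple, TypeVar

T = TypeVar("T")


class PerLoopCache(Generic[T]):
    """Hand out one object per running event loop (hypothetical helper)."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        # Weak keys: an entry disappears once its loop is garbage collected.
        self._by_loop = weakref.WeakKeyDictionary()

    def get(self) -> T:
        # Must be called while a loop is running, i.e. from a coroutine.
        loop = asyncio.get_running_loop()
        try:
            return self._by_loop[loop]
        except KeyError:
            obj = self._by_loop[loop] = self._factory()
            return obj


class FakeClient:
    """Stand-in for a real async Redis client (assumption for the demo)."""


clients = PerLoopCache(FakeClient)


async def use_twice() -> Tuple[FakeClient, FakeClient]:
    # Two lookups on the same loop return the same object.
    return clients.get(), clients.get()


def demo() -> Tuple[bool, bool]:
    a1, a2 = asyncio.run(use_twice())   # first loop
    b1, _ = asyncio.run(use_twice())    # second, separate loop
    return a1 is a2, a1 is b1


if __name__ == "__main__":
    print(demo())
```

In RedisSemaphore this would mean looking up the client with clients.get() from inside a coroutine rather than binding to settings.REDIS_POOL at construction time. If every request runs on a fresh loop, this still creates a client per request, and clients belonging to loops that have gone away are not closed by this sketch.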

ohnoah • Aug 09 '24 03:08

This looks like it may be related to Django and to running locally. Let me investigate further.

ohnoah • Aug 10 '24 16:08

I am encountering something very similar with a FastAPI setup and tests using pytest-asyncio.

Here is a minimal FastAPI app that exhibits this behavior:

myapp/main.py:

from fastapi import FastAPI, Depends

import redis.asyncio
app = FastAPI()

redis_con = redis.asyncio.Redis()

@app.get("/items/")
async def read_items():
    value = await redis_con.get("foo")
    return {"value": value}

tests/test.py:

import fastapi.testclient
import pytest

from myapp.main import app



@pytest.fixture()
def test_client() -> fastapi.testclient.TestClient:
    test_client = fastapi.testclient.TestClient(app)
    return test_client

async def test_read_items(test_client):
    response = test_client.get("/items/")
    assert response.status_code == 200
    response = test_client.get("/items/") # <--- this is where the error occurs
    assert response.status_code == 200

When the endpoint is queried for the second time, the following error occurs:

(redistest) sander@Sanders-MacBook-Pro redistest % py.test tests/test.py
====================================================================== test session starts =======================================================================
platform darwin -- Python 3.13.0, pytest-8.3.5, pluggy-1.5.0
rootdir: /Users/sander/PycharmProjects/redistest
configfile: pyproject.toml
plugins: anyio-4.9.0, asyncio-0.25.3
asyncio: mode=Mode.AUTO, asyncio_default_fixture_loop_scope=function
collected 1 item                                                                                                                                                 

tests/test.py F                                                                                                                                            [100%]

============================================================================ FAILURES ============================================================================
________________________________________________________________________ test_read_items _________________________________________________________________________

self = <redis.asyncio.connection.Connection(host=localhost,port=6379,db=0)>, disable_decoding = False, timeout = None

    async def read_response(
        self,
        disable_decoding: bool = False,
        timeout: Optional[float] = None,
        *,
        disconnect_on_error: bool = True,
        push_request: Optional[bool] = False,
    ):
        """Read the response from a previously sent command"""
        read_timeout = timeout if timeout is not None else self.socket_timeout
        host_error = self._host_error()
        try:
            if (
                read_timeout is not None
                and self.protocol in ["3", 3]
                and not HIREDIS_AVAILABLE
            ):
                async with async_timeout(read_timeout):
                    response = await self._parser.read_response(
                        disable_decoding=disable_decoding, push_request=push_request
                    )
            elif read_timeout is not None:
                async with async_timeout(read_timeout):
                    response = await self._parser.read_response(
                        disable_decoding=disable_decoding
                    )
            elif self.protocol in ["3", 3] and not HIREDIS_AVAILABLE:
                response = await self._parser.read_response(
                    disable_decoding=disable_decoding, push_request=push_request
                )
            else:
>               response = await self._parser.read_response(
                    disable_decoding=disable_decoding
                )

.venv/lib/python3.13/site-packages/redis/asyncio/connection.py:549: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.13/site-packages/redis/_parsers/resp2.py:82: in read_response
    response = await self._read_response(disable_decoding=disable_decoding)
.venv/lib/python3.13/site-packages/redis/_parsers/resp2.py:90: in _read_response
    raw = await self._readline()
.venv/lib/python3.13/site-packages/redis/_parsers/base.py:219: in _readline
    data = await self._stream.readline()
../../.pyenv/versions/3.13.0/lib/python3.13/asyncio/streams.py:562: in readline
    line = await self.readuntil(sep)
../../.pyenv/versions/3.13.0/lib/python3.13/asyncio/streams.py:677: in readuntil
    await self._wait_for_data('readuntil')
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <StreamReader transport=<_SelectorSocketTransport closing fd=19>>, func_name = 'readuntil'

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.
    
        If stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')
    
        assert not self._eof, '_wait_for_data after EOF'
    
        # Waiting for data while paused will make deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()
    
        self._waiter = self._loop.create_future()
        try:
>           await self._waiter
E           RuntimeError: Task <Task pending name='anyio.from_thread.BlockingPortal._call_func' coro=<BlockingPortal._call_func() running at /Users/sander/PycharmProjects/redistest/.venv/lib/python3.13/site-packages/anyio/from_thread.py:221> cb=[TaskGroup._spawn.<locals>.task_done() at /Users/sander/PycharmProjects/redistest/.venv/lib/python3.13/site-packages/anyio/_backends/_asyncio.py:794]> got Future <Future pending> attached to a different loop

../../.pyenv/versions/3.13.0/lib/python3.13/asyncio/streams.py:539: RuntimeError

During handling of the above exception, another exception occurred:

test_client = <starlette.testclient.TestClient object at 0x107cd57f0>

    async def test_read_items(test_client):
        response = test_client.get("/items/")
        assert response.status_code == 200
>       response = test_client.get("/items/")

tests/test.py:16: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.13/site-packages/starlette/testclient.py:465: in get
    return super().get(
.venv/lib/python3.13/site-packages/httpx/_client.py:1053: in get
    return self.request(
.venv/lib/python3.13/site-packages/starlette/testclient.py:437: in request
    return super().request(
.venv/lib/python3.13/site-packages/httpx/_client.py:825: in request
    return self.send(request, auth=auth, follow_redirects=follow_redirects)
.venv/lib/python3.13/site-packages/httpx/_client.py:914: in send
    response = self._send_handling_auth(
.venv/lib/python3.13/site-packages/httpx/_client.py:942: in _send_handling_auth
    response = self._send_handling_redirects(
.venv/lib/python3.13/site-packages/httpx/_client.py:979: in _send_handling_redirects
    response = self._send_single_request(request)
.venv/lib/python3.13/site-packages/httpx/_client.py:1014: in _send_single_request
    response = transport.handle_request(request)
.venv/lib/python3.13/site-packages/starlette/testclient.py:340: in handle_request
    raise exc
.venv/lib/python3.13/site-packages/starlette/testclient.py:337: in handle_request
    portal.call(self.app, scope, receive, send)
.venv/lib/python3.13/site-packages/anyio/from_thread.py:290: in call
    return cast(T_Retval, self.start_task_soon(func, *args).result())
../../.pyenv/versions/3.13.0/lib/python3.13/concurrent/futures/_base.py:456: in result
    return self.__get_result()
../../.pyenv/versions/3.13.0/lib/python3.13/concurrent/futures/_base.py:401: in __get_result
    raise self._exception
.venv/lib/python3.13/site-packages/anyio/from_thread.py:221: in _call_func
    retval = await retval_or_awaitable
.venv/lib/python3.13/site-packages/fastapi/applications.py:1054: in __call__
    await super().__call__(scope, receive, send)
.venv/lib/python3.13/site-packages/starlette/applications.py:112: in __call__
    await self.middleware_stack(scope, receive, send)
.venv/lib/python3.13/site-packages/starlette/middleware/errors.py:187: in __call__
    raise exc
.venv/lib/python3.13/site-packages/starlette/middleware/errors.py:165: in __call__
    await self.app(scope, receive, _send)
.venv/lib/python3.13/site-packages/starlette/middleware/exceptions.py:62: in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
.venv/lib/python3.13/site-packages/starlette/_exception_handler.py:53: in wrapped_app
    raise exc
.venv/lib/python3.13/site-packages/starlette/_exception_handler.py:42: in wrapped_app
    await app(scope, receive, sender)
.venv/lib/python3.13/site-packages/starlette/routing.py:714: in __call__
    await self.middleware_stack(scope, receive, send)
.venv/lib/python3.13/site-packages/starlette/routing.py:734: in app
    await route.handle(scope, receive, send)
.venv/lib/python3.13/site-packages/starlette/routing.py:288: in handle
    await self.app(scope, receive, send)
.venv/lib/python3.13/site-packages/starlette/routing.py:76: in app
    await wrap_app_handling_exceptions(app, request)(scope, receive, send)
.venv/lib/python3.13/site-packages/starlette/_exception_handler.py:53: in wrapped_app
    raise exc
.venv/lib/python3.13/site-packages/starlette/_exception_handler.py:42: in wrapped_app
    await app(scope, receive, sender)
.venv/lib/python3.13/site-packages/starlette/routing.py:73: in app
    response = await f(request)
.venv/lib/python3.13/site-packages/fastapi/routing.py:301: in app
    raw_response = await run_endpoint_function(
.venv/lib/python3.13/site-packages/fastapi/routing.py:212: in run_endpoint_function
    return await dependant.call(**values)
myapp/main.py:10: in read_items
    await redis_con.set("foo", "bar", ex=10)
.venv/lib/python3.13/site-packages/redis/asyncio/client.py:616: in execute_command
    return await conn.retry.call_with_retry(
.venv/lib/python3.13/site-packages/redis/asyncio/retry.py:59: in call_with_retry
    return await do()
.venv/lib/python3.13/site-packages/redis/asyncio/client.py:590: in _send_command_parse_response
    return await self.parse_response(conn, command_name, **options)
.venv/lib/python3.13/site-packages/redis/asyncio/client.py:637: in parse_response
    response = await connection.read_response()
.venv/lib/python3.13/site-packages/redis/asyncio/connection.py:569: in read_response
    await self.disconnect(nowait=True)
.venv/lib/python3.13/site-packages/redis/asyncio/connection.py:425: in disconnect
    self._writer.close()  # type: ignore[union-attr]
../../.pyenv/versions/3.13.0/lib/python3.13/asyncio/streams.py:352: in close
    return self._transport.close()
../../.pyenv/versions/3.13.0/lib/python3.13/asyncio/selector_events.py:1202: in close
    super().close()
../../.pyenv/versions/3.13.0/lib/python3.13/asyncio/selector_events.py:865: in close
    self._loop.call_soon(self._call_connection_lost, None)
../../.pyenv/versions/3.13.0/lib/python3.13/asyncio/base_events.py:829: in call_soon
    self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_UnixSelectorEventLoop running=False closed=True debug=False>

    def _check_closed(self):
        if self._closed:
>           raise RuntimeError('Event loop is closed')
E           RuntimeError: Event loop is closed

../../.pyenv/versions/3.13.0/lib/python3.13/asyncio/base_events.py:552: RuntimeError
==================================================================== short test summary info =====================================================================
FAILED tests/test.py::test_read_items - RuntimeError: Event loop is closed
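
The second exception ("Event loop is closed") can also be shown in isolation. This is a small sketch using only the standard library; closed_loop_error is a hypothetical helper name. It schedules a callback on a loop that has already been closed, which is the same call_soon check that appears at the bottom of the traceback above.

```python
import asyncio


def closed_loop_error() -> str:
    loop = asyncio.new_event_loop()
    loop.close()
    try:
        # call_soon() refuses to schedule anything on a closed loop.
        loop.call_soon(print, "never runs")
    except RuntimeError as exc:
        return str(exc)
    return "no error"


if __name__ == "__main__":
    print(closed_loop_error())
```

This fits a connection whose transport still points at the loop used by the first request; that reading is an inference from the traceback, not something confirmed in this thread.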

With the following change, which uses a FastAPI dependency to create a new Redis instance for each request, I don't get an error in the test.

from typing import AsyncGenerator


async def get_redis() -> AsyncGenerator[redis.asyncio.Redis, None]:
    # Create a fresh client for this request and close it when the request ends.
    redis_con = redis.asyncio.Redis()
    try:
        yield redis_con
    finally:
        await redis_con.aclose()


@app.get("/items/")
async def read_items(redis_con: redis.asyncio.Redis = Depends(get_redis)):
    value = await redis_con.get("foo")
    return {"value": value}


That said, I only get this behaviour while testing. Doing multiple queries to the same endpoint using the long-lived Redis instance works fine with plain cURL.

Now, my actual application that exhibits this behaviour manages the Redis instance using a lifespan, but the end result for tests is the same as in the simple case described above.

At this stage I'm not sure whether this is an issue in redis-py, FastAPI or pytest-asyncio.

FWIW, here's my pyproject.toml associated with the minimal example:

[project]
name = "redistest"
version = "0.1.0"
description = "Add your description here"
requires-python = ">=3.13"
dependencies = [
    "fastapi[standard]>=0.115.11",
    "httpx>=0.28.1",
    "pytest>=8.3.5",
    "pytest-asyncio>=0.25.3",
    "redis>=5.2.1",
]

[build-system]
requires = ['setuptools']
build-backend = 'setuptools.build_meta'

[tool.pytest.ini_options]
asyncio_default_fixture_loop_scope = "function"
asyncio_mode = "auto"

sanderbollen-clockworks • Mar 21 '25 15:03