Error when reusing asyncio connection pool - multiple event loops
I'm using django and the asyncio Redis client. I want to share a connection pool across my requests. Thus, I initialize the connection pool centrally in my Django settings. I'm using Redis as a distributed semaphore for one of my routes. However, when I have two concurrent requests running, I get an error about got Future <Future pending> attached to a different loop. THis does not happen if I initialize a Redis client for each call to this semaphore. It seems to be some issue with reusing the connection pool across event loops. How would I go about doing this? I don't want to create one connection per call, since this is a distributed semaphore that gets called a lot.
Version: What redis-py and what redis version is the issue happening on? 5.0.6
Platform: What platform / version? (For example Python 3.5.1 on Windows 7 / Ubuntu 15.10 / Azure) Mac OS. Python 3.11 Description: Description of your issue, stack traces from errors and code that reproduces the issue
Traceback (most recent call last):
...
File "/Users//answer-grid//endpoints_app/answers/redis_semaphore.py", line 123, in __aexit__
await self.semaphore.release(self.identifier)
File "/Users//answer-grid//endpoints_app/answers/redis_semaphore.py", line 102, in release
result = await pipe.execute()
^^^^^^^^^^^^^^^^^^^^
File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/client.py", line 1528, in execute
return await conn.retry.call_with_retry(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/retry.py", line 59, in call_with_retry
return await do()
^^^^^^^^^^
File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/client.py", line 1371, in _execute_transaction
await self.parse_response(connection, "_")
File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/client.py", line 1464, in parse_response
result = await super().parse_response(connection, command_name, **options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/client.py", line 633, in parse_response
response = await connection.read_response()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/connection.py", line 541, in read_response
response = await self._parser.read_response(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 82, in read_response
response = await self._read_response(disable_decoding=disable_decoding)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 90, in _read_response
raw = await self._readline()
^^^^^^^^^^^^^^^^^^^^^^
File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/_parsers/base.py", line 219, in _readline
data = await self._stream.readline()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/miniconda3/envs/cb-be/lib/python3.12/asyncio/streams.py", line 568, in readline
line = await self.readuntil(sep)
44 multidict==6.0.5
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/miniconda3/envs/cb-be/lib/python3.12/asyncio/streams.py", line 660, in readuntil
await self._wait_for_data('readuntil')
File "/opt/miniconda3/envs/cb-be/lib/python3.12/asyncio/streams.py", line 545, in _wait_for_data
await self._waiter
RuntimeError: Task <Task pending name='Task-115' coro=<GeneralWebPageFetcher.async_get_pages.<locals>.get_page_with_semaphore() running at /Users/////answers/myfile.py:313> cb=[gather.<locals>._done_callback() at /opt/miniconda3/envs/cb-be/lib/python3.12/asyncio/tasks.py:767]> got Future <Future pending> attached to a different loop
import os
import uuid
import asyncio
import time
from typing import Any
import random
from django.conf import settings
from redis import asyncio as aioredis
STARTING_BACKOFF_S = 4
MAX_BACKOFF_S = 16
class SemaphoreTimeoutError(Exception):
"""Exception raised when a semaphore acquisition times out."""
def __init__(self, message: str) -> None:
super().__init__(message)
class RedisSemaphore:
def __init__(
self,
key: str,
max_locks: int,
timeout: int = 30,
wait_timeout: int = 30,
) -> None:
"""
Initialize the RedisSemaphore.
:param redis_url: URL of the Redis server.
:param key: Redis key for the semaphore.
:param max_locks: Maximum number of concurrent locks.
:param timeout: How long until the lock should automatically be timed out in seconds.
:param wait_timeout: How long to wait before aborting attempting to acquire a lock.
"""
self.redis_url = os.environ["REDIS_URL"]
self.key = key
self.max_locks = max_locks
self.timeout = timeout
self.wait_timeout = wait_timeout
self.redis = aioredis.Redis(connection_pool=settings.REDIS_POOL)
self.identifier = "Only identifier"
async def acquire(self) -> str:
"""
Acquire a lock from the semaphore.
:raises SemaphoreTimeoutError: If the semaphore acquisition times out.
:return: The identifier for the acquired semaphore.
"""
czset = f"{self.key}:owner"
ctr = f"{self.key}:counter"
identifier = str(uuid.uuid4())
now = time.time()
start_time = now
backoff = STARTING_BACKOFF_S
while True:
# TODO: Redundant?
if time.time() - start_time > self.wait_timeout:
raise SemaphoreTimeoutError("Waited too long to acquire the semaphore.")
async with self.redis.pipeline(transaction=True) as pipe:
pipe.zremrangebyscore(self.key, "-inf", now - self.timeout)
pipe.zinterstore(czset, {czset: 1, self.key: 0})
pipe.incr(ctr)
counter = (await pipe.execute())[-1]
pipe.zadd(self.key, {identifier: now})
pipe.zadd(czset, {identifier: counter})
pipe.zrank(czset, identifier)
rank = (await pipe.execute())[-1]
print(rank)
if rank < self.max_locks:
return identifier
pipe.zrem(self.key, identifier)
pipe.zrem(czset, identifier)
await pipe.execute()
# Exponential backoff with randomness
sleep_time = backoff * (1 + random.random() * 0.3)
if (sleep_time + time.time() - start_time) > self.wait_timeout:
raise SemaphoreTimeoutError("Waited too long to acquire the semaphore.")
await asyncio.sleep(sleep_time)
backoff = min(backoff * 2, MAX_BACKOFF_S)
async def release(self, identifier: str) -> bool:
"""
Release a lock from the semaphore.
:param identifier: The identifier for the lock to be released.
:return: True if the semaphore was properly released, False if it had timed out.
"""
czset = f"{self.key}:owner"
async with self.redis.pipeline(transaction=True) as pipe:
pipe.zrem(self.key, identifier)
pipe.zrem(czset, identifier)
result = await pipe.execute()
return result[0] > 0
class RedisSemaphoreContext:
def __init__(self, semaphore: RedisSemaphore) -> None:
"""
Initialize the RedisSemaphoreContext.
:param semaphore: An instance of RedisSemaphore.
"""
self.semaphore = semaphore
self.identifier = None
async def __aenter__(self) -> "RedisSemaphoreContext":
"""Enter the async context manager."""
self.identifier = await self.semaphore.acquire()
return self
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
"""Exit the async context manager."""
await self.semaphore.release(self.identifier)
This is my redis semaphore class
This looks related to Django potentially and running locally. Let me investigate further
I am encountering something very similar with a FastAPI setup and tests using pytest-asyncio.
Here is minimal fastapi app that exhibits this behavior:
myapp.main.py:
from fastapi import FastAPI, Depends
import redis.asyncio
app = FastAPI()
redis_con = redis.asyncio.Redis()
@app.get("/items/")
async def read_items():
value = await redis_con.get("foo")
return {"value": value}
tests.py:
import fastapi.testclient
import pytest
from myapp.main import app
@pytest.fixture()
def test_client() -> fastapi.testclient.TestClient:
test_client = fastapi.testclient.TestClient(app)
return test_client
async def test_read_items(test_client):
response = test_client.get("/items/")
assert response.status_code == 200
response = test_client.get("/items/") # <--- this is where the error occurs
assert response.status_code == 200
When the endpoint is the queried for the second time, the following error occurs
(redistest) sander@Sanders-MacBook-Pro redistest % py.test tests/test.py
====================================================================== test session starts =======================================================================
platform darwin -- Python 3.13.0, pytest-8.3.5, pluggy-1.5.0
rootdir: /Users/sander/PycharmProjects/redistest
configfile: pyproject.toml
plugins: anyio-4.9.0, asyncio-0.25.3
asyncio: mode=Mode.AUTO, asyncio_default_fixture_loop_scope=function
collected 1 item
tests/test.py F [100%]
============================================================================ FAILURES ============================================================================
________________________________________________________________________ test_read_items _________________________________________________________________________
self = <redis.asyncio.connection.Connection(host=localhost,port=6379,db=0)>, disable_decoding = False, timeout = None
async def read_response(
self,
disable_decoding: bool = False,
timeout: Optional[float] = None,
*,
disconnect_on_error: bool = True,
push_request: Optional[bool] = False,
):
"""Read the response from a previously sent command"""
read_timeout = timeout if timeout is not None else self.socket_timeout
host_error = self._host_error()
try:
if (
read_timeout is not None
and self.protocol in ["3", 3]
and not HIREDIS_AVAILABLE
):
async with async_timeout(read_timeout):
response = await self._parser.read_response(
disable_decoding=disable_decoding, push_request=push_request
)
elif read_timeout is not None:
async with async_timeout(read_timeout):
response = await self._parser.read_response(
disable_decoding=disable_decoding
)
elif self.protocol in ["3", 3] and not HIREDIS_AVAILABLE:
response = await self._parser.read_response(
disable_decoding=disable_decoding, push_request=push_request
)
else:
> response = await self._parser.read_response(
disable_decoding=disable_decoding
)
.venv/lib/python3.13/site-packages/redis/asyncio/connection.py:549:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.venv/lib/python3.13/site-packages/redis/_parsers/resp2.py:82: in read_response
response = await self._read_response(disable_decoding=disable_decoding)
.venv/lib/python3.13/site-packages/redis/_parsers/resp2.py:90: in _read_response
raw = await self._readline()
.venv/lib/python3.13/site-packages/redis/_parsers/base.py:219: in _readline
data = await self._stream.readline()
../../.pyenv/versions/3.13.0/lib/python3.13/asyncio/streams.py:562: in readline
line = await self.readuntil(sep)
../../.pyenv/versions/3.13.0/lib/python3.13/asyncio/streams.py:677: in readuntil
await self._wait_for_data('readuntil')
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <StreamReader transport=<_SelectorSocketTransport closing fd=19>>, func_name = 'readuntil'
async def _wait_for_data(self, func_name):
"""Wait until feed_data() or feed_eof() is called.
If stream was paused, automatically resume it.
"""
# StreamReader uses a future to link the protocol feed_data() method
# to a read coroutine. Running two read coroutines at the same time
# would have an unexpected behaviour. It would not possible to know
# which coroutine would get the next data.
if self._waiter is not None:
raise RuntimeError(
f'{func_name}() called while another coroutine is '
f'already waiting for incoming data')
assert not self._eof, '_wait_for_data after EOF'
# Waiting for data while paused will make deadlock, so prevent it.
# This is essential for readexactly(n) for case when n > self._limit.
if self._paused:
self._paused = False
self._transport.resume_reading()
self._waiter = self._loop.create_future()
try:
> await self._waiter
E RuntimeError: Task <Task pending name='anyio.from_thread.BlockingPortal._call_func' coro=<BlockingPortal._call_func() running at /Users/sander/PycharmProjects/redistest/.venv/lib/python3.13/site-packages/anyio/from_thread.py:221> cb=[TaskGroup._spawn.<locals>.task_done() at /Users/sander/PycharmProjects/redistest/.venv/lib/python3.13/site-packages/anyio/_backends/_asyncio.py:794]> got Future <Future pending> attached to a different loop
../../.pyenv/versions/3.13.0/lib/python3.13/asyncio/streams.py:539: RuntimeError
During handling of the above exception, another exception occurred:
test_client = <starlette.testclient.TestClient object at 0x107cd57f0>
async def test_read_items(test_client):
response = test_client.get("/items/")
assert response.status_code == 200
> response = test_client.get("/items/")
tests/test.py:16:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.venv/lib/python3.13/site-packages/starlette/testclient.py:465: in get
return super().get(
.venv/lib/python3.13/site-packages/httpx/_client.py:1053: in get
return self.request(
.venv/lib/python3.13/site-packages/starlette/testclient.py:437: in request
return super().request(
.venv/lib/python3.13/site-packages/httpx/_client.py:825: in request
return self.send(request, auth=auth, follow_redirects=follow_redirects)
.venv/lib/python3.13/site-packages/httpx/_client.py:914: in send
response = self._send_handling_auth(
.venv/lib/python3.13/site-packages/httpx/_client.py:942: in _send_handling_auth
response = self._send_handling_redirects(
.venv/lib/python3.13/site-packages/httpx/_client.py:979: in _send_handling_redirects
response = self._send_single_request(request)
.venv/lib/python3.13/site-packages/httpx/_client.py:1014: in _send_single_request
response = transport.handle_request(request)
.venv/lib/python3.13/site-packages/starlette/testclient.py:340: in handle_request
raise exc
.venv/lib/python3.13/site-packages/starlette/testclient.py:337: in handle_request
portal.call(self.app, scope, receive, send)
.venv/lib/python3.13/site-packages/anyio/from_thread.py:290: in call
return cast(T_Retval, self.start_task_soon(func, *args).result())
../../.pyenv/versions/3.13.0/lib/python3.13/concurrent/futures/_base.py:456: in result
return self.__get_result()
../../.pyenv/versions/3.13.0/lib/python3.13/concurrent/futures/_base.py:401: in __get_result
raise self._exception
.venv/lib/python3.13/site-packages/anyio/from_thread.py:221: in _call_func
retval = await retval_or_awaitable
.venv/lib/python3.13/site-packages/fastapi/applications.py:1054: in __call__
await super().__call__(scope, receive, send)
.venv/lib/python3.13/site-packages/starlette/applications.py:112: in __call__
await self.middleware_stack(scope, receive, send)
.venv/lib/python3.13/site-packages/starlette/middleware/errors.py:187: in __call__
raise exc
.venv/lib/python3.13/site-packages/starlette/middleware/errors.py:165: in __call__
await self.app(scope, receive, _send)
.venv/lib/python3.13/site-packages/starlette/middleware/exceptions.py:62: in __call__
await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
.venv/lib/python3.13/site-packages/starlette/_exception_handler.py:53: in wrapped_app
raise exc
.venv/lib/python3.13/site-packages/starlette/_exception_handler.py:42: in wrapped_app
await app(scope, receive, sender)
.venv/lib/python3.13/site-packages/starlette/routing.py:714: in __call__
await self.middleware_stack(scope, receive, send)
.venv/lib/python3.13/site-packages/starlette/routing.py:734: in app
await route.handle(scope, receive, send)
.venv/lib/python3.13/site-packages/starlette/routing.py:288: in handle
await self.app(scope, receive, send)
.venv/lib/python3.13/site-packages/starlette/routing.py:76: in app
await wrap_app_handling_exceptions(app, request)(scope, receive, send)
.venv/lib/python3.13/site-packages/starlette/_exception_handler.py:53: in wrapped_app
raise exc
.venv/lib/python3.13/site-packages/starlette/_exception_handler.py:42: in wrapped_app
await app(scope, receive, sender)
.venv/lib/python3.13/site-packages/starlette/routing.py:73: in app
response = await f(request)
.venv/lib/python3.13/site-packages/fastapi/routing.py:301: in app
raw_response = await run_endpoint_function(
.venv/lib/python3.13/site-packages/fastapi/routing.py:212: in run_endpoint_function
return await dependant.call(**values)
myapp/main.py:10: in read_items
await redis_con.set("foo", "bar", ex=10)
.venv/lib/python3.13/site-packages/redis/asyncio/client.py:616: in execute_command
return await conn.retry.call_with_retry(
.venv/lib/python3.13/site-packages/redis/asyncio/retry.py:59: in call_with_retry
return await do()
.venv/lib/python3.13/site-packages/redis/asyncio/client.py:590: in _send_command_parse_response
return await self.parse_response(conn, command_name, **options)
.venv/lib/python3.13/site-packages/redis/asyncio/client.py:637: in parse_response
response = await connection.read_response()
.venv/lib/python3.13/site-packages/redis/asyncio/connection.py:569: in read_response
await self.disconnect(nowait=True)
.venv/lib/python3.13/site-packages/redis/asyncio/connection.py:425: in disconnect
self._writer.close() # type: ignore[union-attr]
../../.pyenv/versions/3.13.0/lib/python3.13/asyncio/streams.py:352: in close
return self._transport.close()
../../.pyenv/versions/3.13.0/lib/python3.13/asyncio/selector_events.py:1202: in close
super().close()
../../.pyenv/versions/3.13.0/lib/python3.13/asyncio/selector_events.py:865: in close
self._loop.call_soon(self._call_connection_lost, None)
../../.pyenv/versions/3.13.0/lib/python3.13/asyncio/base_events.py:829: in call_soon
self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=True debug=False>
def _check_closed(self):
if self._closed:
> raise RuntimeError('Event loop is closed')
E RuntimeError: Event loop is closed
../../.pyenv/versions/3.13.0/lib/python3.13/asyncio/base_events.py:552: RuntimeError
==================================================================== short test summary info =====================================================================
FAILED tests/test.py::test_read_items - RuntimeError: Event loop is closed
Using the following, using a FastAPI dependency to generate a new Redis instance for each request, I don't get an error in the test.
async def get_redis() -> AsyncGenerator[redis.asyncio.Redis, None]:
redis_con = redis.asyncio.Redis()
try:
yield redis_con
finally:
await redis_con.aclose()
@app.get("/items/")
async def read_items(redis_con: redis.asyncio.Redis = Depends(get_redis)):
value = await redis_con.get("foo")
return {"value": value}
That said, I only get this behaviour while testing. Doing multiple queries to the same endpoint using the long-lived Redis instance works fine with plain cURL.
Now, my actual application that exhibits this behaviour puts manages the Redis instance using a lifespan, but the end result for tests is the same as the simple case described above.
At this stage I'm not sure whether this is an issue in redis-py, fastapi or pytest-asyncio.
FWIW, here's my pyproject.toml associated with the minimal example:
[project]
name = "redistest"
version = "0.1.0"
description = "Add your description here"
requires-python = ">=3.13"
dependencies = [
"fastapi[standard]>=0.115.11",
"httpx>=0.28.1",
"pytest>=8.3.5",
"pytest-asyncio>=0.25.3",
"redis>=5.2.1",
]
[build-system]
requires = ['setuptools']
build-backend = 'setuptools.build_meta'
[tool.pytest.ini_options]
asyncio_default_fixture_loop_scope = "function"
asyncio_mode = "auto"