Warn about event loop changes using Redis/FakeRedis
Hi. I'm using fakeredis to run tests via pytest-asyncio in a FastAPI application.
Library versions:
fakeredis==2.19.0
fastapi==0.103.1
hiredis==2.2.3
lupa==2.0
pytest-asyncio==0.21.1
redis==5.0.0
Everything works well in Python 3.9 and earlier. But in 3.10 and 3.11 I start getting exceptions:
test setup failed
event_loop = <_UnixSelectorEventLoop running=False closed=False debug=False>
request = <SubRequest 'storage_clean_up' for <Function test_read_from_cache__cache_save_failure>>
kwargs = {}
setup = <function _wrap_async_fixture.<locals>._async_fixture_wrapper.<locals>.setup at 0x7feb442d5900>
@functools.wraps(fixture)
def _async_fixture_wrapper(
event_loop: asyncio.AbstractEventLoop, request: SubRequest, **kwargs: Any
):
func = _perhaps_rebind_fixture_func(
fixture, request.instance, fixturedef.unittest
)
async def setup():
res = await func(**_add_kwargs(func, kwargs, event_loop, request))
return res
> return event_loop.run_until_complete(setup())
../pip/lib/python3.10/site-packages/pytest_asyncio/plugin.py:326:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.10/asyncio/base_events.py:649: in run_until_complete
return future.result()
../pip/lib/python3.10/site-packages/pytest_asyncio/plugin.py:323: in setup
res = await func(**_add_kwargs(func, kwargs, event_loop, request))
/opt/project/core/storages/tests/fixtures.py:23: in storage_clean_up
await cls.storage.truncate()
/opt/project/core/storages/redis.py:285: in truncate
await client.flushall()
../pip/lib/python3.10/site-packages/redis/asyncio/client.py:545: in execute_command
conn = self.connection or await pool.get_connection(command_name, **options)
../pip/lib/python3.10/site-packages/redis/asyncio/connection.py:1111: in get_connection
if await connection.can_read_destructive():
../pip/lib/python3.10/site-packages/redis/asyncio/connection.py:472: in can_read_destructive
return await self._parser.can_read_destructive()
../pip/lib/python3.10/site-packages/redis/_parsers/hiredis.py:179: in can_read_destructive
return await self.read_from_socket()
../pip/lib/python3.10/site-packages/redis/_parsers/hiredis.py:184: in read_from_socket
buffer = await self._stream.read(self._read_size)
../pip/lib/python3.10/site-packages/fakeredis/aioredis.py:83: in read
return await self._socket.responses.get() # type:ignore
/usr/local/lib/python3.10/asyncio/queues.py:156: in get
getter = self._get_loop().create_future()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Queue at 0x7feb429a9c60 maxsize=0 tasks=4>
def _get_loop(self):
loop = events._get_running_loop()
if self._loop is None:
with _global_lock:
if self._loop is None:
self._loop = loop
if loop is not self._loop:
> raise RuntimeError(f'{self!r} is bound to a different event loop')
E RuntimeError: <Queue at 0x7feb429a9c60 maxsize=0 tasks=4> is bound to a different event loop
/usr/local/lib/python3.10/asyncio/mixins.py:30: RuntimeError
If I disable FakeRedis and use redis-py in tests without isolation, I start getting this error:
self = Connection<host=redis,port=6379,db=0>, disable_decoding = False
timeout = None
async def read_response(
self,
disable_decoding: bool = False,
timeout: Optional[float] = None,
*,
disconnect_on_error: bool = True,
push_request: Optional[bool] = False,
):
"""Read the response from a previously sent command"""
read_timeout = timeout if timeout is not None else self.socket_timeout
host_error = self._host_error()
try:
if (
read_timeout is not None
and self.protocol in ["3", 3]
and not HIREDIS_AVAILABLE
):
async with async_timeout(read_timeout):
response = await self._parser.read_response(
disable_decoding=disable_decoding, push_request=push_request
)
elif read_timeout is not None:
async with async_timeout(read_timeout):
response = await self._parser.read_response(
disable_decoding=disable_decoding
)
elif self.protocol in ["3", 3] and not HIREDIS_AVAILABLE:
response = await self._parser.read_response(
disable_decoding=disable_decoding, push_request=push_request
)
else:
> response = await self._parser.read_response(
disable_decoding=disable_decoding
)
/app/pip/lib/python3.10/site-packages/redis/asyncio/connection.py:509:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/app/pip/lib/python3.10/site-packages/redis/_parsers/hiredis.py:203: in read_response
await self.read_from_socket()
/app/pip/lib/python3.10/site-packages/redis/_parsers/hiredis.py:184: in read_from_socket
buffer = await self._stream.read(self._read_size)
/usr/local/lib/python3.10/asyncio/streams.py:669: in read
await self._wait_for_data('read')
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <StreamReader transport=<_SelectorSocketTransport closing fd=11>>
func_name = 'read'
async def _wait_for_data(self, func_name):
"""Wait until feed_data() or feed_eof() is called.
If stream was paused, automatically resume it.
"""
# StreamReader uses a future to link the protocol feed_data() method
# to a read coroutine. Running two read coroutines at the same time
# would have an unexpected behaviour. It would not possible to know
# which coroutine would get the next data.
if self._waiter is not None:
raise RuntimeError(
f'{func_name}() called while another coroutine is '
f'already waiting for incoming data')
assert not self._eof, '_wait_for_data after EOF'
# Waiting for data while paused will make deadlock, so prevent it.
# This is essential for readexactly(n) for case when n > self._limit.
if self._paused:
self._paused = False
self._transport.resume_reading()
self._waiter = self._loop.create_future()
try:
> await self._waiter
E RuntimeError: Task <Task pending name='Task-3' coro=<_wrap_async_fixture.<locals>._async_fixture_wrapper.<locals>.setup() running at /app/pip/lib/python3.10/site-packages/pytest_asyncio/plugin.py:323> cb=[_run_until_complete_cb() at /usr/local/lib/python3.10/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop
/usr/local/lib/python3.10/asyncio/streams.py:501: RuntimeError
During handling of the above exception, another exception occurred:
cls = <core.storages.tests.test_storages.TestRedisStorage object at 0x7f8c542801c0>
@classmethod
@pytest_asyncio.fixture(autouse=True)
async def storage_clean_up(cls):
> await cls.storage.truncate()
test_storages.py:51:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../redis.py:282: in truncate
await client.flushall()
/app/pip/lib/python3.10/site-packages/redis/asyncio/client.py:550: in execute_command
return await conn.retry.call_with_retry(
/app/pip/lib/python3.10/site-packages/redis/asyncio/retry.py:59: in call_with_retry
return await do()
/app/pip/lib/python3.10/site-packages/redis/asyncio/client.py:524: in _send_command_parse_response
return await self.parse_response(conn, command_name, **options)
/app/pip/lib/python3.10/site-packages/redis/asyncio/client.py:571: in parse_response
response = await connection.read_response()
/app/pip/lib/python3.10/site-packages/redis/asyncio/connection.py:529: in read_response
await self.disconnect(nowait=True)
/app/pip/lib/python3.10/site-packages/redis/asyncio/connection.py:385: in disconnect
self._writer.close() # type: ignore[union-attr]
/usr/local/lib/python3.10/asyncio/streams.py:337: in close
return self._transport.close()
/usr/local/lib/python3.10/asyncio/selector_events.py:706: in close
self._loop.call_soon(self._call_connection_lost, None)
/usr/local/lib/python3.10/asyncio/base_events.py:753: in call_soon
self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=True debug=False>
def _check_closed(self):
if self._closed:
> raise RuntimeError('Event loop is closed')
E RuntimeError: Event loop is closed
/usr/local/lib/python3.10/asyncio/base_events.py:515: RuntimeError
At first I thought this was a problem in fakeredis and opened an issue there. But now I suspect it is related to pytest-asyncio, because:
- Tests fail with the plain redis-py library too
- redis-py does not fail outside of tests; my project works fine when I test the server manually
Can you suggest anything, or where to dig further? Thanks
Both stack traces complain about queues or tasks being attached to different event loops. Two things come to my mind:
- The event_loop fixture closes any existing loop and returns a new one. If you have tests that need to reuse the same loop, you need to ensure the correct scope of the event_loop fixture. You can print fixture scopes by running pytest with the option --setup-show.
- Async fixtures are pre-processed during the collection phase. They run in their own loop. Trying to reuse a queue from a fixture in a test will trigger the error about the queue being attached to a different loop.
As to why this occurs only from Python 3.10 onwards, I don't know.
Thanks for the comment. I'll have a look.
@M1ha-Shvn Any updates on your issue?
I guess my problem is a result of my architecture: the Redis storage connection object implements the Singleton pattern. It creates a single object and reuses the Redis instance. For testing purposes it replaces Redis with FakeRedis. When the tests run, each test runs in a separate loop, but the singleton object stays the same (as the Python thread is the same), which leads to loop-change problems when calling Redis commands.
I've worked around the problem with two things:
- A hacky solution that regenerates the FakeRedis object on loop change:
current_loop = asyncio.get_event_loop()
if self._loop is not None and self._loop is not current_loop:
    self._redis_clients.clear()
    # Script should be created in same loop, where it is executed
    self._generate_worker_id_script = None
    log('asyncio_loop_changed', 'asyncio_loop_changed', level=logging.WARNING)
self._loop = current_loop
- Adding a global fixture with a common event_loop to conftest.py. I'm not exactly sure why, but without this some clean-up fixtures inside tests didn't work properly and left my FakeRedis database in an unclean state:
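from asyncio import AbstractEventLoop, get_event_loop_policy
from typing import Any, Generator

import pytest

@pytest.fixture(scope="session", autouse=True)
def event_loop() -> Generator["AbstractEventLoop", Any, None]:
    # One loop shared by every test and fixture in the session
    policy = get_event_loop_policy()
    loop = policy.new_event_loop()
    yield loop
    loop.close()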
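The loop-change check above can be sketched in isolation roughly like this. It is only an illustration, not the actual storage class: `LoopAwareClientHolder` and its `factory` argument are hypothetical names, and the factory stands in for whatever creates the Redis or FakeRedis client.

```python
import asyncio
from typing import Any, Callable, Optional


class LoopAwareClientHolder:
    """Cache one client per event loop and rebuild it when the loop changes."""

    def __init__(self, factory: Callable[[], Any]) -> None:
        self._factory = factory  # hypothetical: builds a new client object
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._client: Any = None

    def get(self) -> Any:
        # Must be called from inside a running loop (e.g. from a coroutine).
        current_loop = asyncio.get_running_loop()
        if self._client is None or self._loop is not current_loop:
            # The cached client belongs to another loop, so drop and rebuild it.
            self._client = self._factory()
            self._loop = current_loop
        return self._client
```

Within a single loop, repeated `get()` calls return the same client; after the loop changes, the next call builds a new one. The old client is simply dropped here, so a real implementation would probably also need to close its connections.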
Maybe there's a smarter way for pytest-asyncio to detect loop changes. I'll leave the issue open for the time being.
Faced the same problem when running tests locally
Interestingly, the error is reproduced only when running a single test module, like pytest path/to/module/with/tests. If I run pytest for all tests, like pytest ., there is no error.
I had the same issue. I was using redis version 4.5.*, switched to 5.2.*, and the issue disappeared.
Maybe this will help someone
I tried it on version 5.0.0 – the error occurs.
Same issue here! Temporarily fixed by using only one session-level loop for both fixtures and tests. But that's not nice. +1 to try to find a fix for this.
I had the same issue with FastAPI. What solved it for me was making my tests async, so that the tests and fixtures use the same event loop. The issue occurs because the sync client invokes the endpoint through a thread pool executor, so the code inside the endpoint runs on a different thread and hence on a different event loop. In practice this means:
- Put @pytest.mark.asyncio on your test
- Make your test async def
- Use an async client to invoke your API
Good luck.
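To illustrate the "different thread, hence different event loop" point, here is a stdlib-only sketch. It does not use FastAPI or its test client, and the function names are made up for the example. It only shows that a coroutine started in a worker thread runs on its own loop, not on the loop of the caller.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


async def describe_context():
    # Report which loop and which thread this coroutine is running on.
    return asyncio.get_running_loop(), threading.get_ident()


def run_in_worker_thread():
    # asyncio.run() creates a brand-new loop in the thread that calls it.
    return asyncio.run(describe_context())


async def compare_contexts():
    main_loop, main_thread = await describe_context()
    with ThreadPoolExecutor(max_workers=1) as pool:
        worker_loop, worker_thread = await asyncio.get_running_loop().run_in_executor(
            pool, run_in_worker_thread
        )
    # Both values are expected to be True: different loop, different thread.
    return main_loop is not worker_loop, main_thread != worker_thread
```

Any loop-bound object created on one side (a queue, a connection, a future) cannot safely be awaited on the other side, which matches the errors in the traces above.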
- All my tests are async. All classes are marked with @pytest.mark.asyncio.
- Can you explain what AsyncClient does in your code? What is async_client, where is it imported from, and what is its purpose?