pytest-asyncio icon indicating copy to clipboard operation
pytest-asyncio copied to clipboard

Warn about event loop changes using Redis/FakeRedis

Open M1ha-Shvn opened this issue 2 years ago • 12 comments

Hi. I'm using fakeredis to run tests via pytest-asyncio in FastAPI application.

Library versions:

fakeredis==2.19.0
fastapi==0.103.1
hiredis==2.2.3
lupa==2.0
pytest-asyncio==0.21.1
redis==5.0.0

Everything works well in python 3.9 and earlier. But in 3.10 and 3.11 I start getting exceptions:

test setup failed
event_loop = <_UnixSelectorEventLoop running=False closed=False debug=False>
request = <SubRequest 'storage_clean_up' for <Function test_read_from_cache__cache_save_failure>>
kwargs = {}
setup = <function _wrap_async_fixture.<locals>._async_fixture_wrapper.<locals>.setup at 0x7feb442d5900>

    @functools.wraps(fixture)
    def _async_fixture_wrapper(
        event_loop: asyncio.AbstractEventLoop, request: SubRequest, **kwargs: Any
    ):
        func = _perhaps_rebind_fixture_func(
            fixture, request.instance, fixturedef.unittest
        )
    
        async def setup():
            res = await func(**_add_kwargs(func, kwargs, event_loop, request))
            return res
    
>       return event_loop.run_until_complete(setup())

../pip/lib/python3.10/site-packages/pytest_asyncio/plugin.py:326: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/local/lib/python3.10/asyncio/base_events.py:649: in run_until_complete
    return future.result()
../pip/lib/python3.10/site-packages/pytest_asyncio/plugin.py:323: in setup
    res = await func(**_add_kwargs(func, kwargs, event_loop, request))
/opt/project/core/storages/tests/fixtures.py:23: in storage_clean_up
    await cls.storage.truncate()
/opt/project/core/storages/redis.py:285: in truncate
    await client.flushall()
../pip/lib/python3.10/site-packages/redis/asyncio/client.py:545: in execute_command
    conn = self.connection or await pool.get_connection(command_name, **options)
../pip/lib/python3.10/site-packages/redis/asyncio/connection.py:1111: in get_connection
    if await connection.can_read_destructive():
../pip/lib/python3.10/site-packages/redis/asyncio/connection.py:472: in can_read_destructive
    return await self._parser.can_read_destructive()
../pip/lib/python3.10/site-packages/redis/_parsers/hiredis.py:179: in can_read_destructive
    return await self.read_from_socket()
../pip/lib/python3.10/site-packages/redis/_parsers/hiredis.py:184: in read_from_socket
    buffer = await self._stream.read(self._read_size)
../pip/lib/python3.10/site-packages/fakeredis/aioredis.py:83: in read
    return await self._socket.responses.get()  # type:ignore
/usr/local/lib/python3.10/asyncio/queues.py:156: in get
    getter = self._get_loop().create_future()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Queue at 0x7feb429a9c60 maxsize=0 tasks=4>

    def _get_loop(self):
        loop = events._get_running_loop()
    
        if self._loop is None:
            with _global_lock:
                if self._loop is None:
                    self._loop = loop
        if loop is not self._loop:
>           raise RuntimeError(f'{self!r} is bound to a different event loop')
E           RuntimeError: <Queue at 0x7feb429a9c60 maxsize=0 tasks=4> is bound to a different event loop

/usr/local/lib/python3.10/asyncio/mixins.py:30: RuntimeError

If I disable FakeRedis and use redis-py in tests without isolation, I start getting this error:

self = Connection<host=redis,port=6379,db=0>, disable_decoding = False
timeout = None

    async def read_response(
        self,
        disable_decoding: bool = False,
        timeout: Optional[float] = None,
        *,
        disconnect_on_error: bool = True,
        push_request: Optional[bool] = False,
    ):
        """Read the response from a previously sent command"""
        read_timeout = timeout if timeout is not None else self.socket_timeout
        host_error = self._host_error()
        try:
            if (
                read_timeout is not None
                and self.protocol in ["3", 3]
                and not HIREDIS_AVAILABLE
            ):
                async with async_timeout(read_timeout):
                    response = await self._parser.read_response(
                        disable_decoding=disable_decoding, push_request=push_request
                    )
            elif read_timeout is not None:
                async with async_timeout(read_timeout):
                    response = await self._parser.read_response(
                        disable_decoding=disable_decoding
                    )
            elif self.protocol in ["3", 3] and not HIREDIS_AVAILABLE:
                response = await self._parser.read_response(
                    disable_decoding=disable_decoding, push_request=push_request
                )
            else:
>               response = await self._parser.read_response(
                    disable_decoding=disable_decoding
                )

/app/pip/lib/python3.10/site-packages/redis/asyncio/connection.py:509: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/app/pip/lib/python3.10/site-packages/redis/_parsers/hiredis.py:203: in read_response
    await self.read_from_socket()
/app/pip/lib/python3.10/site-packages/redis/_parsers/hiredis.py:184: in read_from_socket
    buffer = await self._stream.read(self._read_size)
/usr/local/lib/python3.10/asyncio/streams.py:669: in read
    await self._wait_for_data('read')
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <StreamReader transport=<_SelectorSocketTransport closing fd=11>>
func_name = 'read'

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.
    
        If stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')
    
        assert not self._eof, '_wait_for_data after EOF'
    
        # Waiting for data while paused will make deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()
    
        self._waiter = self._loop.create_future()
        try:
>           await self._waiter
E           RuntimeError: Task <Task pending name='Task-3' coro=<_wrap_async_fixture.<locals>._async_fixture_wrapper.<locals>.setup() running at /app/pip/lib/python3.10/site-packages/pytest_asyncio/plugin.py:323> cb=[_run_until_complete_cb() at /usr/local/lib/python3.10/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop

/usr/local/lib/python3.10/asyncio/streams.py:501: RuntimeError

During handling of the above exception, another exception occurred:

cls = <core.storages.tests.test_storages.TestRedisStorage object at 0x7f8c542801c0>

    @classmethod
    @pytest_asyncio.fixture(autouse=True)
    async def storage_clean_up(cls):
>       await cls.storage.truncate()

test_storages.py:51: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../redis.py:282: in truncate
    await client.flushall()
/app/pip/lib/python3.10/site-packages/redis/asyncio/client.py:550: in execute_command
    return await conn.retry.call_with_retry(
/app/pip/lib/python3.10/site-packages/redis/asyncio/retry.py:59: in call_with_retry
    return await do()
/app/pip/lib/python3.10/site-packages/redis/asyncio/client.py:524: in _send_command_parse_response
    return await self.parse_response(conn, command_name, **options)
/app/pip/lib/python3.10/site-packages/redis/asyncio/client.py:571: in parse_response
    response = await connection.read_response()
/app/pip/lib/python3.10/site-packages/redis/asyncio/connection.py:529: in read_response
    await self.disconnect(nowait=True)
/app/pip/lib/python3.10/site-packages/redis/asyncio/connection.py:385: in disconnect
    self._writer.close()  # type: ignore[union-attr]
/usr/local/lib/python3.10/asyncio/streams.py:337: in close
    return self._transport.close()
/usr/local/lib/python3.10/asyncio/selector_events.py:706: in close
    self._loop.call_soon(self._call_connection_lost, None)
/usr/local/lib/python3.10/asyncio/base_events.py:753: in call_soon
    self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_UnixSelectorEventLoop running=False closed=True debug=False>

    def _check_closed(self):
        if self._closed:
>           raise RuntimeError('Event loop is closed')
E           RuntimeError: Event loop is closed

/usr/local/lib/python3.10/asyncio/base_events.py:515: RuntimeError

Firstly, I thought, it was a problem in Fakeredis and created an issue there. But now I suppose, that it is something connected with pytest-asyncio as:

  1. Tests fail in base redis library too
  2. Redis-py does not fail without tests, my project works well while testing the server manually without tests

Can you suggest anything and where to dig more? Thanks

M1ha-Shvn avatar Sep 26 '23 04:09 M1ha-Shvn

Both stack traces complain about queues or tasks being attached to different event loops. Two things come to my mind:

  1. The event_loop fixture closes any existing loop and returns a new one. If you have tests that need to reuse the same loop, you need to ensure the correct scope of the event_loop fixture. You can print fixture scopes by running pytest with the option --setup-show.
  2. Async fixtures a pre-processed during collection phase. They run in their own loop. Trying to reuse a queue from a fixture in a test will trigger the error about the queue being attached to a different loop.

As to why this occurs only from Python 3.10 onwards, I don't know.

seifertm avatar Sep 26 '23 07:09 seifertm

Thanks for comment. I'll have a look

M1ha-Shvn avatar Sep 27 '23 05:09 M1ha-Shvn

@M1ha-Shvn Any updates on your issue?

seifertm avatar Oct 09 '23 11:10 seifertm

I guess, my problem is a result of architecture: Redis storage connection object realises Singleton pattern. It creates single object and reuses Redis instance. For testing purposes it replaces Redis with FakeRedis. When tests are run each test is run in separate loop, but singleton object is still the same (as python thread is the same) which leads to loop changing problems when calling Redis commands.

I've fixed the problem for myself by 2 things:

  1. A hacky solution regenerating FakeRedis object on loop change:
      current_loop = asyncio.get_event_loop()
      if self._loop is not None and self._loop is not current_loop:
          self._redis_clients.clear()

          # Script should be created in same loop, where it is executed
          self._generate_worker_id_script = None

          log('asyncio_loop_changed', 'asyncio_loop_changed', level=logging.WARNING)

      self._loop = current_loop
  1. Adding a global fixture with common event_loop to conftest.py. I'm not exactly sure why, but without this some clean up fixtures inside tests didn't work properly and left my FakeRedis database in unclear state
@pytest.fixture(scope="session", autouse=True)
def event_loop() -> Generator["AbstractEventLoop", Any, None]:
    policy = get_event_loop_policy()
    loop = policy.new_event_loop()
    yield loop
    loop.close()

M1ha-Shvn avatar Oct 10 '23 04:10 M1ha-Shvn

Maybe there's a smarter way for pytest-asyncio to detect loop changes. I'll leave the issue open for the time being.

seifertm avatar Nov 10 '23 08:11 seifertm

Faced the same problem when running tests locally

vprud avatar Aug 23 '24 12:08 vprud

It is interesting that the error is reproduced only when run tests module like pytest path/to/module/with/tests. If run purest for all tests like pytest ., there will be no error

vprud avatar Aug 30 '24 12:08 vprud

I had the same issue. I was using redis version 4.5.*, switched to 5.2.*, and the issue disappeared.

Maybe this will help someone

owakira avatar Nov 13 '24 13:11 owakira

I tried it on version 5.0.0 – the error occurs.

owakira avatar Nov 13 '24 13:11 owakira

Same issue here! Temporarily fixed by using only one session-level loop for both fixtures and tests. But that's not nice. +1 to try to find a fix for this.

IngLP avatar Dec 02 '24 16:12 IngLP

I had the same issue with fastapi. for me what solved it was making my tests async. that way it will use the same event loop the tests and fixture are using. the issue occurs because when you are using sync client, it invokes the endpoint using the ThreadPool executor, so the code running inside the endpoint is running on a different thread - hence different event loop.it means:

  1. @pytest.mark.asyncio on your test
  2. Make you test async def
  3. Set async client to invoke your api - image Good luck.

idan-dagan-imagenai avatar Dec 11 '24 19:12 idan-dagan-imagenai

I had the same issue with fastapi. for me what solved it was making my tests async. that way it will use the same event loop the tests and fixture are using. the issue occurs because when you are using sync client, it invokes the endpoint using the ThreadPool executor, so the code running inside the endpoint is running on a different thread - hence different event loop.it means:

  1. @pytest.mark.asyncio on your test
  2. Make you test async def
  3. Set async client to invoke your api -

image Good luck.

  1. All my tests are async. All classes are marked with @pytest.mark.asyncio
  2. Can you explain, what do your AsyncClient does in your code? what is async_client? Where it is imported from and what is its purpose?

M1ha-Shvn avatar Dec 12 '24 08:12 M1ha-Shvn