Async redis in multiprocessing situation hangs while attempting connection in the child process

Open dgpetrie opened this issue 1 year ago • 3 comments

Version: redis.asyncio ver. 5.0.1

Platform: Python 3.8.10 on Ubuntu 20.04.6 LTS

Description: The following steps are taken to reproduce this problem:

  1. In the parent process: create an async ConnectionPool, create an async redis client, connect to Redis and perform a command.
  2. Using concurrent.futures.ProcessPoolExecutor, fork a child process. In that child process: create an async ConnectionPool, create an async redis client, then attempt to connect to Redis and perform a command. This attempt hangs.

My speculation is that there is some contamination of the socket or connection created in the parent process by Python asyncio or redis.asyncio that causes this problem in the child process. However, I do not know either python package well enough to debug this much further.

The linked file reproduces the problem and can be run from pytest or from the command line. I am sorry that I could not reduce the code to anything much smaller, though it is greatly reduced from the original code. Changing the variable test_redis_in_parent from True to False makes the problem go away; the difference is that no command/connection is made in the parent process before the child is forked off.

A link is provided because Python attachments are not allowed: https://raw.githubusercontent.com/py-vcon/py-vcon/acba392bfd2f966db7820e8a9595c8c0ad5df41d/py_vcon_server/tests/test_redis_multiprocess.py

dgpetrie avatar Feb 11 '24 19:02 dgpetrie

I have been able to get a little further in debugging this. Here is a partial stack just before it hangs in await:

  /home/dpetrie/dev/py-vcon/py_vcon_server/tests/test_redis_multiprocess.py(44)connect_and_get_things()
  -> await get_things(db_pool2)
  /home/dpetrie/dev/py-vcon/py_vcon_server/tests/test_redis_multiprocess.py(28)get_things()
  -> things = await client.smembers(DB_JUNK)
  /usr/local/lib/python3.8/dist-packages/redis/asyncio/client.py(601)execute_command()
  -> conn = self.connection or await pool.get_connection(command_name, **options)
  /usr/local/lib/python3.8/dist-packages/redis/asyncio/connection.py(1040)get_connection()
  -> await self.ensure_connection(connection)
  /usr/local/lib/python3.8/dist-packages/redis/asyncio/connection.py(1062)ensure_connection()
  -> await connection.connect()
  /usr/local/lib/python3.8/dist-packages/redis/asyncio/connection.py(243)connect()
  -> await self.retry.call_with_retry(
  /usr/local/lib/python3.8/dist-packages/redis/asyncio/retry.py(59)call_with_retry()
  -> return await do()
  /usr/local/lib/python3.8/dist-packages/redis/asyncio/connection.py(650)_connect()
  -> reader, writer = await asyncio.open_connection(
  /usr/lib/python3.8/asyncio/streams.py(52)open_connection()
  -> transport, _ = await loop.create_connection(
  /usr/lib/python3.8/asyncio/base_events.py(986)create_connection()
  -> infos = await self._ensure_resolved(
  /usr/lib/python3.8/asyncio/base_events.py(1365)_ensure_resolved()
  -> return await loop.getaddrinfo(host, port, family=family, type=type,
  /usr/lib/python3.8/asyncio/base_events.py(825)getaddrinfo()
  -> return await self.run_in_executor(
  /usr/lib/python3.8/asyncio/base_events.py(782)run_in_executor()
  -> return futures.wrap_future(

It seems to hang while waiting for the result of getaddrinfo. To reiterate, this occurs in the forked child process. The executor argument to run_in_executor is None, so the executor is taken from self._default_executor. I had another, perhaps crazy, hypothesis: that the executor used in run_in_executor is the one from the parent process rather than one created in the child process. However, I am not sure how to prove or disprove that.

executor = <concurrent.futures.thread.ThreadPoolExecutor object at 0x7fab2b263040>
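One way to probe the hypothesis above without Redis or asyncio: threads do not survive fork(), but a ThreadPoolExecutor object created in the parent is copied into the child along with its bookkeeping. The sketch below is POSIX only and uses only the standard library. probe_inherited_executor is a hypothetical name, and max_workers=1 is chosen so the effect does not depend on timing. It warms a pool in the parent, forks, and submits to the copied pool in the child. On CPython the child is expected to report "timed out", because the copied pool still counts its single worker as alive and never starts a new one. Whether the asyncio default executor in the linked test is in the same state remains an assumption that this sketch does not confirm.

```python
import concurrent.futures
import os
import time


def probe_inherited_executor(timeout: float = 2.0) -> str:
    """Fork after warming a thread pool; return what the child observed."""
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    executor.submit(lambda: None).result()  # starts the single worker thread
    time.sleep(0.2)  # give the worker time to go back to waiting for work

    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: the executor object was copied, its worker thread was not.
        verdict = b"error"
        try:
            executor.submit(lambda: None).result(timeout=timeout)
            verdict = b"completed"
        except concurrent.futures.TimeoutError:
            verdict = b"timed out"
        finally:
            os.write(write_fd, verdict)
            os._exit(0)  # leave immediately, skipping interpreter cleanup

    # Parent: collect the child's verdict and reap the child.
    os.close(write_fd)
    verdict = os.read(read_fd, 64).decode()
    os.close(read_fd)
    os.waitpid(pid, 0)
    executor.shutdown()
    return verdict


if __name__ == "__main__":
    print(probe_inherited_executor())
```

If the same check is wanted inside the failing test, comparing id(loop._default_executor) and os.getpid() before and after the fork would be a comparable probe, with the caveat that _default_executor is a private attribute.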

I apologize if I am doing something stupid or unsupported with asyncio, multiprocessing or redis.asyncio. If so, please help me understand what I am doing wrong.

dgpetrie avatar Feb 11 '24 21:02 dgpetrie

This issue is marked stale. It will be closed in 30 days if it is not updated.

github-actions[bot] avatar Feb 14 '25 00:02 github-actions[bot]

Hi,

I stumbled upon this (or a similar?) issue in my project (some Django Channels tests running with pytest-asyncio). In my case redis.asyncio.client.Redis.execute_command is called, and in there the expression self.connection or await pool.get_connection(...) (with self.connection being None) triggers the debugger breakpoint, but execution never enters the redis.asyncio.connection.ConnectionPool.get_connection method.

Interestingly I am able to reproduce the behaviour with the file linked from this issue on my setup:

$ python redis_connect_hangs.py
/home/alexv/Projects/myproject/redis_connect_hangs.py:60: DeprecationWarning: connections() is deprecated and will be removed; use net_connections() instead
  sockets = p.connections(kind = "inet")
pid: 609400 open sockets: []
pid: 609400 open sockets: [pconn(fd=6, family=<AddressFamily.AF_INET: 2>, type=<SocketKind.SOCK_STREAM: 1>, laddr=addr(ip='127.0.0.1', port=59352), raddr=addr(ip='127.0.0.1', port=6379), status='ESTABLISHED')]
getting client
getting things
got things pid: 609400
getting client
getting things
got things pid: 609400
getting client
getting things
got things pid: 609400
getting client
getting things
got things pid: 609400
other process pid: 609402
entering connect_and_get_things
pid: 609402 open sockets: [pconn(fd=6, family=<AddressFamily.AF_INET: 2>, type=<SocketKind.SOCK_STREAM: 1>, laddr=addr(ip='127.0.0.1', port=59352), raddr=addr(ip='127.0.0.1', port=6379), status='ESTABLISHED')]
setup pool
getting client
getting things
hangs here in forked process: 609402
^CProcess ForkProcess-2:
Process ForkProcess-3:
Process ForkProcess-4:
Traceback (most recent call last):
  File "/home/alexv/Projects/myproject/redis_connect_hangs.py", line 113, in <module>
    sys.exit(asyncio.run(test_redis_multiprocessing()))
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alexv/Projects/myproject/.direnv/python-3.11/lib/python3.11/site-packages/nest_asyncio.py", line 30, in run
    return loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alexv/Projects/myproject/.direnv/python-3.11/lib/python3.11/site-packages/nest_asyncio.py", line 92, in run_until_complete
    self._run_once()
  File "/home/alexv/Projects/myproject/.direnv/python-3.11/lib/python3.11/site-packages/nest_asyncio.py", line 133, in _run_once
    handle._run()
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/asyncio/events.py", line 84, in _run
    self._context.run(self._callback, *self._args)
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/asyncio/tasks.py", line 360, in __wakeup
    self.__step()
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/asyncio/tasks.py", line 277, in __step
    result = coro.send(None)
             ^^^^^^^^^^^^^^^
  File "/home/alexv/Projects/myproject/redis_connect_hangs.py", line 93, in test_redis_multiprocessing
    completed_states = concurrent.futures.wait(
                       ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/concurrent/futures/_base.py", line 305, in wait
    waiter.event.wait(timeout)
Traceback (most recent call last):
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/threading.py", line 629, in wait
Traceback (most recent call last):
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
Traceback (most recent call last):
    signaled = self._cond.wait(timeout)
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/concurrent/futures/process.py", line 249, in _process_worker
    call_item = call_queue.get(block=True)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^
               ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/queues.py", line 102, in get
    with self._rlock:
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/threading.py", line 331, in wait
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
           ^^^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/concurrent/futures/process.py", line 249, in _process_worker
    call_item = call_queue.get(block=True)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/queues.py", line 102, in get
    with self._rlock:
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
           ^^^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
    gotit = waiter.acquire(True, timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/concurrent/futures/process.py", line 249, in _process_worker
    call_item = call_queue.get(block=True)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/queues.py", line 103, in get
    res = self._recv_bytes()
          ^^^^^^^^^^^^^^^^^^
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/connection.py", line 430, in _recv_bytes
    buf = self._recv(4)
          ^^^^^^^^^^^^^
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/connection.py", line 395, in _recv
    chunk = read(handle, remaining)
            ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
Task exception was never retrieved
future: <Task finished name='Task-1' coro=<test_redis_multiprocessing() done, defined at /home/alexv/Projects/myproject/redis_connect_hangs.py:64> exception=KeyboardInterrupt()>
Traceback (most recent call last):
  File "/home/alexv/Projects/myproject/redis_connect_hangs.py", line 113, in <module>
    sys.exit(asyncio.run(test_redis_multiprocessing()))
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alexv/Projects/myproject/.direnv/python-3.11/lib/python3.11/site-packages/nest_asyncio.py", line 30, in run
    return loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alexv/Projects/myproject/.direnv/python-3.11/lib/python3.11/site-packages/nest_asyncio.py", line 92, in run_until_complete
    self._run_once()
  File "/home/alexv/Projects/myproject/.direnv/python-3.11/lib/python3.11/site-packages/nest_asyncio.py", line 133, in _run_once
    handle._run()
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/asyncio/events.py", line 84, in _run
    self._context.run(self._callback, *self._args)
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/asyncio/tasks.py", line 360, in __wakeup
    self.__step()
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/asyncio/tasks.py", line 277, in __step
    result = coro.send(None)
             ^^^^^^^^^^^^^^^
  File "/home/alexv/Projects/myproject/redis_connect_hangs.py", line 93, in test_redis_multiprocessing
    completed_states = concurrent.futures.wait(
                       ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/concurrent/futures/_base.py", line 305, in wait
    waiter.event.wait(timeout)
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/threading.py", line 629, in wait
    signaled = self._cond.wait(timeout)
               ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/threading.py", line 331, in wait
    gotit = waiter.acquire(True, timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt

I am running Python 3.11.9 with:

(myproject<3.11>) alexv@lenovo-ryzen-pop:~/Projects/myproject$ pip list | grep -e 'os\|sys\|multprocessing\|concurrent\|asyncio\|nest_asyncio\|pytest\|psutil\|redis'
build                            1.2.2.post1
channels_redis                   4.2.1
django-redis                     5.4.0
nest-asyncio                     1.6.0
outcome                          1.3.0.post0
psutil                           7.0.0
pytest                           8.3.5
pytest-asyncio                   0.26.0
pytest-cov                       6.0.0
pytest-django                    4.10.0
python-dateutil                  2.9.0.post0
redis                            5.2.1

I'll post here if I find out anything relevant about this issue.

asmodehn avatar Mar 28 '25 17:03 asmodehn