redis-py
Async Redis in a multiprocessing setup hangs while connecting in the child process
Version: redis.asyncio ver. 5.0.1
Platform: Python 3.8.10 on Ubuntu 20.04.6 LTS
Description: The following steps reproduce the problem:
- In the parent process: create an async ConnectionPool, create an async Redis client, connect to Redis, and perform a command.
- Using concurrent.futures.ProcessPoolExecutor, fork a child process. In that child process, create an async ConnectionPool and an async Redis client; attempting to connect to Redis and perform a command then hangs.
My speculation is that Python asyncio or redis.asyncio somehow contaminates the socket or connection created in the parent process, and that this causes the problem in the child process. However, I do not know either package well enough to debug this much further.
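The speculation about the parent's socket fits a basic property of fork(): the child inherits a copy of every open file descriptor, sockets included, and both copies refer to the same underlying connection. The psutil output later in this thread shows exactly this, with the child listing the parent's ESTABLISHED socket to port 6379. The POSIX-only sketch below demonstrates the inheritance with a local socket.socketpair() instead of a Redis connection. The function name is hypothetical.

```python
import os
import socket


def child_writes_to_inherited_socket() -> bytes:
    """POSIX-only sketch: a socket opened before fork() is shared with the child."""
    shared_end, peer_end = socket.socketpair()
    pid = os.fork()
    if pid == 0:
        # Child: the inherited descriptor refers to the same socket as the parent's.
        try:
            shared_end.sendall(b"from child")
        finally:
            os._exit(0)  # never fall through into the parent's code path
    os.waitpid(pid, 0)
    # Once the parent also closes its copy, no process holds this end open,
    # so recv() returns what the child wrote (or b"" if it wrote nothing).
    shared_end.close()
    data = peer_end.recv(64)
    peer_end.close()
    return data
```

This only shows that the descriptor is shared. It does not show that redis.asyncio actually writes to the inherited connection in the child.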
The included link is a file that reproduces the problem and can be run from pytest or from the command line. I am sorry that I could not reduce the code to anything much smaller, though it is already greatly reduced from the original code. Changing the variable test_redis_in_parent from True to False makes the problem go away. The difference is that no command/connection is made in the parent process before the child is forked.
Link provided because Python attachments are not allowed: https://raw.githubusercontent.com/py-vcon/py-vcon/acba392bfd2f966db7820e8a9595c8c0ad5df41d/py_vcon_server/tests/test_redis_multiprocess.py
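The test_redis_in_parent toggle suggests that something done in the parent before forking matters. One candidate is offered here as an assumption, not a diagnosis. When the parent connects, asyncio resolves the host with getaddrinfo on the loop's default ThreadPoolExecutor (the stack trace later in this issue ends in run_in_executor). fork() copies only the calling thread into the child.

The POSIX-only sketch below uses only the standard library, with no Redis and no asyncio. It checks whether a ThreadPoolExecutor that already owns a worker thread in the parent can still run work submitted in a forked child. The helper name is hypothetical.

```python
import concurrent.futures
import os
import time


def task_runs_in_forked_child(reuse_parent_pool: bool, timeout: float = 2.0) -> bool:
    """Fork, submit a task in the child, and report whether it finished."""
    parent_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    # Use the pool once so it owns a (now idle) worker thread, roughly like
    # asyncio's default executor after the parent has resolved a hostname.
    parent_pool.submit(os.getpid).result()
    time.sleep(0.2)  # let the worker finish marking itself idle before forking

    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: only the thread that called fork() exists here.
        finished = False
        try:
            os.close(read_fd)
            pool = (parent_pool if reuse_parent_pool
                    else concurrent.futures.ThreadPoolExecutor(max_workers=1))
            # Assumption: CPython 3.8+ reuses "idle" workers, so the inherited
            # pool starts no new thread and this raises TimeoutError.
            pool.submit(os.getpid).result(timeout=timeout)
            finished = True
        finally:
            # Always leave via os._exit: skips interpreter shutdown (which could
            # try to join threads that exist only in the parent) and swallows
            # any exception so the child never runs the parent's code path.
            os.write(write_fd, b"1" if finished else b"0")
            os._exit(0)

    os.close(write_fd)
    reply = os.read(read_fd, 1)
    os.close(read_fd)
    os.waitpid(pid, 0)
    parent_pool.shutdown()
    return reply == b"1"
```

On CPython 3.8+ the reused pool is expected to time out. Its idle-thread bookkeeping still counts the parent's worker as available, so it starts no new thread. A pool created in the child works normally. This relies on ThreadPoolExecutor internals and may differ across versions.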
I have been able to get a little further in debugging this. Here is a partial stack just before it hangs in await:
/home/dpetrie/dev/py-vcon/py_vcon_server/tests/test_redis_multiprocess.py(44)connect_and_get_things()
-> await get_things(db_pool2)
/home/dpetrie/dev/py-vcon/py_vcon_server/tests/test_redis_multiprocess.py(28)get_things()
-> things = await client.smembers(DB_JUNK)
/usr/local/lib/python3.8/dist-packages/redis/asyncio/client.py(601)execute_command()
-> conn = self.connection or await pool.get_connection(command_name, **options)
/usr/local/lib/python3.8/dist-packages/redis/asyncio/connection.py(1040)get_connection()
-> await self.ensure_connection(connection)
/usr/local/lib/python3.8/dist-packages/redis/asyncio/connection.py(1062)ensure_connection()
-> await connection.connect()
/usr/local/lib/python3.8/dist-packages/redis/asyncio/connection.py(243)connect()
-> await self.retry.call_with_retry(
/usr/local/lib/python3.8/dist-packages/redis/asyncio/retry.py(59)call_with_retry()
-> return await do()
/usr/local/lib/python3.8/dist-packages/redis/asyncio/connection.py(650)_connect()
-> reader, writer = await asyncio.open_connection(
/usr/lib/python3.8/asyncio/streams.py(52)open_connection()
-> transport, _ = await loop.create_connection(
/usr/lib/python3.8/asyncio/base_events.py(986)create_connection()
-> infos = await self._ensure_resolved(
/usr/lib/python3.8/asyncio/base_events.py(1365)_ensure_resolved()
-> return await loop.getaddrinfo(host, port, family=family, type=type,
/usr/lib/python3.8/asyncio/base_events.py(825)getaddrinfo()
-> return await self.run_in_executor(
/usr/lib/python3.8/asyncio/base_events.py(782)run_in_executor()
-> return futures.wrap_future(
It seems to be hanging while waiting for the result of getaddrinfo. To reiterate, this occurs in the forked child process. The executor argument to run_in_executor is None, so it takes the executor from self._default_executor. I had another, perhaps crazy, hypothesis: that the executor used in run_in_executor is the one from the parent process rather than one created in the child process. However, I am not sure how to prove or disprove that.
executor = <concurrent.futures.thread.ThreadPoolExecutor object at 0x7fab2b263040>
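One way to approach "how to prove or disprove that" is to run the child's work on a brand-new event loop and install a freshly created default executor before connecting. That way run_in_executor(None, ...) cannot pick up an executor object that existed before the fork. The sketch below does this with loop.set_default_executor() and wraps each await in asyncio.wait_for(), so a hang surfaces as a timeout instead of blocking forever. A plain getaddrinfo on 127.0.0.1:6379 stands in for the Redis connect step. The helper names and the 5-second timeout are hypothetical choices for illustration.

```python
import asyncio
import concurrent.futures
import os
import socket


async def probe_getaddrinfo(host: str, port: int, timeout: float = 5.0):
    loop = asyncio.get_running_loop()
    # Install a default executor created in *this* process, so that
    # run_in_executor(None, ...) (which loop.getaddrinfo() relies on) cannot
    # reuse an executor object that was created before a fork.
    loop.set_default_executor(concurrent.futures.ThreadPoolExecutor(max_workers=2))
    # Threads report their process's PID, so this should equal os.getpid().
    worker_pid = await asyncio.wait_for(loop.run_in_executor(None, os.getpid), timeout)
    # wait_for() turns an indefinite hang into asyncio.TimeoutError.
    infos = await asyncio.wait_for(
        loop.getaddrinfo(host, port, type=socket.SOCK_STREAM), timeout
    )
    return worker_pid, infos


def run_probe_in_child(host: str = "127.0.0.1", port: int = 6379):
    # Create a brand-new event loop rather than reusing whatever
    # asyncio.get_event_loop() returns, which on older Python versions may be
    # the loop object copied from the parent by fork().
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(probe_getaddrinfo(host, port))
    finally:
        loop.close()  # also shuts down the executor installed above
```

Submitting run_probe_in_child to the ProcessPoolExecutor twice is one possible experiment: once with the set_default_executor() line and once without it. If only the variant without the fresh executor hangs, that points toward the inherited-executor hypothesis.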
I apologize if I am doing something stupid or unsupported with asyncio, multiprocessing, or redis.asyncio. If so, please help me understand what I am doing wrong.
This issue is marked stale. It will be closed in 30 days if it is not updated.
Hi,
I stumbled upon this (or a similar?) issue in my project (some Django Channels tests running with pytest-asyncio).
In my case, redis.asyncio.client.Redis.execute_command is called. Inside it, the expression self.connection (which is None) or pool.get_connection(...) triggers the debugger breakpoint, but execution never enters the redis.asyncio.connection.ConnectionPool.get_connection method.
Interestingly, I am able to reproduce the behaviour on my setup with the file linked from this issue:
$ python redis_connect_hangs.py
/home/alexv/Projects/myproject/redis_connect_hangs.py:60: DeprecationWarning: connections() is deprecated and will be removed; use net_connections() instead
sockets = p.connections(kind = "inet")
pid: 609400 open sockets: []
pid: 609400 open sockets: [pconn(fd=6, family=<AddressFamily.AF_INET: 2>, type=<SocketKind.SOCK_STREAM: 1>, laddr=addr(ip='127.0.0.1', port=59352), raddr=addr(ip='127.0.0.1', port=6379), status='ESTABLISHED')]
getting client
getting things
got things pid: 609400
getting client
getting things
got things pid: 609400
getting client
getting things
got things pid: 609400
getting client
getting things
got things pid: 609400
other process pid: 609402
entering connect_and_get_things
pid: 609402 open sockets: [pconn(fd=6, family=<AddressFamily.AF_INET: 2>, type=<SocketKind.SOCK_STREAM: 1>, laddr=addr(ip='127.0.0.1', port=59352), raddr=addr(ip='127.0.0.1', port=6379), status='ESTABLISHED')]
setup pool
getting client
getting things
hangs here in forked process: 609402
^CProcess ForkProcess-2:
Process ForkProcess-3:
Process ForkProcess-4:
Traceback (most recent call last):
File "/home/alexv/Projects/myproject/redis_connect_hangs.py", line 113, in <module>
sys.exit(asyncio.run(test_redis_multiprocessing()))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/alexv/Projects/myproject/.direnv/python-3.11/lib/python3.11/site-packages/nest_asyncio.py", line 30, in run
return loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/alexv/Projects/myproject/.direnv/python-3.11/lib/python3.11/site-packages/nest_asyncio.py", line 92, in run_until_complete
self._run_once()
File "/home/alexv/Projects/myproject/.direnv/python-3.11/lib/python3.11/site-packages/nest_asyncio.py", line 133, in _run_once
handle._run()
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/asyncio/events.py", line 84, in _run
self._context.run(self._callback, *self._args)
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/asyncio/tasks.py", line 360, in __wakeup
self.__step()
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/asyncio/tasks.py", line 277, in __step
result = coro.send(None)
^^^^^^^^^^^^^^^
File "/home/alexv/Projects/myproject/redis_connect_hangs.py", line 93, in test_redis_multiprocessing
completed_states = concurrent.futures.wait(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/concurrent/futures/_base.py", line 305, in wait
waiter.event.wait(timeout)
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/threading.py", line 629, in wait
signaled = self._cond.wait(timeout)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/threading.py", line 331, in wait
gotit = waiter.acquire(True, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
Traceback (most recent call last):
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
self.run()
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/concurrent/futures/process.py", line 249, in _process_worker
call_item = call_queue.get(block=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/queues.py", line 102, in get
with self._rlock:
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
^^^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
Traceback (most recent call last):
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
self.run()
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/concurrent/futures/process.py", line 249, in _process_worker
call_item = call_queue.get(block=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/queues.py", line 102, in get
with self._rlock:
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
^^^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
Traceback (most recent call last):
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
self.run()
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/concurrent/futures/process.py", line 249, in _process_worker
call_item = call_queue.get(block=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/queues.py", line 103, in get
res = self._recv_bytes()
^^^^^^^^^^^^^^^^^^
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/connection.py", line 430, in _recv_bytes
buf = self._recv(4)
^^^^^^^^^^^^^
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/multiprocessing/connection.py", line 395, in _recv
chunk = read(handle, remaining)
^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
Task exception was never retrieved
future: <Task finished name='Task-1' coro=<test_redis_multiprocessing() done, defined at /home/alexv/Projects/myproject/redis_connect_hangs.py:64> exception=KeyboardInterrupt()>
Traceback (most recent call last):
File "/home/alexv/Projects/myproject/redis_connect_hangs.py", line 113, in <module>
sys.exit(asyncio.run(test_redis_multiprocessing()))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/alexv/Projects/myproject/.direnv/python-3.11/lib/python3.11/site-packages/nest_asyncio.py", line 30, in run
return loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/alexv/Projects/myproject/.direnv/python-3.11/lib/python3.11/site-packages/nest_asyncio.py", line 92, in run_until_complete
self._run_once()
File "/home/alexv/Projects/myproject/.direnv/python-3.11/lib/python3.11/site-packages/nest_asyncio.py", line 133, in _run_once
handle._run()
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/asyncio/events.py", line 84, in _run
self._context.run(self._callback, *self._args)
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/asyncio/tasks.py", line 360, in __wakeup
self.__step()
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/asyncio/tasks.py", line 277, in __step
result = coro.send(None)
^^^^^^^^^^^^^^^
File "/home/alexv/Projects/myproject/redis_connect_hangs.py", line 93, in test_redis_multiprocessing
completed_states = concurrent.futures.wait(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/concurrent/futures/_base.py", line 305, in wait
waiter.event.wait(timeout)
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/threading.py", line 629, in wait
signaled = self._cond.wait(timeout)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/alexv/.asdf/installs/python/3.11.9/lib/python3.11/threading.py", line 331, in wait
gotit = waiter.acquire(True, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
I am running Python 3.11.9 with:
(myproject<3.11>) alexv@lenovo-ryzen-pop:~/Projects/myproject$ pip list | grep -e 'os\|sys\|multprocessing\|concurrent\|asyncio\|nest_asyncio\|pytest\|psutil\|redis'
build 1.2.2.post1
channels_redis 4.2.1
django-redis 5.4.0
nest-asyncio 1.6.0
outcome 1.3.0.post0
psutil 7.0.0
pytest 8.3.5
pytest-asyncio 0.26.0
pytest-cov 6.0.0
pytest-django 4.10.0
python-dateutil 2.9.0.post0
redis 5.2.1
I'll post here if I find out anything relevant about this issue.