v4b1: PubSub receive cleanup hang
Following discussion on https://github.com/django/channels_redis/pull/317
On 4.0.0b1, test_groups_basic in both test_pubsub.py and test_pubsub_sentinel.py can hang intermittently. This is most pronounced in CI environments (the GitHub Actions runs for this repo show some examples on PRs); locally, for me, it occurs roughly every 6-8 runs of the snippet below.
The hang occurs with a RedisPubSubChannelLayer when checking that a message is not received on a particular channel. Here is a small test that reproduces the issue more easily, for test_pubsub:
import asyncio

import async_timeout
import pytest


@pytest.mark.asyncio
async def test_receive_hang(channel_layer):
    channel_name = await channel_layer.new_channel(prefix="test-channel")
    with pytest.raises(asyncio.TimeoutError):
        async with async_timeout.timeout(1):
            await channel_layer.receive(channel_name)
Preliminary tracing found that receive, when attempting to unsubscribe, never gets a connection back from _get_sub_conn.
A _receive_task appears to never return across multiple attempts, holding the lock indefinitely.
The following print annotations,
async def _get_sub_conn(self):
    if self._keepalive_task is None:
        self._keepalive_task = asyncio.ensure_future(self._do_keepalive())
    if self._lock is None:
        self._lock = asyncio.Lock()
    print(self._lock)
    async with self._lock:
        if self._sub_conn is not None and self._sub_conn.connection is None:
            await self._put_redis_conn(self._sub_conn)
            self._sub_conn = None
            self._notify_consumers(self.channel_layer.on_disconnect)
        if self._sub_conn is None:
            if self._receive_task is not None:
                print(self._receive_task)
                self._receive_task.cancel()
                try:
                    print("waiting for receive_task")
                    await self._receive_task
                except asyncio.CancelledError:
                    print("receive_task cancelled")
                    # This is the normal case, that `asyncio.CancelledError` is thrown. All good.
                    pass
produce, on a hang, the following output:
<asyncio.locks.Lock object at 0x7f88fd85a7f0 [unlocked]>
<asyncio.locks.Lock object at 0x7f88fd85a7f0 [unlocked]>
<Task pending name='Task-4' coro=<RedisSingleShardConnection._do_receiving() running at channels_redis/channels_redis/pubsub.py:409> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f88fd895490>()]>>
waiting for receive_task
receive_task cancelled
<asyncio.locks.Lock object at 0x7f88fd85a7f0 [unlocked]>
<Task pending name='Task-5' coro=<RedisSingleShardConnection._do_receiving() running at channels_redis/channels_redis/pubsub.py:391> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f49b6af5c70>()]>>
waiting for receive_task
<asyncio.locks.Lock object at 0x7f88fd85a7f0 [locked]>
Successful runs have that last line replaced by "receive_task cancelled" and exit cleanly.
Ideas so far from the above:
Hi @bbrowning918 @acu192 — Just a quick update.
I've pushed a few cleanup PRs on main #320 #321 #322, to get rid of the async_generator dependency, and bring pytest-asyncio up to date.
Actions seem to be going well... 🤔 https://github.com/django/channels_redis/actions
I think it's worth rebasing efforts there to make sure we're not just hitting "using old stuff" issues.
I'll note we've got one warning still coming out of redis-py on PY310:
=============================== warnings summary ===============================
tests/test_pubsub_sentinel.py: 18 warnings
tests/test_sentinel.py: 11 warnings
  /home/runner/work/channels_redis/channels_redis/.tox/py310/lib/python3.10/site-packages/redis/asyncio/connection.py:677: DeprecationWarning: There is no current event loop
    loop = asyncio.get_event_loop()
Locally, I'm hitting the freeze maybe one time in two (or so) with the reduced test case here, so I will dig into that next.
OK, so yes. When it stalls we just get stuck in that _do_receiving while True loop. Not sure yet why.
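For context, the loop in question has roughly this shape; this is a paraphrase only, and _get_next_message / _receive_message are placeholder names, not the real shard methods:
async def _do_receiving(self):
    while True:
        # If this await never completes and the task is never successfully
        # cancelled, the task parks here forever, which is what the pending
        # Task-4 / Task-5 entries in the trace above show.
        message = await self._get_next_message()  # placeholder helper
        if message is not None:
            self._receive_message(message)  # placeholder dispatch to channel queues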
Hi @Andrew-Chen-Wang — I don't know if you have any bandwidth at the moment — if not, no problem, sorry for the noise 🎁 — but would you maybe be able to glance at this, and the related discussion on #317, just to see if something jumps out at you about the Redis usage after the migration to redis-py? Thanks 🙏
Hi all - hope you're well, figured I'd pop my head in since I had some free time and see if I could lend a hand.
This jumped out as something interesting to investigate, and I can't quite make heads or tails of it after a few minutes of poking about. But I had a feeling that it was something to do with the async-timeout package, and a quick look at their repo led me to this old issue, which has repro code that looks suspiciously similar to some of our patterns: https://github.com/aio-libs/async-timeout/issues/229#issuecomment-908502523
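To make that concrete, here's a minimal sketch (my own reduction, not the repro from that issue and not channels_redis code) of the hazard that pattern can hide: a cleanup coroutine cancels a background task, awaits it, and swallows CancelledError, which can also swallow a cancellation aimed at the cleanup itself. That is the same shape as the await self._receive_task block above.
import asyncio


async def cleanup(background):
    background.cancel()
    try:
        await background
    except asyncio.CancelledError:
        # Meant to absorb the background task's cancellation, but it can also
        # absorb a cancellation aimed at cleanup() itself (e.g. from a timeout).
        pass
    return "cleanup finished"


async def main():
    background = asyncio.ensure_future(asyncio.sleep(3600))
    outer = asyncio.ensure_future(cleanup(background))
    await asyncio.sleep(0)  # let cleanup() reach `await background`
    outer.cancel()          # stands in for async_timeout firing
    try:
        print(await outer)  # prints "cleanup finished" instead of raising
    except asyncio.CancelledError:
        print("cleanup was cancelled")


asyncio.run(main())
Whether this is exactly what's biting us here I'm not sure, since the trace shows the await itself never returning rather than a swallowed cancel, but it's the same family of cancellation/timeout interaction.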
Anyway, I will take another look tomorrow when I have more time.
@bbrowning918 can you please have a look at this branch here: https://github.com/zumalabs/channels_redis/pull/11
I'm not entirely sure what the issue is, but the test still highlighted a few improvements nonetheless.
Hmm, locally it seems that both test_receive_hang runs pass, but it never moves on to the next test(s), which is what I think happened in the GitHub Actions runs on your PR, where everything timed out. Why is still a mystery to me.
I went down the rabbit hole from the async-timeout issue to CPython core issues and conversations where they were seeing similar (as far as I could tell) edge cases and timing problems with cancellation/timeouts. That does seem to fit what we're seeing here, but again it is puzzling to me.
I've found at least a hacky workaround:
async def _do_keepalive(self):
    while True:
        await asyncio.sleep(3)
        try:
            await self._get_sub_conn()
        except Exception:
            logger.exception("Unexpected exception in keepalive task:")
Bumping the sleep in the periodic keepalive task to anything greater than the timeouts used in the tests stops the hang from occurring in my usually hang-happy local testing.
In a tight window while we're doing our cleanup work, I believe the keepalive kicks off another _get_sub_conn, and it is that call that hangs indefinitely in receive.
There is quite a large docstring on _do_keepalive which may no longer all hold true with redis-py or the changes to _get_sub_conn, so I'm curious about any input on that. Would the keepalive heartbeat be better made configurable rather than fixed?
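For discussion, a rough sketch of what a configurable heartbeat could look like; ShardKeepalive and keepalive_interval are made-up names for illustration, not part of the current channels_redis API:
import asyncio
import logging

logger = logging.getLogger(__name__)


class ShardKeepalive:
    """Illustrative only: the interval comes from configuration instead of a
    hard-coded sleep, so tests (or deployments) can tune it."""

    def __init__(self, get_sub_conn, keepalive_interval=1.0):
        self._get_sub_conn = get_sub_conn
        self._keepalive_interval = keepalive_interval

    async def _do_keepalive(self):
        while True:
            await asyncio.sleep(self._keepalive_interval)
            try:
                await self._get_sub_conn()
            except Exception:
                logger.exception("Unexpected exception in keepalive task:")
The channel layer could then feed the interval through from its configuration, and the tests could set it well above their timeouts.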
Nice work @bbrowning918. 🕵️♀️ (Current status: not sure — Once I get the Channels update ready I will swing back here for a play. Happy if you want to make suggestions!)
@acu192 — Do you have half a cycle to look at the discussion here and see what you think? (Thanks)
I'll play around a bit this weekend. It seems that in the shard flush we need to take the lock to prevent the keepalive from bringing the shard back to life.
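For illustration, here's a toy model of that idea (not the actual shard code; the attribute names just mirror the snippets earlier in the thread): flush takes the same lock as the (re)connect path, so a concurrent keepalive tick can't bring the subscriber connection back to life mid-teardown.
import asyncio


class Shard:
    """Toy model only, not channels_redis code."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self._sub_conn = None
        self._keepalive_task = None

    async def _get_sub_conn(self):
        async with self._lock:
            if self._sub_conn is None:
                self._sub_conn = object()  # stand-in for a real connection
            return self._sub_conn

    async def _do_keepalive(self):
        while True:
            await asyncio.sleep(1)
            await self._get_sub_conn()

    async def flush(self):
        async with self._lock:
            # While we hold the lock, _get_sub_conn (and therefore the
            # keepalive) cannot recreate the connection we're tearing down.
            if self._keepalive_task is not None:
                self._keepalive_task.cancel()
                self._keepalive_task = None
            self._sub_conn = None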
Ok, great @qeternity.
If we can get a solution here in the next week or so that would be great, otherwise I'll push the release anyway, and we'll have to fix it later. 😜
So there was quite a bit of cruft in the old aioredis logic around marshalling raw Redis connections and keepalives. Using redis-py pools we get built-in keepalives by using a low timeout on the subscriber connection, which will auto-reconnect and resubscribe.
I've opened a quick refactor (https://github.com/django/channels_redis/pull/326) of the pubsub shard which resolves all the hangs and cleans up the code a bit. I can't find a way to implement the disconnect/reconnect notifiers under redis-py, however.
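For anyone following along, the subscriber pattern being described looks roughly like this with redis-py's asyncio client; this is a hedged sketch of general redis-py usage, not the code in #326, and the host/port/channel name are placeholders:
import asyncio

import redis.asyncio as aioredis  # redis-py >= 4.2


async def main():
    client = aioredis.Redis(host="localhost", port=6379)
    pubsub = client.pubsub()
    await pubsub.subscribe("test-channel")
    while True:
        # A short timeout means we wake up regularly even when the channel is
        # quiet; redis-py re-subscribes known channels after a reconnect, which
        # is what stands in for the old hand-rolled keepalive.
        message = await pubsub.get_message(
            ignore_subscribe_messages=True, timeout=1.0
        )
        if message is not None:
            print(message["data"])


asyncio.run(main())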
There looks to be some desirable code in redis-py that hasn't been released yet, specifically pertaining to auto-reconnecting in pubsub blocking mode.
The above refactor does not auto-reconnect/resubscribe at scale in our test harness, so I will continue to investigate.
Ok - this is now running pretty well in our chaos harness.
I've rolled in #326 and pushed 4.0.0b2 to PyPI. I'd be grateful if folks could try it out — looking for final releases next week. 👍