
v4b1: PubSub receive cleanup hang

bbrowning918 opened this issue 3 years ago

Following discussion on https://github.com/django/channels_redis/pull/317

On 4.0.0b1, test_groups_basic in either test_pubsub.py or test_pubsub_sentinel.py can hang intermittently. This is most pronounced in CI environments (the GitHub Actions for this repo show some examples on PRs); locally it occurs for me roughly every 6-8 runs of the snippet below.

The hang occurs with a RedisPubSubChannelLayer when checking that a message is not received on some particular channel. Here is a small test, for test_pubsub, that reproduces the issue more easily:

import asyncio

import async_timeout
import pytest


@pytest.mark.asyncio
async def test_receive_hang(channel_layer):
    channel_name = await channel_layer.new_channel(prefix="test-channel")
    with pytest.raises(asyncio.TimeoutError):
        async with async_timeout.timeout(1):
            await channel_layer.receive(channel_name)

Preliminary tracing found that receive, when attempting to unsubscribe, never gets a connection back from _get_sub_conn.

Across multiple attempts, a _receive_task appears never to return, holding the lock indefinitely.

The following print annotations,

    async def _get_sub_conn(self):
        if self._keepalive_task is None:
            self._keepalive_task = asyncio.ensure_future(self._do_keepalive())
        if self._lock is None:
            self._lock = asyncio.Lock()
        print(self._lock)
        async with self._lock:
            if self._sub_conn is not None and self._sub_conn.connection is None:
                await self._put_redis_conn(self._sub_conn)
                self._sub_conn = None
                self._notify_consumers(self.channel_layer.on_disconnect)
            if self._sub_conn is None:
                if self._receive_task is not None:
                    print(self._receive_task)
                    self._receive_task.cancel()
                    try:
                        print("waiting for receive_task")
                        await self._receive_task
                    except asyncio.CancelledError:
                        print("receive_task cancelled")
                        # This is the normal case, that `asyncio.CancelledError` is thrown. All good.
                        pass

produce, on a hang, the following output:

<asyncio.locks.Lock object at 0x7f88fd85a7f0 [unlocked]>
<asyncio.locks.Lock object at 0x7f88fd85a7f0 [unlocked]>
<Task pending name='Task-4' coro=<RedisSingleShardConnection._do_receiving() running at channels_redis/channels_redis/pubsub.py:409> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f88fd895490>()]>>
waiting for receive_task
receive_task got cancelled
<asyncio.locks.Lock object at 0x7f88fd85a7f0 [unlocked]>
<Task pending name='Task-5' coro=<RedisSingleShardConnection._do_receiving() running at channels_redis/channels_redis/pubsub.py:391> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f49b6af5c70>()]>>
waiting for receive_task
<asyncio.locks.Lock object at 0x7f88fd85a7f0 [locked]>

Successful runs have the last line swapped for "receive_task cancelled" and a clean exit.

Ideas so far from the above:

  1. We are consistently losing the connection to Redis during the test
  2. _receive_task has here and here as the prime blocking candidates

bbrowning918 avatar Jul 20 '22 04:07 bbrowning918

Hi @bbrowning918 @acu192 — Just a quick update.

I've pushed a few cleanup PRs on main (#320, #321, #322) to get rid of the async_generator dependency and bring pytest-asyncio up to date.

Actions seem to be going well... 🤔 https://github.com/django/channels_redis/actions

I think it's worth rebasing efforts to make sure we're not just hitting "using old stuff" issues.

I'll note we've got one warning still coming out of redis-py on PY310:

=============================== warnings summary ===============================
tests/test_pubsub_sentinel.py: 18 warnings
tests/test_sentinel.py: 11 warnings
  /home/runner/work/channels_redis/channels_redis/.tox/py310/lib/python3.10/site-packages/redis/asyncio/connection.py:677: DeprecationWarning: There is no current event loop
    loop = asyncio.get_event_loop()

Locally, I'm hitting the freeze maybe one time in two (or so) with the reduced test case here, so I'll dig into that next.

carltongibson avatar Jul 21 '22 14:07 carltongibson

OK, so yes. When it stalls we just get stuck in that _do_receiving while True loop. Not sure yet why.

carltongibson avatar Jul 21 '22 15:07 carltongibson

Hi @Andrew-Chen-Wang, I don't know if you have any bandwidth at the moment (if not, no problem, sorry for the noise 🎁), but would you maybe be able to glance at this, and the related discussion on #317, just to see if something jumps out at you about the redis usage after the migration to redis-py? Thanks 🙏

carltongibson avatar Jul 21 '22 16:07 carltongibson

Hi all - hope you're well, figured I'd pop my head in since I had some free time and see if I could lend a hand.

This jumped out as something interesting to investigate, and I can't quite make heads or tails of it after a few minutes of poking about. But I had a feeling that it was something to do with the async timeouts package, and a quick look at their repo led me to this old issue which has repro code that looks suspiciously similar to some of our patterns: https://github.com/aio-libs/async-timeout/issues/229#issuecomment-908502523
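
The gist of that class of bug, as a heavily simplified sketch (not the repro from that thread, and not our actual code): an external task.cancel() that lands around the same time a timeout expires can surface as asyncio.TimeoutError instead, and a loop that catches the timeout just keeps running:

import asyncio

import async_timeout


async def looping_worker():
    # stand-in for a _do_receiving-style loop
    while True:
        try:
            async with async_timeout.timeout(0.1):
                await asyncio.sleep(10)  # stand-in for waiting on Redis
        except asyncio.TimeoutError:
            # if an external cancel() races with the deadline, the
            # CancelledError may be converted to TimeoutError and end up
            # here, so the loop keeps running after cancel()
            continue


async def main():
    task = asyncio.create_task(looping_worker())
    await asyncio.sleep(0.1)  # line the cancel up with the deadline
    task.cancel()
    try:
        # on an unlucky interleaving this await is where a caller would hang;
        # wait_for() is only here so the demo itself always exits
        await asyncio.wait_for(task, timeout=1)
    except (asyncio.CancelledError, asyncio.TimeoutError):
        pass


asyncio.run(main())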

Anyway, I'll take another look tomorrow when I have more time.

qeternity avatar Jul 30 '22 22:07 qeternity

@bbrowning918 can you please have a look at this branch here: https://github.com/zumalabs/channels_redis/pull/11

I'm not entirely sure what the issue is, but the test still highlighted a few improvements nonetheless.

qeternity avatar Jul 31 '22 12:07 qeternity

Hmm, locally it seems that both test_receive_hang runs pass, but it never moves on to the next test(s), which is what I think happened in the GitHub Actions on your PR, where everything timed out. Why is still a mystery to me.

I went down the rabbit hole from the async-timeout issue into CPython core issues and conversations where they were seeing similar (as far as I could tell) edge cases and timing problems with cancellation/timeouts. That does seem to fit what we're seeing here, but again it is puzzling to me.

bbrowning918 avatar Aug 09 '22 04:08 bbrowning918

I've found at least a hacky workaround:

async def _do_keepalive(self):
    while True:
        await asyncio.sleep(3)
        try:
            await self._get_sub_conn()
        except Exception:
            logger.exception("Unexpected exception in keepalive task:")

Bumping the sleep on the periodic keep alive task to anything greater than the timeouts in the tests stops any hang from occurring in my usually hang-happy local testing.

In a tight window while we're doing our cleanup work, I believe the keep-alive kicks off another _get_sub_conn, and it is that call that hangs indefinitely in receive.

There is quite a large docstring on _do_keepalive which may no longer all hold true with redis-py or the changes to _get_sub_conn, so I am curious about any input on that. Would the keep-alive heartbeat be better made configurable rather than fixed?
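
For illustration, a configurable heartbeat could look something like the sketch below; self._keepalive_interval (and however it would be plumbed through the layer's constructor or options) is hypothetical, not an existing setting:

    async def _do_keepalive(self):
        # self._keepalive_interval would come from a hypothetical constructor
        # argument or channel-layer option, defaulting to the current value
        while True:
            await asyncio.sleep(self._keepalive_interval)
            try:
                await self._get_sub_conn()
            except Exception:
                logger.exception("Unexpected exception in keepalive task:")

Tests could then pass an interval larger than any receive timeout they use, without changing behaviour for everyone else.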

bbrowning918 avatar Aug 24 '22 01:08 bbrowning918

Nice work @bbrowning918. 🕵️‍♀️ (Current status: not sure — Once I get the Channels update ready I will swing back here for a play. Happy if you want to make suggestions!)

carltongibson avatar Aug 25 '22 07:08 carltongibson

@acu192 — Do you have half a cycle to look at the discussion here and see what you think? (Thanks)

carltongibson avatar Sep 02 '22 07:09 carltongibson

I'll play around a bit this weekend. It seems that in the shard flush we need to take the lock to prevent the keepalive from bringing the shard back to life.
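
Roughly the shape I have in mind, as a sketch only, assuming the shard attributes from the snippets above (_lock, _receive_task, _keepalive_task, _sub_conn, _put_redis_conn); the real flush may need more care:

    async def flush(self):
        # take the same lock _get_sub_conn uses, so the keepalive task cannot
        # recreate the subscriber connection while we are tearing it down
        if self._lock is None:
            self._lock = asyncio.Lock()
        async with self._lock:
            if self._keepalive_task is not None:
                self._keepalive_task.cancel()
                self._keepalive_task = None
            if self._receive_task is not None:
                self._receive_task.cancel()
                self._receive_task = None
            if self._sub_conn is not None:
                await self._put_redis_conn(self._sub_conn)
                self._sub_conn = None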

qeternity avatar Sep 02 '22 13:09 qeternity

Ok, great @qeternity.

If we can get a solution here in the next week or so that would be great, otherwise I'll push the release anyway, and we'll have to fix it later. 😜

carltongibson avatar Sep 02 '22 14:09 carltongibson

So there was quite a bit of cruft in the old aioredis logic around marshaling raw redis connections and keepalives. Using redis-py pools we get built-in keepalives by setting a low timeout on the subscriber connection, which will auto reconnect and resubscribe.

I've opened this quick refactor (https://github.com/django/channels_redis/pull/326) of the pubsub shard which resolves all the hangs and cleans up the code a bit. I can't find a way to implement the disconnect/reconnect notifiers under redis-py however.
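
For the record, the receive path I'm describing is roughly along these lines; this is a hedged sketch rather than the actual #326 code, and the names here (receive_loop, the queue argument) are illustrative:

import asyncio

from redis import asyncio as aioredis
from redis import exceptions as redis_exceptions


async def receive_loop(url, channel, queue):
    # socket_timeout keeps blocking reads short, so a dead connection surfaces
    # as an exception instead of the receive task hanging forever
    client = aioredis.Redis.from_url(url, socket_timeout=5)
    pubsub = client.pubsub()
    await pubsub.subscribe(channel)
    while True:
        try:
            message = await pubsub.get_message(
                ignore_subscribe_messages=True, timeout=1
            )
        except (redis_exceptions.ConnectionError, redis_exceptions.TimeoutError):
            # connection lost: back off, then re-subscribe; the next command
            # reconnects the underlying connection
            await asyncio.sleep(1)
            await pubsub.subscribe(channel)
            continue
        if message is not None:
            await queue.put(message["data"])

Leaving asyncio.CancelledError uncaught there is deliberate, so cancelling the receive task during cleanup still works.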

qeternity avatar Sep 03 '22 01:09 qeternity

There looks to be some desirable code in redis-py that hasn't been released yet, specifically pertaining to auto-reconnecting in pubsub blocking mode.

The above refactor does not auto reconnect/resubscribe at scale in our test harness, so I will continue to investigate.

qeternity avatar Sep 03 '22 18:09 qeternity

Ok - this is now running pretty well in our chaos harness.

qeternity avatar Sep 04 '22 16:09 qeternity

I've rolled in #326 and pushed 4.0.0b2 to PyPI. I'd be grateful if folks could try it out — looking for final releases next week. 👍

carltongibson avatar Sep 08 '22 18:09 carltongibson