
Memory leak when using channels redis PubSub layer

Open ahmadpoorgholam opened this issue 4 years ago • 25 comments

Hello! On our production service, we use channels and channels_redis to deliver heavy updates to WebSocket clients, and this has worked fine for us for the last six months. However, since the day we switched to the new PubSubChannelLayer, memory consumption has been growing constantly and without bound. The only workaround we have found is restarting the server process at a regular interval to release the memory.

Here is some metadata about the system we are running:

Socket updates are mostly generated by management commands, which run outside the context of the server processes and call group_send to provide consumers with new updates.

channels==3.0.4
channels-redis==3.3.0

The server runs under an Nginx + uvicorn==0.12.1 stack with supervisor as the process manager. However, we have tested gunicorn and daphne as well, and the memory problem stayed the same.

OS: Ubuntu Server 18

ahmadpoorgholam avatar Sep 29 '21 11:09 ahmadpoorgholam

@ahmadpoorgholam Are you able to investigate to see where the leak is coming from?

carltongibson avatar Sep 30 '21 07:09 carltongibson

We do not have a clue, but the only thing we can say for sure is that the use of group_send over time is the main suspect. The strange thing is that we do have clean-up in disconnect (such as group_discard and raising StopConsumer).

ahmadpoorgholam avatar Sep 30 '21 11:09 ahmadpoorgholam

Is there any update on this? I have been having a similar issue, but I am unsure whether they are related.

Ben-Owen-3183 avatar Oct 09 '21 11:10 Ben-Owen-3183

It needs investigation. If someone can narrow down a reproducible issue that would help.

carltongibson avatar Oct 09 '21 12:10 carltongibson

For the next time the leak shows up is there anything I should/could do to get some debug info on what is going on?

Ben-Owen-3183 avatar Oct 09 '21 12:10 Ben-Owen-3183

Hi all - something is rotten in the current pubsub layer. We maintain an internal fork but I recently sat down to merge upstream, and we're seeing lots of issues around memory consumption and sentinel connections.

Diffing our internal vs upstream doesn't yield anything obvious, so I will take another look hopefully in next few weeks...but something has definitely crept in.

qeternity avatar Feb 13 '22 19:02 qeternity

Hey @qeternity — super thanks. What's the merge base for your fork? (Presumably the issue is after that...)

carltongibson avatar Feb 14 '22 15:02 carltongibson

I set up a basic channels app to explore this a bit

my generator


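from channels.layers import get_channel_layer


async def run_async():
    # Resolve the channel layer once instead of on every iteration.
    channel_layer = get_channel_layer()
    msg = {'type': 'internal.message', 'text': 'sbf'}
    # Flood the 'default' group with one million messages.
    for _ in range(1000000):
        await channel_layer.group_send('default', msg)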

I have a client that reads these (from the consumer) as fast as it can. Importantly, I let the client run for a bit, then hard control + C the client to disconnect it. I see the server detects this disconnect too

WebSocket DISCONNECT /echo/stream/ [127.0.0.1:36750]
websocket_disconnect ===
 asgispecific.0af216d849024bf086de8a873bfd5c37

Now, the generator is still pumping out tons of messages, and I notice the memory of my "python manage.py runserver" process is growing a lot. After all the messages are sent, the memory usage remains high indefinitely (does not go back down).

Note: I did not notice this growth when using channels_redis.core.RedisChannelLayer, only when using PubSub

Toward the end of sending the messages, I reconnect my client. I have a pdb breakpoint set up in the consumer to trigger after all of the messages have been sent, and I use guppy3 to print the heap.

hp = guppy.hpy()
hp.heap()

Partition of a set of 918491 objects. Total size = 88449575 bytes.
 Index  Count   %     Size   % Cumulative  % Kind (class / dict of class)
     0 626517  68 41201396  47  41201396  47 bytes
     1  99483  11 10011250  11  51212646  58 str
     2     40   0  9632448  11  60845094  69 collections.deque
     3  74337   8  5253008   6  66098102  75 tuple
     4  21609   2  3881513   4  69979615  79 types.CodeType
     5   3607   0  3717464   4  73697079  83 type
     6  22077   2  3179088   4  76876167  87 function
     7   7854   1  2496048   3  79372215  90 dict (no owner)
     8   3607   0  1737608   2  81109823  92 dict of type
     9   1019   0  1330648   2  82440471  93 dict of module

Okay, so bytes objects make up 68% of the object count and 47% of the total size. Let's zoom in:

hp.heap()[0].byrcs

Partition of a set of 1626520 objects. Total size = 106201611 bytes.
 Index  Count   %     Size   % Cumulative  % Referrers by Kind (class / dict of class)
     0 1589575  98 103322357  97 103322357  97 collections.deque
     1  34735   2  2790813   3 106113170 100 types.CodeType
     2   1368   0    51518   0 106164688 100 dict (no owner)
     3    653   0    26241   0 106190929 100 tuple
     4    104   0     5914   0 106196843 100 dict of module
     5     14   0      814   0 106197657 100 re.Pattern, tuple
     6      9   0      770   0 106198427 100 dict of type
     7     15   0      698   0 106199125 100 list
     8      6   0      443   0 106199568 100 aioredis.util.coerced_keys_dict
     9      6   0      443   0 106200011 100 dict of aioredis.pubsub._Sender, tuple

all of that data is in collections.deque objects

Hopefully this sheds some light on this issue
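
For context, here is a minimal sketch of how a per-channel buffer that nobody reads from can produce a heap profile like the one above. It assumes each channel is backed by an asyncio.Queue, which in CPython keeps its items in a collections.deque; the function name and payloads are made up for illustration and this is not channels_redis code.

```python
import asyncio
import collections


async def fill_without_consumer(n_messages: int) -> asyncio.Queue:
    # Hypothetical stand-in for a per-channel buffer; nothing ever calls get().
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(n_messages):
        # Each payload is a bytes object, like a serialized group message.
        queue.put_nowait(b"payload-%d" % i)
    return queue


queue = asyncio.run(fill_without_consumer(10_000))
# In CPython, asyncio.Queue stores its items in a deque (private attribute,
# inspected here only to show where the bytes objects end up).
backing_store = queue._queue
print(type(backing_store).__name__, queue.qsize())
```

Every bytes payload stays reachable through the deque until something either consumes it or drops the queue itself.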

fosterseth avatar Apr 02 '22 20:04 fosterseth

another thing to note -- when I control + C my client, I see this traceback logged in the runserver terminal

WebSocket DISCONNECT /echo/stream/ [127.0.0.1:45544]
Exception inside application: Attempt to send on a closed protocol
Traceback (most recent call last):
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/staticfiles.py", line 44, in __call__
    return await self.application(scope, receive, send)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/routing.py", line 71, in __call__
    return await application(scope, receive, send)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/sessions.py", line 47, in __call__
    return await self.inner(dict(scope, cookies=cookies), receive, send)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/sessions.py", line 263, in __call__
    return await self.inner(wrapper.scope, receive, wrapper.send)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/auth.py", line 185, in __call__
    return await super().__call__(scope, receive, send)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/middleware.py", line 26, in __call__
    return await self.inner(scope, receive, send)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/routing.py", line 150, in __call__
    return await application(
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/consumer.py", line 94, in app
    return await consumer(scope, receive, send)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/consumer.py", line 58, in __call__
    await await_many_dispatch(
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/utils.py", line 51, in await_many_dispatch
    await dispatch(result)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/consumer.py", line 73, in dispatch
    await handler(message)
  File "/home/sbf/channels_app/proj/app/consumers.py", line 20, in internal_message
    await self.send(text_data=event["text"])
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/generic/websocket.py", line 209, in send
    await super().send({"type": "websocket.send", "text": text_data})
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/consumer.py", line 81, in send
    await self.base_send(message)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/sessions.py", line 226, in send
    return await self.real_send(message)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/daphne/server.py", line 234, in handle_reply
    protocol.handle_reply(message)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/daphne/ws_protocol.py", line 202, in handle_reply
    self.serverSend(message["text"], False)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/daphne/ws_protocol.py", line 256, in serverSend
    self.sendMessage(content.encode("utf8"), binary)
  File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/autobahn/websocket/protocol.py", line 2254, in sendMessage
    raise Disconnected("Attempt to send on a closed protocol")
autobahn.exception.Disconnected: Attempt to send on a closed protocol

To get around this traceback I added some try / except in my consumers.py

async def internal_message(self, event):
    try:
        await self.send(text_data=event["text"])
    except Exception:
        await self.websocket_disconnect(None)

async def websocket_disconnect(self, message):
    await super().websocket_disconnect(message)

Now when my client disconnects, I no longer see memory growth, which is good. However, I expected the memory to go down (i.e. all of those buffered messages should be released), but it does not. I don't believe that queue is being cleaned up.

If there are messages in the channel Queue, and the consumer disconnects (I control + C my client), I am not hitting https://github.com/django/channels_redis/blob/bba93196d8fe5e5fbfc470350c1f3da168c56739/channels_redis/pubsub.py#L191

But if there are no messages in the channel Queue, and the consumer disconnects, then I do hit that del self.channels[channel]

I can reproduce this reliably each time.

fosterseth avatar Apr 04 '22 02:04 fosterseth

However, I expected the memory to go down (i.e. all of those buffered messages should be released), but it does not. I don't believe that queue is being cleaned up.

@fosterseth given CPython's memory allocator, is this really to be expected?

There is a memory leak somewhere, unfortunately we end up cycling k8s containers often enough that it's not much of an issue and haven't investigated further.

qeternity avatar Apr 04 '22 16:04 qeternity

@fosterseth Thanks for investigating this. Your discoveries got me thinking, and now I have some ideas on what is going wrong here.

First, some context:

The giant comment I wrote here exists because django/channels calls new_channel() (here), but it doesn't offer an obvious way to "clean up" that new channel. Thus the only way I could figure out how to clean up (i.e. call del self.channels[channel]) was to wait for a CancelledError in receive() (here) and use that opportunity to clean stuff up.

So, what's going wrong:

@fosterseth I think you've found an execution path where CancelledError is never thrown into receive(), thus the PubSub channel layer never gets to clean up. Looking at how that might happen... perhaps if an exception is thrown here, then the channel layer's receive() is never rescheduled, thus is never canceled. But... whatever the reason... I think we cannot solve this without changing django/channels, which is not too surprising and was the reason I wrote that original big comment. Basically, I believe we're doing the best we can in PubSub to clean up given that django/channels doesn't give us a chance to clean up.
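
To make the two execution paths concrete, here is a toy sketch (not the real PubSub layer). ToyLayer and its behaviour are assumptions for illustration: a layer whose only cleanup opportunity is a CancelledError thrown into receive().

```python
import asyncio


class ToyLayer:
    """Hypothetical layer that only cleans up when receive() is cancelled."""

    def __init__(self):
        self.channels = {}

    def new_channel(self, name):
        self.channels[name] = asyncio.Queue()
        return name

    async def receive(self, channel):
        try:
            return await self.channels[channel].get()
        except asyncio.CancelledError:
            # The only cleanup opportunity in this design.
            self.channels.pop(channel, None)
            raise


async def demo():
    layer = ToyLayer()

    # Path 1: receive() is pending when the consumer goes away, so the
    # cancellation reaches it and the channel is removed.
    layer.new_channel("a")
    task = asyncio.ensure_future(layer.receive("a"))
    await asyncio.sleep(0)  # let receive() start waiting on the empty queue
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

    # Path 2: messages are buffered and receive() is never scheduled again,
    # so no CancelledError arrives and the queue stays referenced.
    layer.new_channel("b")
    for _ in range(1000):
        layer.channels["b"].put_nowait(b"msg")

    return layer


layer = asyncio.run(demo())
print(sorted(layer.channels), layer.channels["b"].qsize())
```

In this sketch channel "a" is cleaned up and channel "b" is not, which mirrors the behaviour reported above for an empty versus a non-empty queue.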

How to fix:

I think we should add code (perhaps here) ... something like:

if self.channel_layer is not None and hasattr(self.channel_layer, 'clean_channel'):
    self.channel_layer.clean_channel(self.channel_name)

That is, clean_channel() will clean up anything done by new_channel(), which gives us an opportunity to clean up that is guaranteed to be called.

Then of course we'd change the PubSub impl to clean up in clean_channel() instead of how it does it now.
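
A rough sketch of the idea, using a toy layer rather than the real classes. clean_channel() is the proposed hook and does not exist in django/channels at the time of writing; run_consumer and failing_handler are hypothetical names used only to show the control flow.

```python
import asyncio


class ToyLayer:
    def __init__(self):
        self.channels = {}

    def new_channel(self, name):
        self.channels[name] = asyncio.Queue()
        return name

    def clean_channel(self, name):
        # Hypothetical hook: undo whatever new_channel() set up.
        self.channels.pop(name, None)


async def run_consumer(layer, name, handler):
    channel = layer.new_channel(name)
    try:
        await handler(layer, channel)
    finally:
        # Runs on normal exit, on exceptions, and on cancellation.
        if layer is not None and hasattr(layer, "clean_channel"):
            layer.clean_channel(channel)


async def failing_handler(layer, channel):
    # Simulate a buffered message followed by a failed send.
    layer.channels[channel].put_nowait(b"buffered")
    raise RuntimeError("Attempt to send on a closed protocol")


async def demo():
    layer = ToyLayer()
    try:
        await run_consumer(layer, "a", failing_handler)
    except RuntimeError:
        pass
    return layer


layer = asyncio.run(demo())
print(layer.channels)
```

The hasattr() check keeps the caller compatible with layers that do not implement the hook.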

Memory going down...

Memory management at the OS level is whack enough (i.e. calling free() in most real-world applications doesn't cause a drop in the memory consumption reported by the OS, due to fragmentation). Then you add CPython's memory manager / garbage collector on top of that... and you just cannot expect memory to go down even at times when you might expect it to. (This is similar to what @qeternity is saying as well.)

My use case...

Again, like @qeternity's case, in our case we cycle in/out containers often enough (usually because we're deploying new versions of our app) that this leak isn't hurting us in production. That said, it would certainly be great to fix it.

acu192 avatar Apr 04 '22 17:04 acu192

Can always count on @acu192 for the real insights. Indeed I think fixing memory leaks is the best we can hope for. OS shenanigans aside, CPython rarely returns memory to the OS anyway, especially for small objects.

qeternity avatar Apr 04 '22 17:04 qeternity

We're also seeing the memory leak and it's pretty aggressive, but not universal. We seem to be leaking 10-20 MB / hour.


jalaziz avatar May 25 '22 00:05 jalaziz

...given that django/channels doesn't give us a chance to clean up.

@acu192 Would you like to investigate making that change in django/channels?

carltongibson avatar May 25 '22 06:05 carltongibson

Just on the "Is it a leak?" question: finding a place to run gc.collect() will show whether the stale objects are collectable or not. Quite often in these cases they are, but Python is not very aggressive about garbage collection, at least by default. (Tuning the Python GC beyond that gets into dark arts pretty quickly, but...)
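
As a minimal sketch of that check, using only the standard library: the Node class and the weakref probe below are illustrative, not taken from any project code.

```python
import gc
import weakref


class Node:
    def __init__(self):
        self.ref = None
        self.payload = bytearray(1024)


def make_cycle():
    # Two objects that reference each other: unreachable after return,
    # but not freed by reference counting alone.
    a, b = Node(), Node()
    a.ref, b.ref = b, a
    return weakref.ref(a)


gc.disable()  # keep automatic collection from running mid-demo
probe = make_cycle()
alive_before = probe() is not None
unreachable = gc.collect()  # returns the number of unreachable objects found
alive_after = probe() is not None
gc.enable()
print(alive_before, unreachable > 0, alive_after)
```

If the suspect objects are still alive after gc.collect(), something is holding a real reference to them (for example an entry in a dict), and no amount of GC tuning will release them.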

carltongibson avatar May 25 '22 06:05 carltongibson

@acu192 Would you like to investigate making that change in django/channels?

Yes, I'll put some time on my calendar in the next few weeks to create a PR to django/channels.

acu192 avatar May 25 '22 14:05 acu192

Any progress here, maybe?

pbylina avatar Sep 29 '22 09:09 pbylina

bump

Sharpek avatar Nov 04 '22 10:11 Sharpek

@Sharpek @pbylina how about you guys spend an ounce of energy on this, instead of posting unhelpful comments.

qeternity avatar Nov 04 '22 16:11 qeternity

put up a PR ^

hopefully someone can help test this out

pip install git+https://github.com/fosterseth/channels_redis.git@clean_channels
pip install git+https://github.com/fosterseth/channels.git@clean_channels

fosterseth avatar Nov 05 '22 01:11 fosterseth

hi @acu192 ! any thoughts on those PRs? I don't have permissions to kick off workflow or add reviewers. Thank you

fosterseth avatar Nov 11 '22 20:11 fosterseth

@fosterseth I'm having a quick look now, just in case the far more capable @acu192 is busy at the moment

qeternity avatar Nov 11 '22 20:11 qeternity

Ha! I just saw that @acu192 is already on it about 20 min ago!

qeternity avatar Nov 11 '22 20:11 qeternity

This article might help some of you:

https://www.paradigm.co/blog/anatomy-of-a-python-memory-leak

pfcodes avatar Jun 15 '23 05:06 pfcodes