channels_redis
channels_redis copied to clipboard
Memory leak when using channels redis PubSub layer
Hello! On the production service, we are using channels and channels Redis to deliver heavy updates on the web socket clients and it has worked fine for us for the last six months.
However, since the day that we switched to using the new PubSubChannelLayer, we are facing the problem of the constant infinite growth of memory consumption. and the only way around that we have found is restarting the server process on a constant interval to release the memory back.
Here you could find some metadata about the system that we are running:
Socket Updates are mostly generated by management commands which are running outside of the context of server processes and they call group_send to provide consumers with new updates.
channels==3.0.4
channels-redis==3.3.0
the server is running under an Nginx - uvicorn==0.12.1 stack with the supervisor as the process manager. however, we have tested gunicorn and daphne as well and the memory problem stayed the same.
OS: Ubuntu Server 18
@ahmadpoorgholam Are you able to investigate to see where the leak is coming from?
we do not have a clue, but the only thing that we could say for sure is that the usage of group_send over time is the main suspect for the problem. and the strange thing is we do have clean-up in disconnect ( like group discard and raise stop consumers)
Is there any update on this? I have been having a similar issue, but unsure if they a related.
It needs investigation. If someone can narrow down a reproducible issue that would help.
For the next time the leak shows up is there anything I should/could do to get some debug info on what is going on?
Hi all - something is rotten in the current pubsub layer. We maintain an internal fork but I recently sat down to merge upstream, and we're seeing lots of issues around memory consumption and sentinel connections.
Diffing our internal vs upstream doesn't yield anything obvious, so I will take another look hopefully in next few weeks...but something has definitely crept in.
Hey @qeternity — super thanks. What's the merge base for your fork? (Presumably the issue is after that...)
I set up a basic channels app to explore this a bit
my generator
async def run_async():
i = 0
msg = {'type': 'internal.message', 'text': 'sbf'}
while i < 1000000:
channel_layer = get_channel_layer()
await channel_layer.group_send(
'default',
msg,
)
I have a client that reads these (from the consumer) as fast as it can. Importantly, I let the client run for a bit, then hard control + C the client to disconnect it. I see the server detects this disconnect too
WebSocket DISCONNECT /echo/stream/ [127.0.0.1:36750]
websocket_disconnect ===
asgispecific.0af216d849024bf086de8a873bfd5c37
Now, the generator is still pumping out tons of messages, and I notice the memory of my "python manage.py runserver" process is growing a lot. After all the messages are sent, the memory usage remains high indefinitely (does not go back down).
Note: I did not notice this growth when using channels_redis.core.RedisChannelLayer, only when using PubSub
toward the end of sending the messages, I reconnect my client. I have a pdb debugger set up on the consumer to trigger after all of the messages have sent, and I use guppy3 to print a heap
hp = guppy.hpy() hp.heap()
Partition of a set of 918491 objects. Total size = 88449575 bytes.
Index Count % Size % Cumulative % Kind (class / dict of class)
0 626517 68 41201396 47 41201396 47 bytes
1 99483 11 10011250 11 51212646 58 str
2 40 0 9632448 11 60845094 69 collections.deque
3 74337 8 5253008 6 66098102 75 tuple
4 21609 2 3881513 4 69979615 79 types.CodeType
5 3607 0 3717464 4 73697079 83 type
6 22077 2 3179088 4 76876167 87 function
7 7854 1 2496048 3 79372215 90 dict (no owner)
8 3607 0 1737608 2 81109823 92 dict of type
9 1019 0 1330648 2 82440471 93 dict of module
okay so 68% of that memory are bytes, let's zoom in
hp.heap()[0].byrcs
Partition of a set of 1626520 objects. Total size = 106201611 bytes.
Index Count % Size % Cumulative % Referrers by Kind (class / dict of class)
0 1589575 98 103322357 97 103322357 97 collections.deque
1 34735 2 2790813 3 106113170 100 types.CodeType
2 1368 0 51518 0 106164688 100 dict (no owner)
3 653 0 26241 0 106190929 100 tuple
4 104 0 5914 0 106196843 100 dict of module
5 14 0 814 0 106197657 100 re.Pattern, tuple
6 9 0 770 0 106198427 100 dict of type
7 15 0 698 0 106199125 100 list
8 6 0 443 0 106199568 100 aioredis.util.coerced_keys_dict
9 6 0 443 0 106200011 100 dict of aioredis.pubsub._Sender, tuple
all of that data is in collections.deque objects
Hopefully this sheds some light on this issue
another thing to note -- when I control + C my client, I see this traceback logged in the runserver terminal
WebSocket DISCONNECT /echo/stream/ [127.0.0.1:45544]
Exception inside application: Attempt to send on a closed protocol
Traceback (most recent call last):
File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/staticfiles.py", line 44, in __call__
return await self.application(scope, receive, send)
File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/routing.py", line 71, in __call__
return await application(scope, receive, send)
File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/sessions.py", line 47, in __call__
return await self.inner(dict(scope, cookies=cookies), receive, send)
File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/sessions.py", line 263, in __call__
return await self.inner(wrapper.scope, receive, wrapper.send)
File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/auth.py", line 185, in __call__
return await super().__call__(scope, receive, send)
File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/middleware.py", line 26, in __call__
return await self.inner(scope, receive, send)
File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/routing.py", line 150, in __call__
return await application(
File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/consumer.py", line 94, in app
return await consumer(scope, receive, send)
File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/consumer.py", line 58, in __call__
await await_many_dispatch(
File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/utils.py", line 51, in await_many_dispatch
await dispatch(result)
File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/consumer.py", line 73, in dispatch
await handler(message)
File "/home/sbf/channels_app/proj/app/consumers.py", line 20, in internal_message
await self.send(text_data=event["text"])
File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/generic/websocket.py", line 209, in send
await super().send({"type": "websocket.send", "text": text_data})
File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/consumer.py", line 81, in send
await self.base_send(message)
File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/channels/sessions.py", line 226, in send
return await self.real_send(message)
File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/daphne/server.py", line 234, in handle_reply
protocol.handle_reply(message)
File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/daphne/ws_protocol.py", line 202, in handle_reply
self.serverSend(message["text"], False)
File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/daphne/ws_protocol.py", line 256, in serverSend
self.sendMessage(content.encode("utf8"), binary)
File "/home/sbf/channels_app/proj/env/lib/python3.10/site-packages/autobahn/websocket/protocol.py", line 2254, in sendMessage
raise Disconnected("Attempt to send on a closed protocol")
autobahn.exception.Disconnected: Attempt to send on a closed protocol
To get around this traceback I added some try / except in my consumers.py
async def internal_message(self, event):
try:
await self.send(text_data=event["text"])
except Exception:
await self.websocket_disconnect(None)
async def websocket_disconnect(self, message):
await super().websocket_disconnect(message)
Now when my client disconnects, I no longer see memory growth, which is good. However, I expected the memory to go down (i.e. all of those buffered messages should be released), but it does not. I don't believe that queue is being cleaned up.
If there are messages in the channel Queue, and the consumer disconnects (I control + C my client), I am not hitting https://github.com/django/channels_redis/blob/bba93196d8fe5e5fbfc470350c1f3da168c56739/channels_redis/pubsub.py#L191
But if there are no messages in the channel Queue, and the consumer disconnects, then I do hit that del self.channels[channel]
I can reproduce this reliably each time.
However, I expected the memory to go down (i.e. all of those buffered messages should be released), but it does not. I don't believe that queue is being cleaned up.
@fosterseth given CPython's memory allocator, is this really to be expected?
There is a memory leak somewhere, unfortunately we end up cycling k8s containers often enough that it's not much of an issue and haven't investigated further.
@fosterseth Thanks for investigating this. Your discoveries got me thinking, and now I have some ideas on what is going wrong here.
First, some context:
The giant comment I wrote here is because django/channels calls new_channel() (here), but it doesn't offer an obvious way "clean up" that new channel... thus the only way I could figure out how to clean up (i.e. call del self.channels[channel]) was to wait for a CancelledError in receive() (here) and use that opportunity to clean stuff up.
So, what's going wrong:
@fosterseth I think you've found an execution path where CancelledError is never thrown into receive(), thus the PubSub channel layer never gets to clean up. Looking at how that might happen... perhaps if an exception is thrown here, then the channel layer's receive() is never rescheduled, thus is never canceled. But... whatever the reason... I think we cannot solve this without changing django/channels, which is not too surprising and was the reason I wrote that original big comment. Basically, I believe we're doing the best we can in PubSub to clean up given that django/channels doesn't give us a chance to clean up.
How to fix:
I think we should add code (perhaps here) ... something like:
if self.channel_layer is not None and hasattr(self.channel_layer, 'clean_channel'):
self.channel_layer.clean_channel(self.channel_name)
That is, clean_channel() will cleanup anything done by new_channel() ... thus an opportunity to clean up that is guaranteed to be called.
Then of course we'd change the PubSub impl to clean up in clean_channel() instead of how it does it now.
Memory going down...
Memory management at the OS-level is whack enough (i.e. calling free() in most real-world applications doesn't cause a drop in memory consumption reported by the OS, due to fragmentation). Then you add cpython's memory manager / garbage collector on top of that... and you just cannot expect memory to go down even in times you might expect it. (similar to what @qeternity is saying as well)
My use case...
Again, like @qeternity's case, in our case we cycle in/out containers often enough (usually because we're deploying new versions of our app) that this leak isn't hurting us in production. That said, it would certainly be great to fix it.
Can always count on @acu192 for the real insights. Indeed I think fixing memory leaks is the best we can hope for. OS shenanigans aside, CPython rarely returns memory to the OS anyway, especially for small objects.
We're also seeing the memory leak and it's pretty aggressive, but not universal. We seem to be leaking 10-20 MB / hour.
...given that django/channels doesn't give us a chance to clean up.
@acu192 Would you like to investigate making that change in django/channels?
Just on the Is it a leak? question. Finding a place to run a gc.collect() will show if the stale objects are collectable or not. Quite often in these cases they are, but Python's not very aggressive in garbage collecting, at least by default. (It gets into dark arts pretty quickly after that tuning the Python GC but...)
@acu192 Would you like to investigate making that change in django/channels?
Yes, I'll put some time on my calendar in the next few weeks to create a PR to django/channels.
Any progress here, maybe?
bump
@Sharpek @pbylina how about you guys spend an ounce of energy on this, instead of posting unhelpful comments.
put up a PR ^
hopefully someone can help test this out
pip install git+https://github.com/fosterseth/channels_redis.git@clean_channels
pip install git+https://github.com/fosterseth/channels.git@clean_channels
hi @acu192 ! any thoughts on those PRs? I don't have permissions to kick off workflow or add reviewers. Thank you
@fosterseth I'm having a quick look now, just in case the far more capable @acu192 is busy at the moment
Ha! I just saw that @acu192 is already on it about 20 min ago!
This article might help some of you:
https://www.paradigm.co/blog/anatomy-of-a-python-memory-leak