Memory leak if writing to a channel that is never read from later
It appears that if you write a message to a channel, for example via `group_send`, and no reader ever appears on that channel, the messages will remain indefinitely in the in-memory queue `channels.layers.channel_layers.backends['default'].receive_buffer` when using the `RedisChannelLayer` backend. In particular, I have captured a server that has over 100k items in that dictionary.
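For reference, a rough way to check how large that buffer has grown on a running process (attribute names as observed in channels-redis 2.x; this is internal state, not a stable API):

```python
import channels.layers

# Grab the default layer instance that this process is actually using.
layer = channels.layers.channel_layers.backends["default"]

# receive_buffer maps channel names to their buffered messages; its size
# keeps growing if messages are written to channels nobody ever reads.
print(len(layer.receive_buffer))
```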
One way to avoid this problem would be to extend the `group_send` API with a time-to-live parameter so that messages would expire over time if they weren't read. Thoughts?
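To make the proposal concrete, a call under that extension might look like the sketch below; the `ttl` parameter is hypothetical (`group_send` does not accept it today), and the group name and event are just examples:

```python
from channels.layers import get_channel_layer

async def broadcast_update(event):
    channel_layer = get_channel_layer()
    # Hypothetical `ttl` argument: copies of the message still unread after
    # 60 seconds would be dropped instead of buffered forever.
    await channel_layer.group_send("liveblog_updates", event, ttl=60)
```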
My `pip freeze`, in case it's useful:

```
channels==2.1.2
channels-redis==2.2.1
```
> Thoughts?
Right, yes. 🙂
- First off I guess, we should make clearing out stale items easier.
- Then allowing a default TTL.
- Then allowing that per-message.
I have discovered that `RedisChannelLayer` already has a concept of a "group expiry", which we may want to hook into for the "set a default TTL" scenario:

```python
class RedisChannelLayer(BaseChannelLayer):
    def __init__(..., group_expiry=86400, ...):
        self.group_expiry = group_expiry  # channels_redis.core:46
```
This is alluded to briefly in the Channels documentation:

> Groups are a broadcast system that:
>
> - ...
> - Provides group expiry for clean-up of connections whose disconnect handler didn’t get to run (e.g. power failure)
I was unable to find documentation for changing the `group_expiry` of `RedisChannelLayer`, but reading the source for `channels/layers.py:58@_make_backend` suggests that I could put something like this in `settings.py`:
```python
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            'hosts': ...,
            'prefix': ...,
            'group_expiry': 86400,  # seconds?; 24 hours
        },
    },
}
```
Do you think it would be appropriate to hook into the existing "group_expiry" concept, or do you think a separate "message_expiry" (or similar) would be more appropriate?
No, let's use what's there.
> I was unable to find documentation ...
Ultimately I think this is the issue: I end up having to look at the channels_redis source to work out what the options are. So the first step would be a PR explaining what's already possible.
(Maybe that'd be enough)
Agreed that the documentation should be extended to show how to configure the built-in backend types.
Unfortunately `group_expiry` doesn't actually control expiration of messages in the `receive_buffer`, so some logic still needs to be added to perform expiration.
Fix 1 Sketch
> - First off I guess, we should make clearing out stale items easier.
I propose that `receive_buffer` keep track of the last time a message was read from it, on a per-channel basis, and that an API (`expire_unread_channels`) be provided on `RedisChannelLayer` that can clear any channels that were not read within the last N seconds. Example usage:
```python
channel_layer = channels.layers.get_channel_layer()
assert isinstance(channel_layer, RedisChannelLayer)
channel_layer.expire_unread_channels(ttl=300)  # seconds (5 minutes); defaults to 5 minutes
```
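A minimal sketch of what such tracking could look like internally, using a hypothetical wrapper rather than the real `receive_buffer` data structure:

```python
import time

class ReceiveBufferWithExpiry:
    """Hypothetical stand-in for receive_buffer that remembers when each
    channel was last read, so stale channels can be cleared."""

    def __init__(self):
        self.messages = {}      # channel name -> list of buffered messages
        self.last_read_at = {}  # channel name -> monotonic time of last read

    def append(self, channel, message):
        self.messages.setdefault(channel, []).append(message)
        self.last_read_at.setdefault(channel, time.monotonic())

    def pop(self, channel):
        self.last_read_at[channel] = time.monotonic()
        return self.messages[channel].pop(0)

    def expire_unread_channels(self, ttl=300):
        """Drop every channel that has not been read in the last `ttl` seconds."""
        cutoff = time.monotonic() - ttl
        stale = [c for c, t in self.last_read_at.items() if t < cutoff]
        for channel in stale:
            del self.messages[channel]
            del self.last_read_at[channel]
```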
Fix 2 Sketch
> - Then allowing a default TTL.
Extend Fix 1 above so that `RedisChannelLayer` spawns a thread that calls `expire_unread_channels()` on itself every M seconds, for some M (perhaps `group_expiry / 2`?). This thread would use the configured `group_expiry` (mentioned earlier) as the default TTL to pass to `expire_unread_channels()`.
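A sketch of that cleanup loop, written as an asyncio task rather than a thread (since the Redis layer is asyncio-based) and assuming the hypothetical `expire_unread_channels()` from Fix 1:

```python
import asyncio

async def _expiry_loop(channel_layer):
    """Periodically clear channels that have gone unread for group_expiry seconds."""
    interval = channel_layer.group_expiry / 2  # the "M" above; the choice is arbitrary
    while True:
        await asyncio.sleep(interval)
        channel_layer.expire_unread_channels(ttl=channel_layer.group_expiry)

# The layer could start this when it is first used, e.g.:
# asyncio.ensure_future(_expiry_loop(channel_layer))
```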
Prior art: Django's sessions framework has a similar problem with clearing expired session records. It provides the following API:

> `clear_expired()`
>
> Removes expired sessions from the session store. This class method is called by `clearsessions`.
We could spell `expire_unread_channels()` instead as `clear_expired()` for consistency, although I like the original name a bit better. :)
Yes, good. I like the build up.
If you're keen here, can I ask you to look at https://github.com/django/channels_redis/pull/165 first? It looks good at first sight, but I haven't had the bandwidth to sit down with it properly, so another pair of eyes there would be super handy!
@carltongibson Is django/channels_redis#165 potentially related to this bug?
Reviewing that would require me to learn in detail how the current and proposed receiving systems work, which takes a block of time (2 hrs) that is difficult for me to schedule in a timely fashion.
Hey @davidfstr.
> ...which takes a block of time...
Yep. Open source. :) (This is the exact hold-up there... BANDWIDTH). No stress, no rush — if you don't have capacity to look at it, no problem! But... if you can slowly have it on the back burner, that would be a great help. (As I say, no stress about it!!!)
Is it related? To the extent that it's on the short list of improvements to be made here, and we're looking at touching the receive buffer, yes.
But... have a glance at it and see what you think. I'm happy to accept your input as it's available, so you tell me. 🙂
Got it. Touches the receive buffer. So we'd have to reconcile these changes eventually anyway.
I'll keep the review in my queue. Projections at work have me looking at this again in 3 weeks, but I may get lucky and find some time earlier.
OK, thanks @davidfstr! (As I say, no stress! It's OSS 🙂)
Hello all! What's the status here? And what's the (average?) severity of it not being fixed (if that's even a reasonable question)?
All of the interactions with channels_redis on my project happen on a Celery worker, so worst case the worker fails (as opposed to ballooning RAM on an API machine), but again, I'd rather find a way around that!
Status: Bug doesn't hurt enough that anyone has put in the effort to fix it yet.
To work around this (and other kinds of slowish memory leaks), I've configured most of my web services to restart after serving (N + random jitter) requests. Gunicorn in particular has a config option for this.
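For the Gunicorn case, that's the `max_requests` / `max_requests_jitter` pair; the numbers below are illustrative:

```python
# gunicorn.conf.py
max_requests = 1000        # recycle a worker after roughly 1000 requests
max_requests_jitter = 100  # random jitter so workers don't all restart at once
```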
@jheld You can use https://github.com/CJWorkbench/channels_rabbitmq (disclaimer: I'm the author) instead of channels_redis. It uses message TTLs, limits buffer sizes, warns on buffer overflow and recovers gracefully.
@davidfstr this bug did hurt enough that we put in the effort to fix it. The fix is `channels_rabbitmq`.
@davidfstr @carltongibson
A version of this bug that I've also seen: you don't just see this if the channel is never read from later, you can also see it if the channel is read from too slowly. In the default `channels_redis` implementation, per-channel `asyncio.Queue` objects grow in an unbounded way; if they're not read at the same rate as insertion on the other end, Daphne will continue to grow in memory consumption forever.
I'd argue that these per-channel `Queue` objects should probably be bounded in size. There's already a `capacity` argument; maybe the per-channel buffer should respect that, and only buffer up to that many objects before dropping old ones?
https://github.com/django/channels_redis#capacity
I do think a goal of passive cleanup makes sense, but I think a reasonable upper bound on queue size would likely prevent many people from getting into bad situations in the first place.
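A sketch of the drop-oldest behaviour, using a hypothetical helper around a bounded `asyncio.Queue` (not the actual channels_redis buffer code):

```python
import asyncio

def put_drop_oldest(buffer: asyncio.Queue, message) -> None:
    """Enqueue `message`, evicting the oldest buffered message if the queue
    is already at capacity, so the buffer never grows without bound."""
    if buffer.maxsize > 0:
        while buffer.qsize() >= buffer.maxsize:
            buffer.get_nowait()  # drop the oldest message (could also be counted/logged)
    buffer.put_nowait(message)

# The per-channel buffer would be created with the layer's configured capacity:
# buffer = asyncio.Queue(maxsize=capacity)
```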
Hi @ryanpetrello -- Thanks for your work looking at this. Your points all seem reasonable.
If you want to block out proof-of-concept sketches, really happy to discuss those.
I'm working on the v3 update at the moment, and won't have capacity to cycle back to this immediately, so all help very much appreciated!
I think this issue is on the wrong project. It should be on `channels_redis`.
`channels_rabbitmq` doesn't have this bug. It has `local_capacity` and `local_expiry` to solve this problem.
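For anyone who wants to try that route, a settings sketch; the backend path and host URL are my recollection of the channels_rabbitmq README (double-check them there), and the values are illustrative:

```python
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_rabbitmq.core.RabbitmqChannelLayer",  # verify against the README
        "CONFIG": {
            "host": "amqp://guest:guest@localhost/",
            "local_capacity": 100,  # cap on messages buffered in this process
            "local_expiry": 60,     # seconds an unread message may sit in the local buffer
        },
    },
}
```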
I have helpful (but abrasive!) comments about the Redis layer's architecture -- and Channels' architecture in general. If you're dismayed by `channels_redis` behavior, here's a more complete picture:

The Channels layer spec -- and, heck, Channels' main loop -- has a big flaw: clients poll for messages instead of subscribing to messages. By definition, a client calls `receive()` on a channel that may or may not exist. In the microseconds after a client calls `receive()` and before it calls its next `receive()`, there is no client waiting on the channel and there may be no pending messages in the channel. According to the spec, the channel doesn't exist.
That leads to a corollary on the delivery side. When a Channels layer is delivering a message, it must deliver to non-existent channels!
`channels_rabbitmq` handles this with process-local expiry. `channels_redis` doesn't. (The `channels_redis` folks are welcome to copy the `channels_rabbitmq` logic here.)
Hey @adamhooper — We like the comments 😀
Yes, I know it should be on channels_redis, but I don't want to close the issues until I've got the bandwidth to resolve them.
> ...clients poll for messages instead of subscribing to messages.
Really happy to look at sketches of a reworking there.
> Really happy to look at sketches of a reworking there.
`channels.consumer.AsyncConsumer.__call__`:
```python
async def __call__(self, receive, send):
    """
    Dispatches incoming messages to type-based handlers asynchronously.
    """
    async with contextlib.AsyncExitStack() as stack:
        # Initialize channel layer
        self.channel_layer = get_channel_layer(self.channel_layer_alias)
        if self.channel_layer is not None:
            channel = await stack.enter_async_context(
                self.channel_layer.new_channel_v2()
            )
            self.channel_name = channel.name
        # Store send function
        if self._sync:
            self.base_send = async_to_sync(send)
        else:
            self.base_send = send
        # Pass messages in from channel layer or client to dispatch method
        try:
            if self.channel_layer is not None:
                await await_many_dispatch([receive, channel], self.dispatch)
            else:
                await await_many_dispatch([receive], self.dispatch)
        except StopConsumer:
            # Exit cleanly
            pass
```
A `channel` object is an async iterator: `__anext__()` returns a message, and `__aclose__()` stops the world.
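For illustration, a minimal sketch of what such a channel object and `new_channel_v2()` could look like; every name and behaviour here is hypothetical (matching the snippet above rather than any existing layer API), and close is spelled `aclose()` for idiomatic Python:

```python
import asyncio
import contextlib

class Channel:
    """Hypothetical channel: an async iterator that yields messages until closed."""

    def __init__(self, name):
        self.name = name
        self._queue = asyncio.Queue()

    def __aiter__(self):
        return self

    async def __anext__(self):
        message = await self._queue.get()
        if message is None:          # sentinel pushed by aclose()
            raise StopAsyncIteration
        return message

    async def aclose(self):
        await self._queue.put(None)  # wake any reader and stop iteration

@contextlib.asynccontextmanager
async def new_channel_v2(layer):
    """Hypothetical layer API: the channel exists exactly as long as the
    consumer holds it; `layer` would register/unregister it in real code."""
    channel = Channel(name="specific.hypothetical!abc123")
    try:
        yield channel
    finally:
        await channel.aclose()  # the layer could also drop buffered messages here
```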
I think it would be easier to write against this API. I don't have time to actually submit and test a pull request, though :).