
Memory leak if writing to a channel that is never read from later

Open davidfstr opened this issue 4 years ago • 21 comments

It appears that if you write a message to a channel, for example via group_send, and no reader ever appears on that channel, the messages will remain in the in-memory queue channels.layers.channel_layers.backends['default'].receive_buffer indefinitely when using the RedisChannelLayer backend. In particular I have captured a server that has over 100k items in that dictionary.
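
For reference, here is a rough way to watch the buffer grow. It pokes at channels_redis internals (receive_buffer is not a public API), so treat it as a diagnostic sketch only:

    import channels.layers

    layer = channels.layers.get_channel_layer()  # the 'default' layer
    # receive_buffer maps channel names to their buffered, undelivered messages
    print(len(layer.receive_buffer))  # number of channels still holding unread messages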

One way to avoid this problem would be to extend the API for group_send with a time-to-live parameter so that messages would expire over time if they weren't read. Thoughts?
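
To illustrate, the shape I have in mind would be something like this (purely hypothetical; group_send has no ttl parameter today):

    await channel_layer.group_send(
        "notifications",
        {"type": "notify.message", "text": "hello"},
        ttl=300,  # hypothetical: drop the message if nobody reads it within 5 minutes
    )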

My pip freeze, in case it's useful:

channels==2.1.2
channels-redis==2.2.1

davidfstr avatar Oct 02 '19 03:10 davidfstr

Thoughts?

Right, yes. 🙂

  • First off I guess, we should make clearing out stale items easier.
  • Then allowing a default TTL.
  • Then allowing that per-message.

carltongibson avatar Oct 02 '19 08:10 carltongibson

I have discovered that RedisChannelLayer already has a concept of a "group expiry", which we may want to hook into for the "set a default TTL" scenario:

class RedisChannelLayer(BaseChannelLayer):
    def __init__(..., group_expiry=86400, ...):
        self.group_expiry = group_expiry  # channels_redis.core:46

This is alluded to briefly in Channels documentation:

Groups are a broadcast system that:

  • ...
  • Provides group expiry for clean-up of connections whose disconnect handler didn’t get to run (e.g. power failure)

I was unable to find documentation for changing the group_expiry of RedisChannelLayer, but reading the source for channels/layers.py:58 (_make_backend) suggests that I could put something like this in settings.py:

    CHANNEL_LAYERS = {
        'default': {
            'BACKEND': 'channels_redis.core.RedisChannelLayer',
            'CONFIG': {
                'hosts': ...,
                'prefix': ...,
                'group_expiry': 86400,  # seconds?; 24 hours
            },
        },
    }
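
If that guess is right, a quick sanity check would be to confirm the option reaches the layer (assuming the settings above):

    import channels.layers

    layer = channels.layers.get_channel_layer()
    print(layer.group_expiry)  # expected: 86400 if the CONFIG above was picked up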

Do you think it would be appropriate to hook into the existing "group_expiry" concept, or do you think a separate "message_expiry" (or similar) would be more appropriate?

davidfstr avatar Oct 02 '19 13:10 davidfstr

No, let's use what's there.

I was unable to find documentation ...

Ultimately I think this is the issue: I end up having to look at the channels_redis source to work out what the options are. So first step would be a PR explaining what's already possible.

(Maybe that'd be enough)

carltongibson avatar Oct 02 '19 13:10 carltongibson

Agreed that the documentation should be extended to show how to configure the built-in backend types.

Unfortunately group_expiry doesn't actually control expiration of messages in the receive_buffer. So there still needs to be some logic added to perform expiration.

Fix 1 Sketch

  • First off I guess, we should make clearing out stale items easier.

I propose that receive_buffer keep track of the last time a message was read from it, on a per-channel basis. And that an API (expire_unread_channels) is provided on RedisChannelLayer that can clear any channels that were not read within the last N seconds. Example usage:

channel_layer = channels.layers.get_channel_layer()
assert isinstance(channel_layer, RedisChannelLayer)
channel_layer.expire_unread_channels(ttl=300)  # seconds (5 minutes), which would also be the default
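
A rough sketch of what that could look like internally (names and details are illustrative, not actual channels_redis code):

    import time

    class RedisChannelLayer(BaseChannelLayer):
        # Illustrative sketch of the proposal only, not the real implementation.

        def __init__(self, *args, **kwargs):
            ...  # existing __init__ unchanged
            self._last_read = {}  # channel name -> monotonic time of the most recent read

        async def receive(self, channel):
            self._last_read[channel] = time.monotonic()
            ...  # existing receive logic unchanged

        def expire_unread_channels(self, ttl=300):
            """Drop buffers for channels that were not read in the last `ttl` seconds."""
            now = time.monotonic()
            for channel in list(self.receive_buffer):
                if now - self._last_read.get(channel, 0.0) > ttl:
                    # Channels that were never read have no timestamp; treat them as stale.
                    del self.receive_buffer[channel]
                    self._last_read.pop(channel, None)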

Fix 2 Sketch

  • Then allowing a default TTL.

Extend Fix 1 above so that RedisChannelLayer spawns a thread that calls expire_unread_channels() on itself every M seconds, for some M (perhaps group_expiry/2?). This thread would use the configured group_expiry (mentioned earlier) as the default TTL to pass to expire_unread_channels().
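
Sketched as an asyncio task (a thread would work similarly); again this is illustrative only:

    import asyncio

    async def _expire_loop(layer):
        # Illustrative: call the proposed expire_unread_channels() periodically,
        # every group_expiry / 2 seconds, using group_expiry as the TTL.
        while True:
            await asyncio.sleep(layer.group_expiry / 2)
            layer.expire_unread_channels(ttl=layer.group_expiry)

    # e.g. spawned once from RedisChannelLayer.__init__:
    #     asyncio.ensure_future(_expire_loop(self))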

davidfstr avatar Oct 02 '19 14:10 davidfstr

Prior art: Sessions has a similar problem with clearing expired session records. It provides the following API:

clear_expired()

  • Removes expired sessions from the session store. This class method is called by clearsessions.

We could spell expire_unread_channels() instead as clear_expired() for consistency, although I like the original name a bit better. :)
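
If we went the clear_expired() route, the analogue of clearsessions would be a small management command; a hypothetical sketch (neither the command nor expire_unread_channels exists yet):

    from django.core.management.base import BaseCommand

    import channels.layers

    class Command(BaseCommand):
        help = "Drop buffered messages for channels that have not been read recently."

        def handle(self, *args, **options):
            layer = channels.layers.get_channel_layer()
            layer.expire_unread_channels(ttl=300)  # proposed API from the sketch above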

davidfstr avatar Oct 02 '19 14:10 davidfstr

Yes, good. I like the build up.

If you're keen here, can I ask you to look at https://github.com/django/channels_redis/pull/165 first? It looks good at first sight, but I haven't had the bandwidth to sit down with it properly, so another pair of eyes there would be super handy!

carltongibson avatar Oct 02 '19 14:10 carltongibson

@carltongibson Is django/channels_redis#165 potentially related to this bug?

Reviewing that would require me to learn in detail how the current and proposed receiving system works, which would take a block of time (~2 hrs) that is difficult for me to schedule in a timely fashion.

davidfstr avatar Oct 07 '19 19:10 davidfstr

Hey @davidfstr.

...which take a block of time...

Yep. Open source. :) (This is the exact hold-up there... BANDWIDTH). No stress, no rush — if you don't have capacity to look at it, no problem! But... if you can slowly have it on the back burner, that would be a great help. (As I say, no stress about it!!!)

Is it related? To the extent that it's on the short list of improvements to be made here, and we're looking at touching the receive buffer, yes.

But... have a glance at it and see what you think. I'm happy to accept your input as it's available, so you tell me. 🙂

carltongibson avatar Oct 08 '19 08:10 carltongibson

Got it. Touches the receive buffer. So we'd have to reconcile these changes eventually anyway.

I'll keep the review in my queue. Projections at work have me looking at this again in 3 weeks, but I may get lucky and find some time earlier.

davidfstr avatar Oct 08 '19 15:10 davidfstr

OK, thanks @davidfstr! (As I say, no stress! It's OSS 🙂)

carltongibson avatar Oct 08 '19 15:10 carltongibson

Hello all! What's the status here? And what's the (average?) severity of it not being fixed (if that's even a reasonable question)?

All of the interactions with channels_redis in my project happen on a Celery worker, so worst case the worker fails (as opposed to ballooning RAM on an API machine), but even so, I'd rather find a way around that!

jheld avatar Feb 26 '20 02:02 jheld

Status: Bug doesn't hurt enough that anyone has put in the effort to fix it yet.

To work around this (and other kinds of slowish memory leaks), I've configured most of my web services to restart after serving (N + random jitter) requests. Gunicorn in particular has config options for this.
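
For anyone wanting the same workaround, the relevant Gunicorn settings are max_requests and max_requests_jitter (values below are just examples):

    # gunicorn.conf.py
    max_requests = 1000        # recycle a worker after roughly 1000 requests
    max_requests_jitter = 100  # randomize the threshold so workers don't restart together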

davidfstr avatar Feb 26 '20 02:02 davidfstr

@jheld You can use https://github.com/CJWorkbench/channels_rabbitmq (disclaimer: I'm the author) instead of channels_redis. It uses message TTLs, limits buffer sizes, warns on buffer overflow and recovers gracefully.

@davidfstr this bug did hurt enough that we put in the effort to fix it. The fix is channels_rabbitmq.

adamhooper avatar Jun 16 '20 21:06 adamhooper

@davidfstr @carltongibson

A version of this bug that I've also seen: you don't just see this if the channel is never read from later. You can also see it if the channel is read from too slowly. In the default channels_redis implementation, per-channel asyncio.Queue objects grow in an unbounded way; if they're not read at the same rate as messages are inserted on the other end, Daphne will continue to just grow in memory consumption forever.

I'd argue that these per-channel Queue objects should probably be bounded in size. There's already a capacity argument; maybe the per-channel buffer should respect that, and only buffer up to that many objects before dropping old ones?

https://github.com/django/channels_redis#capacity

I do think a goal of passive cleanup makes sense, but I think a reasonable upper bound on queue size would likely prevent many people from getting into bad situations in the first place.
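
Something along these lines, as a sketch of the idea (not actual channels_redis code):

    import asyncio

    class BoundedQueue(asyncio.Queue):
        # Sketch: respect the configured capacity per channel by evicting the
        # oldest message instead of letting the buffer grow without bound.
        def put_nowait(self, item):
            if self.full():
                self.get_nowait()  # drop the oldest buffered message
            super().put_nowait(item)

    # used roughly as: receive_buffer[channel] = BoundedQueue(capacity)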

ryanpetrello avatar Aug 12 '20 15:08 ryanpetrello

Hi @ryanpetrello -- Thanks for your work looking at this. Your points all seem reasonable.

If you want to block out proof-of-concept sketches, really happy to discuss those.

I'm working on the v3 update at the moment, and won't have capacity to cycle back to this immediately, so all help very much appreciated!

carltongibson avatar Aug 13 '20 05:08 carltongibson

I think this issue is on the wrong project. It should be on channels_redis.

channels_rabbitmq doesn't have this bug. It has local_capacity and local_expiry to solve this problem.
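
For concreteness, the layer configuration looks roughly like this (check the channels_rabbitmq README for the exact option semantics and defaults):

    CHANNEL_LAYERS = {
        "default": {
            "BACKEND": "channels_rabbitmq.core.RabbitmqChannelLayer",
            "CONFIG": {
                "host": "amqp://guest:guest@localhost/",
                "local_capacity": 100,  # max messages buffered per reader process
                "local_expiry": 60,     # seconds before an undelivered local message is dropped
            },
        },
    }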

I have helpful (but abrasive!) comments about the Redis layer's architecture -- and Channels' architecture in general. If you're dismayed by channels_redis behavior, here's a more complete picture:

The Channels layer spec -- and, heck, Channels' main loop -- has a big flaw: clients poll for messages instead of subscribing to messages. By definition, a client calls receive() on a channel that may or may not exist. In the microseconds after one receive() call returns and before the client makes its next receive() call, there is no client waiting on the channel and there may be no pending messages in the channel. According to the spec, the channel doesn't exist.

That leads to a corollary on the delivery side. When a Channels layer is delivering a message, it must deliver to non-existent channels!

channels_rabbitmq handles this with process-local expiry. channels_redis doesn't. (The channels_redis folks are welcome to copy channels_rabbitmq logic here.)
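
To make the polling pattern concrete (simplified; not the actual Channels source):

    # Between one receive() returning and the next receive() being awaited,
    # no reader is attached to the channel, yet group_send() may deliver to it.
    while True:
        message = await channel_layer.receive(channel_name)
        await dispatch(message)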

adamhooper avatar Aug 13 '20 14:08 adamhooper

Hey @adamhooper — We like the comments 😀

Yes, I know it should be on channels_redis, but I don't want to close the issues until I've got the bandwidth to resolve them.

...clients poll for messages instead of subscribing to messages.

Really happy to look at sketches of a reworking there.

carltongibson avatar Aug 13 '20 15:08 carltongibson

...clients poll for messages instead of subscribing to messages.

Really happy to look at sketches of a reworking there.

channels.consumer.AsyncConsumer.__call__:

    async def __call__(self, receive, send):
        """
        Dispatches incoming messages to type-based handlers asynchronously.
        """
        async with contextlib.AsyncExitStack() as stack:
            # Initialize channel layer
            self.channel_layer = get_channel_layer(self.channel_layer_alias)
            if self.channel_layer is not None:
                channel = await stack.enter_async_context(self.channel_layer.new_channel_v2())
                self.channel_name = channel.name
            # Store send function
            if self._sync:
                self.base_send = async_to_sync(send)
            else:
                self.base_send = send
            # Pass messages in from channel layer or client to dispatch method
            try:
                if self.channel_layer is not None:
                    await await_many_dispatch(
                        [receive, channel], self.dispatch
                    )
                else:
                    await await_many_dispatch([receive], self.dispatch)
            except StopConsumer:
                # Exit cleanly
                pass

A channel object is an async iterator: __anext__() returns a message, and aclose() stops the world.
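
The channel object I have in mind would look roughly like this (hypothetical, as is new_channel_v2 above; neither exists in Channels today):

    class Channel:
        # Hypothetical async-iterator channel handle assumed by the rewrite above.
        name: str

        async def __aenter__(self):
            return self

        async def __aexit__(self, *exc_info):
            await self.aclose()

        def __aiter__(self):
            return self

        async def __anext__(self):
            """Wait for and return the next message delivered to this channel."""
            ...

        async def aclose(self):
            """Unsubscribe the channel and drop any buffered messages."""
            ...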

I think it would be easier to write against this API. I don't have time to actually submit and test a pull request, though :).

adamhooper avatar Aug 13 '20 16:08 adamhooper