
(Re)Implement Multiplexing

Open andrewgodwin opened this issue 7 years ago • 24 comments

Channels 2 needs multiplexing support, as it's not yet implemented. It'll need a substantially different design to the Channels 1 one, but it should be easier to make it nest nicely.

andrewgodwin avatar Jan 20 '18 06:01 andrewgodwin

I am trying to visualize how this should be implemented.

The problem I keep encountering, however, is that the Demultiplexer needs to be the class that terminates the WebSocket connection, because we don't know where to dispatch messages until we receive them, and every message for a given connection could have a different destination.

So having the Demultiplexer dispatch to a set of Consumer classes seems wrong, because the Consumer classes each assume that they exclusively terminate the websocket connection. They also contain functionality that is not needed, such as reading/writing from the websocket.

So maybe we need a completely independent consumer class (probably sync and async) dedicated to just receiving Demultiplexer JSON messages?

brianmay avatar Feb 07 '18 01:02 brianmay

Yes, this is going to need an ASGI application (probably a consumer but might have to be lower-level) that does the de/re-multiplexing itself and then manages a suite of other application instances, one for each stream. It's going to be a lot more complex than the middlewares/routing that just looks at scope.

andrewgodwin avatar Feb 07 '18 02:02 andrewgodwin

You think it needs to be this complex?

Why not do as @brianmay suggested and terminate the WebSocket on the Demultiplexer, and make the child Consumer-ish classes not true Consumers but some kind of DemultiplexedConsumer? The Demultiplexer could keep references to them, and they (the DemultiplexedConsumer instances) could hold weak refs back to the Demultiplexer so as to access session info etc.

With respect to processing messages coming over layers, the Demultiplexer could use the same keys it uses for routing WS messages to route the layer messages.

class APIDemultiplexer(WebsocketDemultiplexer):
    influencer = InfluencerDemultiplexedConsumer
    user = UserDemultiplexedConsumer

Sending a message user.action.logout would call the method action_logout on the instance of UserDemultiplexedConsumer for this Demultiplexer.
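The dotted-name routing described above can be sketched roughly as follows. This is only an illustration of the idea, not code from Channels: the `Demultiplexer` and `DemultiplexedConsumer` base classes, the `consumers` dict, and the `dispatch` method are all hypothetical names, and the children here are plain objects rather than real consumers.

```python
import weakref


class DemultiplexedConsumer:
    """Hypothetical child class; not a real Channels consumer."""

    def __init__(self, demultiplexer):
        # Weak reference so the child does not keep the parent alive.
        self._demultiplexer = weakref.ref(demultiplexer)

    @property
    def demultiplexer(self):
        return self._demultiplexer()


class UserDemultiplexedConsumer(DemultiplexedConsumer):
    def action_logout(self, payload):
        return {"stream": "user", "result": "logged_out"}


class Demultiplexer:
    """Hypothetical parent that owns one child instance per stream key."""

    consumers = {}  # stream key -> child class

    def __init__(self):
        self.children = {key: cls(self) for key, cls in self.consumers.items()}

    def dispatch(self, name, payload=None):
        # "user.action.logout" -> stream "user", method "action_logout".
        stream, _, action = name.partition(".")
        if stream not in self.children:
            raise ValueError("No child for stream %r" % stream)
        method = action.replace(".", "_")
        if not method or method.startswith("_"):
            raise ValueError("Invalid action %r" % action)
        handler = getattr(self.children[stream], method, None)
        if handler is None:
            raise ValueError("No handler %r on stream %r" % (method, stream))
        return handler(payload)


class APIDemultiplexer(Demultiplexer):
    consumers = {"user": UserDemultiplexedConsumer}
```

Calling `APIDemultiplexer().dispatch("user.action.logout")` would then invoke `action_logout` on the `user` child.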

What would be the benefit of having it do the de/re-multiplexing itself and then manage a suite of other application instances?

hishnash avatar Feb 09 '18 08:02 hishnash

The whole principle of the new ASGI design is turtles all the way down - that is, a demultiplexer should take any valid ASGI application as its demultiplexing endpoints. Otherwise we would need to maintain a whole separate, parallel suite of generic consumers that are these not-quite-consumers, and you wouldn't be able to move things between them, etc.

The pattern is basically:

  • When the multiplexer is initialised it also initialises any sub-applications with the same scope
  • When its coroutine is started up it also starts up the coroutines of any sub-applications, but passing them new send/receive methods
  • When a frame comes in, it demultiplexes it and invokes the appropriate receive awaitable on a child
  • When a child invokes their version of send, it multiplexes and sends it on
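The four steps above can be sketched like this, assuming the two-phase application style (`application(scope)` returning a coroutine callable that takes `receive` and `send`) and a JSON text framing of `{"stream": ..., "payload": ...}`. The class names are hypothetical, binary frames are ignored, and handshake details (for example, several children each sending `websocket.accept`) are left out, so treat it as an outline rather than a working Channels component.

```python
import asyncio
import json


class Multiplexer:
    """Hypothetical multiplexing application in the two-phase ASGI style."""

    def __init__(self, applications):
        # Stream name -> any ASGI application (callable taking a scope).
        self.applications = applications

    def __call__(self, scope):
        # Step 1: initialise every sub-application with the same scope.
        children = {name: app(scope) for name, app in self.applications.items()}
        return MultiplexerInstance(children)


class MultiplexerInstance:
    def __init__(self, children):
        self.children = children

    async def __call__(self, receive, send):
        # Step 2: start each child with its own receive/send pair.
        queues = {name: asyncio.Queue() for name in self.children}
        tasks = [
            asyncio.ensure_future(child(queues[name].get, self.wrap_send(name, send)))
            for name, child in self.children.items()
        ]
        try:
            while True:
                message = await receive()
                if message["type"] == "websocket.receive":
                    # Step 3: demultiplex the frame to the addressed child.
                    frame = json.loads(message["text"])
                    queue = queues.get(frame["stream"])
                    if queue is not None:  # unknown streams are dropped in this sketch
                        await queue.put({
                            "type": "websocket.receive",
                            "text": json.dumps(frame["payload"]),
                        })
                else:
                    # Connection-level events are passed to every child.
                    for queue in queues.values():
                        await queue.put(message)
                    if message["type"] == "websocket.disconnect":
                        break
            # Wait for the children to finish handling the disconnect.
            await asyncio.gather(*tasks)
        finally:
            # Children never outlive the parent coroutine.
            for task in tasks:
                task.cancel()

    def wrap_send(self, name, send):
        async def child_send(message):
            if message["type"] == "websocket.send":
                # Step 4: multiplex the child's frame and send it on.
                payload = json.loads(message["text"])
                message = {
                    "type": "websocket.send",
                    "text": json.dumps({"stream": name, "payload": payload}),
                }
            await send(message)

        return child_send
```

Because the children only ever see an ordinary `receive`/`send` pair, any application that speaks the WebSocket message types can be plugged in unchanged, which is the "turtles all the way down" property.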

andrewgodwin avatar Feb 09 '18 08:02 andrewgodwin

EDITED:

I see that async def await_many_dispatch(consumer_callables, dispatch) expects an exclusive run loop, hence the need to create a new run loop for each child application.

import asyncio


class AsyncDemultiplexingConsumerMixin:
    """
    As a mixin so it can be added to multiplex different subclasses of Consumer
    """

    consumers = {}  # type: Dict[str, Type[AsyncConsumer]]

    def __init__(self, scope):
        super().__init__(scope)
        self.child_queues = {}
        self.child_connections = {}

    async def get_child_queue(self, lookup: str):
        """
        Sub classes of this mixin will call this to get the child responsible
        for this msg.
        """
        if lookup in self.child_queues:
            return self.child_queues[lookup]

        if lookup in self.consumers:

            self.child_queues[lookup] = self.init_child_application(
                lookup, self.consumers[lookup]
            )

            return self.child_queues[lookup]

        raise ValueError("No child application for lookup type %r" % lookup)

    async def multiplexing_send(self,
                                lookup: str,
                                message):
        """
        Do the multiplexing of the message here, e.g. for JSON WS multiplexing:
            {'stream': lookup, 'payload': message}
        """
        # send() is a coroutine, so it must be awaited.
        await super().send(message)

    def init_child_application(self, lookup: str, application_class: type):

        # Make an instance of the application

        input_queue = asyncio.Queue()

        application_instance = application_class(self.scope)

        # Run it, and stash the future for later checking
        self.child_connections[lookup] = asyncio.ensure_future(
            application_instance(
                receive=input_queue.get,
                send=lambda message: self.multiplexing_send(
                    lookup=lookup,
                    message=message
                ),
            ),
            loop=asyncio.new_event_loop()
        )
        return input_queue

The issue here is that these child run loops will not be closed if the main server closes the parent run loop.

hishnash avatar Feb 09 '18 09:02 hishnash

await_many_dispatch will be the wrong model - there are other ways of making sure child coroutines close when their parent does. The reason the routers don't need this is that they don't transform incoming and outgoing messages; it's when you do that you need to override both phases (init and coroutine) rather than just the initialisation.

New ASGI was specifically designed to allow this use case, and also to let most common overrides, like routing and auth middleware, not need to do the whole coroutine dance.

andrewgodwin avatar Feb 09 '18 18:02 andrewgodwin

EDITED:

We could use

asyncio.gather(*children)

to group the loops of the children so that they still run as part of the parent loop.

hishnash avatar Feb 12 '18 06:02 hishnash

No, there's no need to "gather" loops, everything is being awaited (including await_many_dispatch) so it will all just run. Your previous comment's conclusion, that await_many_dispatch needs its own loop, is incorrect.
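As a small standalone illustration of that point (plain asyncio, nothing Channels-specific, all names made up for the example): child coroutines scheduled from inside a running coroutine land on the same event loop, and the parent can cancel them when it exits, so no extra loops are involved.

```python
import asyncio


async def child(name, log):
    try:
        while True:
            await asyncio.sleep(0.01)  # stand-in for a child's receive loop
    except asyncio.CancelledError:
        log.append(name + " cancelled")
        raise


async def parent(log):
    loop = asyncio.get_running_loop()
    # ensure_future schedules the children on the loop that is already running.
    tasks = [asyncio.ensure_future(child(name, log)) for name in ("a", "b")]
    same_loop = all(task.get_loop() is loop for task in tasks)
    try:
        await asyncio.sleep(0.05)  # stand-in for the parent's own work
    finally:
        # When the parent finishes (or is cancelled), it takes its children with it.
        for task in tasks:
            task.cancel()
        await asyncio.wait(tasks)
    return same_loop
```

Running `asyncio.run(parent([]))` starts one loop only; both children run on it and are cancelled when the parent leaves its `try` block.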

I'd suggest going and reading up on how asyncio event loops work, especially with nested coroutines and futures, before you attack a problem like this ticket - I don't think having a discussion here about the intricacies of async is going to help (the pattern I outlined earlier should still work just fine).

andrewgodwin avatar Feb 12 '18 08:02 andrewgodwin

OK, I believe I have it working as you mentioned above.

However, if we just intercept the downstream send messages, then for a JSON WebSocket consumer, for example, we will need to decode them back to JSON, add the "stream": ... key, and re-encode them to send them up.

https://github.com/hishnash/channels/blob/Re-Implement-Multiplexing-%23825/channels/multiplexing.py (this is still in a dirty state; I will clean it up more before opening a pull request)

Should we do this in the same way that messages sent upstream are handled?

That is, when we get a downstream message (one going back to the client, web browser, etc.), look for a local method that matches the type key.

For example, the AsyncJSONWebsocketDemultiplexer would have a method websocket_send that gets fired when it receives a downstream message from one of its child applications. If the Demultiplexer does not have a method that matches the type key, it just passes the message on up the tree.
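A rough sketch of that proposal, with hypothetical class and method names (`DownstreamDispatcher`, `handle_child_message`) and assuming the same dots-to-underscores naming that consumers use for handler methods:

```python
import asyncio
import json


class DownstreamDispatcher:
    """Hypothetical helper: routes messages sent by children via their type key."""

    def __init__(self, send_upstream):
        self._send_upstream = send_upstream

    async def handle_child_message(self, stream, message):
        # "websocket.send" -> look for a local method named "websocket_send".
        handler = getattr(self, message["type"].replace(".", "_"), None)
        if handler is None:
            # No matching method: pass the message on unchanged.
            await self._send_upstream(message)
        else:
            await handler(stream, message)


class JsonDownstreamDispatcher(DownstreamDispatcher):
    async def websocket_send(self, stream, message):
        # Decode the child's JSON, tag it with its stream, and re-encode it.
        payload = json.loads(message["text"])
        await self._send_upstream({
            "type": "websocket.send",
            "text": json.dumps({"stream": stream, "payload": payload}),
        })
```

With this shape, a `websocket.send` from a child is wrapped by `websocket_send`, while a type with no matching method, such as `websocket.close`, is forwarded as-is.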

hishnash avatar Feb 12 '18 09:02 hishnash

Any update on this? Or is it possible to migrate from Channels 1.x.x to Channels 2.0.x without this implemented?

stunaz avatar Mar 29 '18 23:03 stunaz

Until the multiplexing branch is merged to master, you can try it out.

Install with: pip install https://github.com/django/channels/archive/multiplexing.zip

in the routing.py:

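from django.urls import path

from channels.auth import AuthMiddlewareStack
from channels.generic.websocket import WebsocketMultiplexer
from channels.routing import ProtocolTypeRouter, URLRouter

# EchoConsumerAsync and EchoConsumerSync are your own consumer classes,
# imported from wherever you define them.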

EchoMultiplexer = WebsocketMultiplexer({
    "async": EchoConsumerAsync,
    "sync": EchoConsumerSync,
})

application = ProtocolTypeRouter({
    "websocket":
    AuthMiddlewareStack(
        URLRouter([
            path("", EchoMultiplexer),
        ])),
})

ruralscenery avatar Mar 30 '18 02:03 ruralscenery

@ruralscenery what is this multiplexer implementation? is it for Channels 2.0? Is there an example of EchoConsumerAsync child implementation?

maryokhin avatar May 18 '18 09:05 maryokhin

@maryokhin It's a branch of Channels 2.0. If you want to run it, you should merge master into the branch to pick up bug fixes.

ruralscenery avatar May 21 '18 06:05 ruralscenery

Any ETA for this?

sevdog avatar Sep 21 '18 10:09 sevdog

This is probably not going to happen unless someone else works on it - I don't have the time any more.

andrewgodwin avatar Sep 21 '18 18:09 andrewgodwin

@sevdog there is a freestanding package, https://github.com/hishnash/channelsmultiplexer, that adds a minimal version of this to Channels 2.

hishnash avatar Sep 22 '18 02:09 hishnash

@hishnash I will give a look at your code and maybe try it out.

However, I think this feature should get more support because it is "essential" for fully upgrading an existing project from Channels 1.x to 2.x: heavy usage of multiplexing (a multi-purpose connection) is not easy to replace with a set of single-purpose socket connections.

sevdog avatar Sep 24 '18 15:09 sevdog

It may be essential, but it still has to be written by someone - at this point, I'm starting to focus my efforts more on Django async so unless we can find a new Channels maintainer or someone to write this, it'll remain incomplete.

andrewgodwin avatar Sep 24 '18 16:09 andrewgodwin

Any progress? I would really appreciate multiplexing support in Channels 2. I don't want to have a separate TCP connection for each realtime component in my web app.

the-papi avatar Dec 20 '19 10:12 the-papi

I did some work to create a multiplex channels consumer: https://gist.github.com/ronnievdc/dd65a1ba1fef86ba73e37faeaaf06de2

Assume you have 2 websockets on one page and want to expose both consumers / channels over one websocket connection:

const channel_ws = new WebSocket('ws://url/to/consumer');
const other_channel_ws = new WebSocket('ws://url/to/other/consumer');

To combine both consumers (and all other consumers you have already defined in routing.py), add the following line to your routing.py:

re_path(r'ws/multiplex/$', AsyncMultiplexWebsocketConsumer),

Then change your JavaScript so it uses both consumers over 1 websocket connection, without changing the consumers at all:

const ws = new WebSocketMultiplex(new WebSocket('ws://ws/multiplex/'));
const channel_ws = ws.channel('/url/to/consumer');
const other_channel_ws = ws.channel('/url/to/other/consumer');

ronnievdc avatar May 13 '20 14:05 ronnievdc

Do any of the recent async enhancements in Django 3.1 or the work towards ASGI v3 (#1480) make official multiplexing support easier to realize?

johnthagen avatar Aug 08 '20 13:08 johnthagen

Multiplexing was never about the underlying protocol or support - the sticking point for me has always been that it needs a lot of client-side code and a good specification that can handle connection drops to make it work well. It can also be totally accomplished as a third-party app, which was always my suggestion if someone wanted to work on it and prove it out before making a pitch to have it rolled in.

andrewgodwin avatar Aug 08 '20 22:08 andrewgodwin

I wrote a third party app that provides a (de)multiplexer for Channels 2: https://github.com/csdenboer/channels-demultiplexer. Is it OK if I create a PR that adds a link to this package to the documentation?

csdenboer avatar Sep 19 '20 08:09 csdenboer

I have created a PR (https://github.com/django/channels/pull/1503) that adds documentation about the current multiplexing support of Channels 2 and a package I have published that provides a (de)multiplexer for AsyncJsonWebsocketConsumer consumers. I would like to give the package some exposure so I can prove it out before making a pitch to have it rolled in.

csdenboer avatar Sep 25 '20 08:09 csdenboer