(Re)Implement Multiplexing
Channels 2 needs multiplexing support, as it's not yet implemented. It'll need a substantially different design to the Channels 1 one, but it should be easier to make it nest nicely.
I am trying to visualize how this should be implemented.
The problem I keep encountering, however, is that the Demultiplexer needs to be the class that terminates the WebSocket connection: we don't know where to dispatch messages until we receive them, and every message on a given connection could have a different destination.
So having the Demultiplexer dispatch to a set of Consumer classes seems wrong, because the Consumer classes each assume that they exclusively terminate the websocket connection. They also contain functionality that is not needed, such as reading/writing from the websocket.
So maybe we need a completely independent consumer class (probably sync and async) dedicated to just receiving Demultiplexer JSON messages?
Yes, this is going to need an ASGI application (probably a consumer but might have to be lower-level) that does the de/re-multiplexing itself and then manages a suite of other application instances, one for each stream. It's going to be a lot more complex than the middlewares/routing that just looks at scope.
You think it needs to be this complex?
Why not do as @brianmay suggested: terminate the WebSocket on the Demultiplexer and make the child Consumer-ish classes not true Consumers but some DemultiplexedConsumer? The Demultiplexer could keep references to them, and they (the DemultiplexedConsumer instances) could have weak refs back to the Demultiplexer so as to access session info etc.
With respect to processing messages coming over layers, the Demultiplexer could use the same keys it uses for routing WS messages to route the layer messages.
class APIDemultiplexer(WebsocketDemultiplexer):
    influencer = InfluencerDemultiplexedConsumer
    user = UserDemultiplexedConsumer
Sending a message user.action.logout would call the method action_logout on the instance of UserDemultiplexedConsumer for this Demultiplexer.
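A minimal sketch of that routing idea, assuming plain Python classes rather than real Channels consumers. The `dispatch` method, the `_children` cache, and the return value are hypothetical names invented for illustration; only the class names and the `user.action.logout` to `action_logout` mapping come from the suggestion above.

```python
import weakref


class UserDemultiplexedConsumer:
    def __init__(self, demultiplexer):
        # Weak back-reference so the child can reach session info etc.
        self.demultiplexer = weakref.proxy(demultiplexer)

    def action_logout(self, payload):
        return "user logged out"


class WebsocketDemultiplexer:
    """Hypothetical base class; only the routing step is sketched."""

    def __init__(self):
        self._children = {}  # stream name -> child instance

    def dispatch(self, key, payload=None):
        # "user.action.logout" -> stream "user", handler "action_logout"
        stream, _, action = key.partition(".")
        handler_name = action.replace(".", "_")
        child_class = getattr(type(self), stream, None)
        if (stream.startswith("_") or handler_name.startswith("_")
                or not isinstance(child_class, type)):
            raise ValueError("No route for %r" % key)
        # Create the child lazily, once per demultiplexer instance.
        if stream not in self._children:
            self._children[stream] = child_class(self)
        handler = getattr(self._children[stream], handler_name, None)
        if handler is None:
            raise ValueError("No handler for %r" % key)
        return handler(payload)


class APIDemultiplexer(WebsocketDemultiplexer):
    user = UserDemultiplexedConsumer
```

With this sketch, `APIDemultiplexer().dispatch("user.action.logout")` calls `action_logout` on the per-demultiplexer `UserDemultiplexedConsumer` instance, and unknown streams raise `ValueError`.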
What benefit would having it do the de/re-multiplexing itself and then manage a suite of other application instances bring?
The whole principle of the new ASGI design is turtles all the way down - that is, a demultiplexer should take any valid ASGI application as its demultiplexing endpoints. Otherwise we would need to maintain a whole separate, parallel suite of generic consumers that are these not-quite-consumers, and you wouldn't be able to move things between them, etc.
The pattern is basically:
- When the multiplexer is initialised it also initialises any sub-applications with the same scope
- When its coroutine is started up it also starts up the coroutines of any sub-applications, but passing them new send/receive methods
- When a frame comes in, it demultiplexes it and invokes the appropriate receive awaitable on a child
- When a child invokes their version of send, it multiplexes and sends it on
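The four steps above could be sketched roughly as follows. This is an illustration under stated assumptions, not the Channels implementation: it uses plain asyncio and the two-phase `application(scope)` then `instance(receive, send)` calling convention; the `applications` attribute, the `{"stream": ..., "payload": ...}` frame shape, the `None` end-of-connection marker, and the `Echo` child are all hypothetical.

```python
import asyncio


class Echo:
    """Hypothetical child application: echoes whatever it receives."""

    def __init__(self, scope):
        self.scope = scope

    async def __call__(self, receive, send):
        while True:
            await send(await receive())


class Demultiplexer:
    applications = {}  # stream name -> application class (assumed attribute)

    def __init__(self, scope):
        # Step 1: initialise every sub-application with the same scope.
        self.children = {name: app(scope) for name, app in self.applications.items()}

    def _child_send(self, name, send):
        async def child_send(message):
            # Step 4: multiplex the child's outgoing message and send it on.
            await send({"stream": name, "payload": message})
        return child_send

    async def __call__(self, receive, send):
        queues = {name: asyncio.Queue() for name in self.children}
        # Step 2: start each child's coroutine with its own receive/send.
        tasks = [
            asyncio.ensure_future(child(queues[name].get, self._child_send(name, send)))
            for name, child in self.children.items()
        ]
        try:
            while True:
                frame = await receive()
                if frame is None:  # assumed end-of-connection marker
                    break
                # Step 3: demultiplex and feed the matching child's receive.
                await queues[frame["stream"]].put(frame["payload"])
        finally:
            # Children are stopped when the parent finishes.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
```

Because the children only ever see `receive` and `send` callables, any application following the same calling convention could be used as an endpoint, including another `Demultiplexer`.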
EDITED:
I see that async def await_many_dispatch(consumer_callables, dispatch) expects an exclusive run loop, hence the need to create a new run loop for each child application.
import asyncio
from typing import Dict, Type

from channels.consumer import AsyncConsumer


class AsyncDemultiplexingConsumerMixin:
    """
    As a mixin so it can be added to multiplex different subclasses of Consumer
    """

    consumers = {}  # type: Dict[str, Type[AsyncConsumer]]

    def __init__(self, scope):
        super().__init__(scope)
        self.child_queues = {}
        self.child_connections = {}

    async def get_child_queue(self, lookup: str):
        """
        Sub classes of this mixin will call this to get the child responsible
        for this msg.
        """
        if lookup in self.child_queues:
            return self.child_queues[lookup]
        if lookup in self.consumers:
            self.child_queues[lookup] = self.init_child_application(
                lookup, self.consumers[lookup]
            )
            return self.child_queues[lookup]
        raise ValueError("No child application for lookup type %r" % lookup)

    async def multiplexing_send(self, lookup: str, message):
        """
        Do the multiplexing here to the msg, e.g. for JSON WS multiplexing
        {'stream': lookup, 'payload': message}
        """
        await super().send(message)

    def init_child_application(self, lookup: str, application_class: type):
        # Make an instance of the application
        input_queue = asyncio.Queue()
        application_instance = application_class(self.scope)
        # Run it, and stash the future for later checking
        self.child_connections[lookup] = asyncio.ensure_future(
            application_instance(
                receive=input_queue.get,
                send=lambda message: self.multiplexing_send(
                    lookup=lookup,
                    message=message
                ),
            ),
            # Each child is scheduled on its own newly created event loop
            loop=asyncio.new_event_loop()
        )
        return input_queue
The issue here is that these child run loops will not be closed if the main server closes the parent run loop.
await_many_dispatch will be the wrong model - there are other ways of making sure child coroutines close when their parent does. The reason the routers don't need this is that they don't transform incoming and outgoing messages; it's when you do that you need to override both phases (init and coroutine) rather than just the initialisation.
New ASGI was specifically designed to allow this use case, and also to let most common overrides, like routing and auth middleware, not need to do the whole coroutine dance.
EDITED:
We could use
asyncio.gather(*children)
to group the loops of the children so that they still run as part of the parent loop.
No, there's no need to "gather" loops, everything is being awaited (including await_many_dispatch) so it will all just run. Your previous comment's conclusion, that await_many_dispatch needs its own loop, is incorrect.
I'd suggest going and reading up on how asyncio event loops work, especially with nested coroutines and futures, before you attack a problem like this ticket - I don't think having a discussion here about the intricacies of async is going to help (the pattern I outlined earlier should still work just fine).
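A minimal illustration of that point, assuming plain asyncio (this is not Channels code, and `child` and `parent` are hypothetical names): tasks created from inside a running coroutine are scheduled on the loop that is already running, and the parent can cancel them when it finishes, so no per-child loop is needed.

```python
import asyncio


async def child(name, log):
    try:
        while True:
            await asyncio.sleep(0.01)  # stand-in for waiting on messages
    except asyncio.CancelledError:
        log.append(name + " cancelled")
        raise


async def parent(log):
    loop = asyncio.get_running_loop()
    # Tasks created here land on the already-running loop.
    tasks = [asyncio.ensure_future(child(name, log)) for name in ("a", "b")]
    same_loop = all(task.get_loop() is loop for task in tasks)
    await asyncio.sleep(0.03)  # parent does its own work; children run concurrently
    # When the parent is done, it cancels its children and waits for them.
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return same_loop
```

Running `asyncio.run(parent([]))` drives the parent and both children on a single event loop.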
OK, I believe I have it working as you described above.
However, if we just intercept the downstream send messages, then for jsonwebsocket, for example, we will need to decode them back to JSON, add the "stream":..., and re-encode them before sending them up.
https://github.com/hishnash/channels/blob/Re-Implement-Multiplexing-%23825/channels/multiplexing.py (this is still in a dirty state; I will clean it up more before opening a pull request)
Should we handle this the same way that messages sent upstream are handled?
E.g. when we get a downstream message (heading back to the client, web browser, etc.), look for a local method that matches the type key.
For example, the AsyncJSONWebsocketDemultiplexer would have a method websocket_send that gets fired when it receives a downstream message from one of its child applications. If the Demultiplexer does not have a method that matches the type key, it just passes the message on up the tree.
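A rough sketch of that idea, assuming child messages are ASGI-style dicts with a `type` key and that `websocket.send` messages carry JSON in `text`. The `JsonStreamWrapper` class, its `child_send` method, and the `{"stream": ..., "payload": ...}` envelope are hypothetical names for illustration, not taken from the linked branch.

```python
import asyncio
import json


class JsonStreamWrapper:
    """Hypothetical helper, not part of Channels."""

    def __init__(self, send):
        self.upstream_send = send  # the real connection's send awaitable

    async def child_send(self, stream, message):
        # "websocket.send" -> "websocket_send": look for a matching local method.
        handler = getattr(self, message["type"].replace(".", "_"), None)
        if handler is None:
            # No matching method: pass the message up the tree untouched.
            await self.upstream_send(message)
        else:
            await handler(stream, message)

    async def websocket_send(self, stream, message):
        # Decode the child's JSON text frame, wrap it with its stream, re-encode.
        payload = json.loads(message["text"])
        wrapped = json.dumps({"stream": stream, "payload": payload})
        await self.upstream_send({"type": "websocket.send", "text": wrapped})


async def demo():
    sent = []

    async def send(message):
        sent.append(message)

    wrapper = JsonStreamWrapper(send)
    await wrapper.child_send("user", {"type": "websocket.send", "text": '{"ok": true}'})
    await wrapper.child_send("user", {"type": "websocket.close", "code": 1000})
    return sent
```

In `demo`, the first message is re-encoded with its stream name, while the `websocket.close` message has no matching method and is forwarded unchanged. The decode and re-encode round trip is the cost mentioned above; binary frames are not handled in this sketch.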
Any update on this? Or is it possible to migrate from Channels 1.x.x to Channels 2.0.x without this implemented?
Until the multiplexing branch is merged to master, you can try it out.
Install from: pip install https://github.com/django/channels/archive/multiplexing.zip
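In routing.py:
from channels.auth import AuthMiddlewareStack
from channels.generic.websocket import WebsocketMultiplexer
from channels.routing import ProtocolTypeRouter, URLRouter
from django.urls import path

# EchoConsumerAsync and EchoConsumerSync are your own consumers, defined elsewhere
EchoMultiplexer = WebsocketMultiplexer({
    "async": EchoConsumerAsync,
    "sync": EchoConsumerSync,
})

application = ProtocolTypeRouter({
    "websocket": AuthMiddlewareStack(
        URLRouter([
            path("", EchoMultiplexer),
        ])
    ),
})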
@ruralscenery What is this multiplexer implementation? Is it for Channels 2.0? Is there an example of an EchoConsumerAsync child implementation?
@maryokhin It's a branch of Channels 2.0. If you want to run it, you should merge master into the branch to pick up bug fixes.
Any ETA for this?
This is probably not going to happen unless someone else works on it - I don't have the time any more.
@sevdog there is a freestanding package https://github.com/hishnash/channelsmultiplexer that adds a minimal version of this to Channels 2.
@hishnash I will take a look at your code and maybe try it out.
However, I think this feature should get more support because it is "essential" for fully upgrading an existing project from Channels 1.x to 2.x: heavy usage of multiplexing (a multi-purpose connection) is not easy to replace with a set of single-purpose socket connections.
It may be essential, but it still has to be written by someone - at this point, I'm starting to focus my efforts more on Django async so unless we can find a new Channels maintainer or someone to write this, it'll remain incomplete.
Any progress? I would really appreciate multiplexing support in Channels 2. I don't want to have a separate TCP connection for each realtime component in my web app.
I did some work to create a multiplex channels consumer: https://gist.github.com/ronnievdc/dd65a1ba1fef86ba73e37faeaaf06de2
Assume you have 2 websockets on one page and want to expose both consumers / channels over one websocket connection
const channel_ws = new WebSocket('ws://url/to/consumer');
const other_channel_ws = new WebSocket('ws://url/to/other/consumer');
To combine both consumers (and all other consumers you have already defined in routing.py), add the following line to your routing.py:
re_path(r'ws/multiplex/$', AsyncMultiplexWebsocketConsumer),
Then change your JavaScript so it uses both consumers over one WebSocket connection, without changing the consumers at all:
const ws = new WebSocketMultiplex(new WebSocket('ws://ws/multiplex/'));
const channel_ws = ws.channel('/url/to/consumer');
const other_channel_ws = ws.channel('/url/to/other/consumer');
Do any of the recent async enhancements in Django 3.1 or the work towards ASGI v3 (#1480) make official multiplexing support easier to realize?
Multiplexing was never about the underlying protocol or support - the sticking point for me has always been that it needs a lot of client-side code and a good specification that can handle connection drops to make it work well. It can also be totally accomplished as a third-party app, which was always my suggestion if someone wanted to work on it and prove it out before making a pitch to have it rolled in.
I wrote a third party app that provides a (de)multiplexer for Channels 2: https://github.com/csdenboer/channels-demultiplexer. Is it OK if I create a PR that adds a link to this package to the documentation?
I have created a PR (https://github.com/django/channels/pull/1503) that adds documentation about the current multiplexing support of Channels 2 and a package I have published that provides a (de)multiplexer for AsyncJsonWebsocketConsumer consumers. I would like to give the package some exposure so I can prove it out before making a pitch to have it rolled in.