rxjs
Using rxjs/webSocket on a server
Feature Request
Right now WebSocketSubject only accepts a constructor. Can this be changed to also accept an existing socket in the config?
Specifically, my use case is that I'd like to use rxjs with websocket/ws on my server, and their API would allow me to do something like:
wss.on("connection", (ws: WebSocket) => {
  const rxjsWebSocket = webSocket(ws); // Wrap the node WebSocket into a subject.
});
With respect to API compatibility, I think it's reasonable to assume the web socket remains W3C-compatible instead of worrying about the NodeJS.WebSocket API. There are various compatibility shims to adapt the assorted Node.js variants to W3C sockets. I'm curious whether not allowing an existing socket to be wrapped was an intentional choice, or whether this can be done.
I'm working on making a transparent WebSocket layer for connecting a browser to a remote server (Raspberry Pi) to be able to subscribe directly to sensors and set outputs etc. from the browser or front-end application. So the idea is to have a WebSocket server that handles incoming connections and subscribes in a seamless manner to the GPIO ports.
This would be a welcome feature!
@kevmo314 Seems there is a way to do this already:
const WebSocket = require('ws')
const { webSocket } = require('rxjs/webSocket')
const webSocketConnection$ = (
  webSocket({
    protocol: 'v1',
    url: 'http://example.com:3000',
    WebSocketCtor: WebSocket,
  })
)
Ref: https://gist.github.com/Sawtaytoes/fe3d16b1a15aa20eef5d2a41d0b39934 See: https://rxjs.dev/api/webSocket/WebSocketSubjectConfig#WebSocketCtor
@abarke This doesn't work for existing web sockets. I mentioned this in the first line of the FR.
@kevmo314 apologies, I see your point. You are right. Accepting an already established WebSocket would allow a new WebSocketSubject to be created for each incoming WS connection. The WebSocketSubject would then be used to send and receive messages on the server side. I suppose one could then subscribe each WebSocketSubject to a shared resource (hardware input, data stream, logs, etc.), e.g. a ResourceSubject that emits updated values and multicasts them to all subscribers (WebSocket clients).
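The shared-resource idea above could be sketched roughly as follows. This is only an illustration under assumptions: `Reading`, `ClientLike`, `resource$`, and `attachClient` are hypothetical names, and the clients are plain objects with a `send` method rather than real sockets. A Subject already multicasts to all of its subscribers; a cold source would need something like `share()` first.

```typescript
import { Subject, Subscription } from 'rxjs';

// Hypothetical message shape for a sensor reading.
interface Reading { sensor: string; value: number; }

// Hypothetical stand-in for a connected client: anything with send().
interface ClientLike { send(data: string): void; }

// The shared resource: one source, multicast to every connected client.
const resource$ = new Subject<Reading>();

// Subscribe one client to the shared resource.
// Unsubscribe the returned Subscription when the client disconnects.
function attachClient(client: ClientLike): Subscription {
  return resource$.subscribe(reading => client.send(JSON.stringify(reading)));
}

// Two fake clients that record what they were sent.
const sentA: string[] = [];
const sentB: string[] = [];
const subA = attachClient({ send: data => { sentA.push(data); } });
const subB = attachClient({ send: data => { sentB.push(data); } });

resource$.next({ sensor: 'gpio17', value: 1 }); // delivered to both clients
subB.unsubscribe();                             // client B disconnects
resource$.next({ sensor: 'gpio17', value: 0 }); // delivered to client A only
subA.unsubscribe();
```

On a real server, `attachClient` would be called once per incoming connection and the subscription torn down in the socket's close handler.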
I created a demo to simulate a shared resource on a server just to wrap my head around it, as I need something similar for a project: https://stackblitz.com/edit/rxjs-shared-source-example?file=index.ts
It might be simple enough to modify the WebSocketSubject to take a WebSocket as an argument. I might just give it a hack 🙂
Hey @kevmo314 I managed to create a PoC and published the library on NPM for consumption. I rewrote the WebSocketSubject class, as extending it proved difficult because the private members of WebSocketSubject cannot be set from a subclass. Works great in my project so far. Any feedback/contributions welcome.
https://www.npmjs.com/package/rxjs-websocket-server-subject
Any feedback on this @kevmo314 ?
I'm looking at opening a PR for this; however, I'm not sure whether I should merge the WebSocketServerSubject into the WebSocketSubject (could get messy) or leave it as its own class (some duplicate code, but it keeps concerns separated and is more maintainable IMHO).
@benlesh any ideas or feedback of how to proceed?
Unfortunately my use case has come and gone; the code that needed this is now happily running and stable. I took a look at the package though, it looks pretty good and I'd love to see this PR'd into the main rxjs/webSocket package. One edge case I'm curious about: what happens if there are two subjects on a single socket and one closes due to backpressure? What happens to the other subject?
@kevmo314 regarding two subjects on a single socket... do you mean something like this?
wsServer.on('connection', (webSocket: WebSocket, req: IncomingMessage) => {
  // Two subjects wrapping the same underlying socket.
  const wsServerSubject = new WebSocketServerSubject<Message>(webSocket)
  const wsServerSubject2 = new WebSocketServerSubject<Message>(webSocket)
})
I tried this, but it seems only wsServerSubject2 is bound. I'm not sure what your specific use case is; in my test project I just create a single wsServerSubject and pipe various listeners to that subject, each filtering for a specific message type. Could you elaborate on why having two subjects on a single socket would be useful?
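The single-subject approach described above might look something like this minimal sketch. The message types `SetOutput` and `Subscribe` are hypothetical, and a plain Subject stands in for the per-connection subject so the example runs without a socket.

```typescript
import { Subject } from 'rxjs';
import { filter } from 'rxjs/operators';

// Hypothetical message types, distinguished by a `type` field.
type SetOutput = { type: 'setOutput'; pin: number; value: 0 | 1 };
type Subscribe = { type: 'subscribe'; sensor: string };
type Message = SetOutput | Subscribe;

// Stand-in for the single subject created for one connection.
const messages$ = new Subject<Message>();

// One filtered stream per message type; the type guards narrow the element type.
const setOutput$ = messages$.pipe(
  filter((m): m is SetOutput => m.type === 'setOutput'),
);
const subscribe$ = messages$.pipe(
  filter((m): m is Subscribe => m.type === 'subscribe'),
);

const pins: number[] = [];
const sensors: string[] = [];
setOutput$.subscribe(m => pins.push(m.pin));
subscribe$.subscribe(m => sensors.push(m.sensor));

// Each message reaches only the listener for its type.
messages$.next({ type: 'setOutput', pin: 17, value: 1 });
messages$.next({ type: 'subscribe', sensor: 'temperature' });
```

Because every listener derives from the same subject, only one set of handlers is ever bound to the socket, which sidesteps the two-subjects question entirely.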
What about using fromEvent? From then on it's possible to do everything, right?
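import { fromEvent, merge, Observable } from 'rxjs';
import { map, mergeMap, tap } from 'rxjs/operators';
import WebSocket, { Server } from 'ws';

// `server` is assumed to be an existing http.Server created elsewhere.
const wss = new Server({ server });
const handleConnection = (websocket: WebSocket): Observable<WebSocket.MessageEvent | WebSocket.CloseEvent> => {
  const onmessage$ = fromEvent<WebSocket.MessageEvent>(websocket, 'message')
    .pipe(
      tap(m => console.log(m.type))
    );
  const onclose$ = fromEvent<WebSocket.CloseEvent>(websocket, 'close')
    .pipe(
      tap(m => console.log(m.reason))
    );
  // const onerror$ = ....
  // etc
  return merge(onmessage$, onclose$)
}
// The 'connection' event carries several arguments, so fromEvent emits them as an array.
fromEvent(wss, 'connection')
  .pipe(
    map(a => a[0] as WebSocket),
    mergeMap(handleConnection)
  ).subscribe();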
This is a very nice idea, did you try it? Once you go rxjs, you need it in your veins.
Sure. At the moment, one message type looks like this:
const onConvertMessage$ = fromEvent(websocket, 'message')
  .pipe(
    map(m => WsExchange.fromMessage(JSON.parse((m as WsMessage).data))),
    filter(m => m.action === "convert"),
    map(m => m as ConvertText),
    switchMap(handleConvertion),
    mergeMap(uploadToAws),
    map(bucket => GeneratedAudio.from(bucket)),
    tap(message => websocket.send(JSON.stringify(message))),
    catchError((err, caught) => {
      console.log("error in convert pipeline: ", err)
      return caught;
    }),
  );
I don't think this makes sense as part of rxjs because the change is to support a userland socket implementation that only works in a Node environment. It would be a large commitment for RxJS development to follow ws development. For example, what versions should RxJS pin as a peer dependency? Who will test changes? ws is my preferred tooling, and this package looks like a great implementation, so why not continue to use it as a separate package?
If it is included, it should not be part of the dom observable module because the browser runtime is not supported.
It would be a large commitment for RxJS development to follow ws development.
This is not true. As mentioned in the original issue, the request is to follow the W3C specification, with which the ws WebSocket is compliant. The change is to support passing in a spec-compliant WebSocket, which also exists in browsers, not a specifically Node.js runtime WebSocket.
The client is spec-compliant in that it can talk to other spec-compliant sockets, absolutely, but the API cannot be used in the browser. For example, the node websocket allows the user to implement ping/pong, whereas browser websockets handle ping/pong automatically. Does RxJS need to expose additional streams on the subject? IMO yes, if the goal is to fully support native sockets.
If the goal is only to pass in a web socket, why not create a dynamic constructor as-needed? Someone suggested this earlier, but without creating a constructor on-the-fly. We could leverage variable scope, for example:
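const nativeSocket = /* native `ws` created socket reference */;
// A constructor cannot assign to `this`, but when a function called with `new`
// returns an object, that object becomes the result of the `new` expression.
const WebSocketCtor = function LocalCtor(url: string, protocols?: string | string[]) {
  return nativeSocket;
} as unknown as { new (url: string, protocols?: string | string[]): WebSocket };
const rxjsSocket = new WebSocketSubject({
  url: '', // required by the config type; LocalCtor ignores it
  WebSocketCtor,
});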
In this situation, RxJS won't attach listeners until the developer subscribes to the new subject. It would be possible for the socket to emit events that are lost and de-sync RxJS state. For example, the openObserver might not get called. I need to review your changes better to understand how retry logic works. For example: when passing in a socket instead of a config, how would RxJS know the correct origin or protocol for a new socket?
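The lost-event concern can be illustrated with a small sketch. It assumes a plain Node EventEmitter as a stand-in for an already-open socket and uses fromEvent rather than WebSocketSubject, but the laziness is the same: no listener exists until something subscribes.

```typescript
import { EventEmitter } from 'events';
import { fromEvent } from 'rxjs';

// Stand-in for an already-open socket, so the sketch runs without `ws`.
const socket = new EventEmitter();

// Creating the observable does not attach a listener yet.
const message$ = fromEvent(socket, 'message');

// Emitted before anyone subscribes: nothing is listening, so it is dropped.
socket.emit('message', 'early');

const received: unknown[] = [];
const sub = message$.subscribe(m => received.push(m));

// Emitted after subscribing: delivered as usual.
socket.emit('message', 'late');
sub.unsubscribe();
```

Only the second message ends up in `received`. An existing socket handed to RxJS could have emitted its open event, or any number of messages, in the same way before the first subscription.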
I'm writing an additional comment instead of more edits 😂 @kevmo314 is the intention that your package could be merged right over the WebSocketSubject code? How can we review this as a PR? I'd love to get a proper diff so I can better understand your idea.
Hi, I'm trying to access the message's data in handleConnection's observables:
import express from 'express'
import { createServer } from 'http'
import { fromEvent, merge } from 'rxjs'
import { map, mergeMap, tap } from 'rxjs/operators'
import { WebSocketServer } from 'ws'
const app = express()
const server = createServer(app)
const wss = new WebSocketServer({ server })
const handleConnection = socket => {
  const onClose$ = fromEvent(socket, 'close').pipe(tap(value => console.log('close', value)))
  const onMessage$ = fromEvent(socket, 'message').pipe(tap(value => console.log('message', value)))
  return merge(onClose$, onMessage$)
}
fromEvent(wss, 'connection')
  .pipe(
    map(value => value[0]),
    mergeMap(handleConnection)
  )
  .subscribe()
...but instead I'm getting ws's event target formatted messages, like so:
close CloseEvent {
[Symbol(kTarget)]: <ref *1> WebSocket {
_events: [Object: null prototype] {
close: [Array],
error: [Function],
message: [Function],
open: [Function]
},
_eventsCount: 4,
_maxListeners: undefined,
_binaryType: 'nodebuffer',
_closeCode: 1005,
_closeFrameReceived: true,
_closeFrameSent: true,
_closeMessage: <Buffer >,
_closeTimer: Timeout {
_idleTimeout: -1,
_idlePrev: null,
_idleNext: null,
_idleStart: 2493,
_onTimeout: null,
_timerArgs: undefined,
_repeat: null,
_destroyed: true,
[Symbol(refed)]: true,
[Symbol(kHasPrimitive)]: false,
[Symbol(asyncId)]: 43,
[Symbol(triggerId)]: 35
},
_errorEmitted: false,
_extensions: {},
_paused: false,
_protocol: '',
_readyState: 3,
_receiver: Receiver {
_events: [Object: null prototype] {},
_writableState: [WritableState],
_maxListeners: undefined,
_allowSynchronousEvents: true,
_binaryType: 'nodebuffer',
_extensions: {},
_isServer: true,
_maxPayload: 104857600,
_skipUTF8Validation: false,
_bufferedBytes: 0,
_buffers: [],
_compressed: false,
_payloadLength: 0,
_mask: <Buffer b0 dc 14 ca>,
_fragmented: 0,
_masked: true,
_fin: true,
_opcode: 8,
_totalPayloadLength: 0,
_messageLength: 0,
_fragments: [],
_errored: false,
_loop: false,
_state: 0,
_eventsCount: 0,
[Symbol(shapeMode)]: false,
[Symbol(kCapture)]: false,
[Symbol(websocket)]: [Circular *1]
},
_sender: Sender {
_extensions: {},
_socket: [Socket],
_firstFragment: true,
_compress: false,
_bufferedBytes: 0,
_queue: [],
_state: 0,
onerror: [Function: senderOnError],
[Symbol(websocket)]: [Circular *1]
},
_socket: Socket {
connecting: false,
_hadError: false,
_parent: null,
_host: null,
_closeAfterHandlingError: false,
_events: [Object],
_readableState: [ReadableState],
_writableState: [WritableState],
allowHalfOpen: true,
_maxListeners: undefined,
_eventsCount: 2,
_sockname: null,
_pendingData: null,
_pendingEncoding: '',
server: [Server],
_server: [Server],
parser: null,
on: [Function (anonymous)],
addListener: [Function (anonymous)],
prependListener: [Function: prependListener],
setEncoding: [Function: socketSetEncoding],
_paused: false,
timeout: 0,
[Symbol(async_id_symbol)]: 35,
[Symbol(kHandle)]: null,
[Symbol(lastWriteQueueSize)]: 0,
[Symbol(timeout)]: null,
[Symbol(kBuffer)]: null,
[Symbol(kBufferCb)]: null,
[Symbol(kBufferGen)]: null,
[Symbol(shapeMode)]: true,
[Symbol(kCapture)]: false,
[Symbol(kSetNoDelay)]: true,
[Symbol(kSetKeepAlive)]: false,
[Symbol(kSetKeepAliveInitialDelay)]: 0,
[Symbol(kBytesRead)]: 229,
[Symbol(kBytesWritten)]: 131,
[Symbol(websocket)]: undefined
},
_autoPong: true,
_isServer: true,
[Symbol(shapeMode)]: false,
[Symbol(kCapture)]: false
},
[Symbol(kType)]: 'close',
[Symbol(kCode)]: 1005,
[Symbol(kReason)]: '',
[Symbol(kWasClean)]: true
}
How can I fix this? Thanks.
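One possible approach, sketched under assumptions: the object logged above looks like an event wrapper, since fromEvent appears to prefer a target's addEventListener API when one is present. In that case the payload should be on the event's `data` property (the convert pipeline earlier in the thread reads `.data` the same way), and a close event's details on `code` and `reason`. `FakeSocket` and `MessageLike` below are hypothetical stand-ins so the example runs without a real `ws` connection.

```typescript
import { fromEvent } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical event shape: only the `data` property matters for this sketch.
interface MessageLike { data: string; }
type Listener = (event: MessageLike) => void;

// Minimal stand-in for a socket exposing the EventTarget-style API.
class FakeSocket {
  private listeners = new Set<Listener>();
  addEventListener(_type: string, listener: Listener): void {
    this.listeners.add(listener);
  }
  removeEventListener(_type: string, listener: Listener): void {
    this.listeners.delete(listener);
  }
  // Helper for the sketch: deliver an event to every registered listener.
  dispatch(event: MessageLike): void {
    this.listeners.forEach(listener => listener(event));
  }
}

const socket = new FakeSocket();
const payloads: unknown[] = [];

// Unwrap the event and parse its payload instead of logging the whole event object.
const subscription = fromEvent<MessageLike>(socket, 'message')
  .pipe(map(event => JSON.parse(event.data)))
  .subscribe(payload => payloads.push(payload));

socket.dispatch({ data: '{"action":"convert"}' });
subscription.unsubscribe();
```

In the handleConnection code above, the equivalent change would be a `map(event => event.data)` before the `tap`. With a real `ws` socket the data may be a Buffer rather than a string, so it may need converting before `JSON.parse`.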