rxjs icon indicating copy to clipboard operation
rxjs copied to clipboard

Using rxjs/webSocket on a server

Open kevmo314 opened this issue 4 years ago • 16 comments

Feature Request

Right now WebSocketSubject only accepts a constructor. Can this be changed to also accept an existing socket in the config?

Specifically, my use case is that I'd like to use rxjs with websocket/ws on my server, and their API would allow me to do something like:

wss.on("connection", (ws: WebSocket) => {
  const rxjsWebSocket = webSocket(ws); // Wrap the node WebSocket into a subject.
});

wrt api compatibility, I think it's reasonable to assume the web socket remains w3c compatible instead of worrying about the NodeJS.WebSocket API. There are various compatibility shims to adapt the assorted NodeJS variants to w3c sockets. I'm curious if this was an intentional choice to not allow wrapping an existing socket or if this can be done?

kevmo314 avatar Apr 10 '20 19:04 kevmo314

I'm working on making a transparent WebSocket layer for connecting a browser to a remote server (Raspberry Pi) to be able to subscribe directly to sensors and set outputs etc. from the browser or front-end application. So the idea is to have a WebSocket server that handles incoming connections and subscribes in a seamless manner to the GPIO ports.

This would be a welcomed feature!

six-edge avatar Jan 16 '21 10:01 six-edge

@kevmo314 Seems there is a way to do this already:

const WebSocket = require('ws')
const { webSocket } = require('rxjs/webSocket')

const webSocketConnection$ = (
  webSocket({
    protocol: 'v1',
    url: 'http://example.com:3000',
    WebSocketCtor: WebSocket,
  })
)

Ref: https://gist.github.com/Sawtaytoes/fe3d16b1a15aa20eef5d2a41d0b39934 See: https://rxjs.dev/api/webSocket/WebSocketSubjectConfig#WebSocketCtor

abarke avatar Jan 16 '21 23:01 abarke

@abarke This doesn't work for existing web sockets. I mentioned this in the first line of the FR.

kevmo314 avatar Jan 17 '21 00:01 kevmo314

@kevmo314 apologies I see your point. You are right. An already established WebSocket would allow a new WebSocketSubject to be created for each incoming WS connection. The WebSocketSubject would then be used to send and receive messages on the server-side. I suppose one could then subscribe each WebSocketSubject to a shared resource ('hardware input, data stream, logs, etc.') e.g. ResourceSubject that then emits updated values and multicasting it to all subscribers (WebSocket Clients).

I created a demo to simulate a shared resource on a server just to wrap my head around it, as I need something similar for a project: https://stackblitz.com/edit/rxjs-shared-source-example?file=index.ts

It might be simple enough to modify the WebSocketSubject to take a WebSocket as an argument. I might just give it a hack 🙂

abarke avatar Jan 19 '21 02:01 abarke

Hey @kevmo314 I managed to create a PoC and published the library on NPM for consumption. I rewrote the WebSocketSubject class as extending it proved difficult due to not being able to set private members of the WebSocketSubject. Works great in my project so far. Any feedback/contributions welcome.

https://www.npmjs.com/package/rxjs-websocket-server-subject

six-edge avatar Jan 29 '21 22:01 six-edge

Any feedback on this @kevmo314 ?

Im looking at opening a PR for this, however I'm not sure if I should merge the WebSocketServerSubject into the WebSocketSubject (could get messy) or leave it as its own class (some duplicate code, but has separation of concerns and is more maintainable IMHO)

@benlesh any ideas or feedback of how to proceed?

six-edge avatar Mar 08 '21 15:03 six-edge

Unfortunately my use case has come and gone, the code that needed this is now happily running and stable. I took a look at the package though, it looks pretty good and I'd love to see this PR'd into the main rxjs/webSocket package. One edge case I'm curious about, what happens if there are two subjects on a single socket and one closes due to backpressure? What happens to the other subject?

kevmo314 avatar Mar 08 '21 16:03 kevmo314

@kevmo314 regarding two subjects on a single socket... do you mean something like this?

wsServer.on('connection', (webSocket: WebSocket, req: IncomingMessage) => {
  const wsServerSubject = new WebSocketServerSubject<Message>(webSocket)
  const wsServerSubject2 = new WebSocketServerSubject<Message>(webSocket)

I tried this but it seems only wsServerSubject2 is bound. Not sure what your specific use case is, however in my test project I just create a single wsServerSubject and pipe various listeners to that subject that filter a specific message for each message type emitted. Could you elaborate on your use case as to why having two subjects on a single socket would be useful?

six-edge avatar Apr 06 '21 17:04 six-edge

what about using fromEvent? From then on its possible to do everything right?

const wss = new Server({ server });

const handleConnection = (websocket: WebSocket): Observable<WebSocket.MessageEvent | WebSocket.CloseEvent> => {
  const onmessage$ = fromEvent<WebSocket.MessageEvent>(websocket, 'message')
    .pipe(
      tap(m => console.log(m.type))
    );
  const onclose$ = fromEvent<WebSocket.CloseEvent>(websocket, 'close')
    .pipe(
      tap(m => console.log(m.reason)));
      // const onerror$ = ....
      // etc
  return merge(onmessage$, onclose$)
}

fromEvent(wss, 'connection')
  .pipe(
    map(a => a[0] as WebSocket),
    mergeMap(handleConnection)
  ).subscribe();
}

ValentinBossi avatar May 01 '21 16:05 ValentinBossi

what about using fromEvent? From then on its possible to do everything right?

const wss = new Server({ server });

const handleConnection = (websocket: WebSocket): Observable<WebSocket.MessageEvent | WebSocket.CloseEvent> => {
  const onmessage$ = fromEvent<WebSocket.MessageEvent>(websocket, 'message')
    .pipe(
      tap(m => console.log(m.type))
    );
  const onclose$ = fromEvent<WebSocket.CloseEvent>(websocket, 'close')
    .pipe(
      tap(m => console.log(m.reason)));
      // const onerror$ = ....
      // etc
  return merge(onmessage$, onclose$)
}

fromEvent(wss, 'connection')
  .pipe(
    map(a => a[0] as WebSocket),
    mergeMap(handleConnection)
  ).subscribe();
}

This is very nice idea, did you try it ? once you go rxjs you need it to the vain

noririco avatar May 19 '21 11:05 noririco

what about using fromEvent? From then on its possible to do everything right?

const wss = new Server({ server });

const handleConnection = (websocket: WebSocket): Observable<WebSocket.MessageEvent | WebSocket.CloseEvent> => {
  const onmessage$ = fromEvent<WebSocket.MessageEvent>(websocket, 'message')
    .pipe(
      tap(m => console.log(m.type))
    );
  const onclose$ = fromEvent<WebSocket.CloseEvent>(websocket, 'close')
    .pipe(
      tap(m => console.log(m.reason)));
      // const onerror$ = ....
      // etc
  return merge(onmessage$, onclose$)
}

fromEvent(wss, 'connection')
  .pipe(
    map(a => a[0] as WebSocket),
    mergeMap(handleConnection)
  ).subscribe();
}

This is very nice idea, did you try it ? once you go rxjs you need it to the vain

sure. in the moment one message type is looking like that:

const onConvertMessage$ = fromEvent(websocket, 'message')
        .pipe(
            map(m => WsExchange.fromMessage(JSON.parse((m as WsMessage).data))),
            filter(m => m.action === "convert"),
            map(m => m as ConvertText),
            switchMap(handleConvertion),
            mergeMap(uploadToAws),
            map(bucket => GeneratedAudio.from(bucket)),
            tap(message => websocket.send(JSON.stringify(message))),
            catchError((err, caught) => {
                console.log("error in convert pipeline: ", err)
                return caught;
            }),
        );

ValentinBossi avatar May 22 '21 06:05 ValentinBossi

I don't think this makes sense as part of rxjs because the change is to support a userland socket implementation that only works in a node environment. It would be a large commitment for RxJS development to follow ws development. For example, what versions should RxJS pin as a peer dependency? Who will test changes? ws is my preferred tooling, and this package looks like a great implementation, so why not continue to use this as a package?

If it is included, it should not be part of the dom observable module because the browser runtime is not supported.

bever1337 avatar Sep 30 '22 19:09 bever1337

It would be a large commitment for RxJS development to follow ws development.

This is not true, as mentioned in the original issue the request is to follow the w3c specification for which the ws websocket is compliant. The change is to support passing in a spec-compliant websocket which also exists in browsers, not a specifically nodejs runtime websocket.

kevmo314 avatar Sep 30 '22 19:09 kevmo314

The client is spec-compliant in that it can talk to other spec-compliant sockets, absolutely, but the API cannot be used in the browser. For example, the node websocket allows the user to implement ping/pong, whereas browser websockets handle ping/pong automatically. Does RxJS need to expose additional streams on the subject? IMO yes, if the goal is to fully support native sockets.

If the goal is only to pass in a web socket, why not create a dynamic constructor as-needed? Someone suggested this earlier, but without creating a constructor on-the-fly. We could leverage variable scope, for example:

const nativeSocket = /* native `ws` created socket reference */;
const WebsocketCtor = function LocalCtor(url: string, protocol: string[]) {
  this = nativeSocket;
};
const rxjsSocket = new WebSocketSubject({
  WebSocketCtor,
});

In this situation, RxJS won't attach listeners until the developer subscribes to the new subject. It would be possible for the socket to emit events that are lost and de-sync RxJS state. For example, the openObserver might not get called. I need to review your changes better to understand how retry logic works. For example: By passing in a socket instead of a config, how would RxJS know the correct origin or protocol for the new socket?

bever1337 avatar Oct 01 '22 15:10 bever1337

I'm writing an additional comment instead of more edits 😂 @kevmo314 is the intention that your package could be merged right over the WebSocketSubject code? How can we review this as a PR? I'd love to get a proper diff so I can better understand your idea.

bever1337 avatar Oct 01 '22 15:10 bever1337

what about using fromEvent? From then on its possible to do everything right?

const wss = new Server({ server });

const handleConnection = (websocket: WebSocket): Observable<WebSocket.MessageEvent | WebSocket.CloseEvent> => {
  const onmessage$ = fromEvent<WebSocket.MessageEvent>(websocket, 'message')
    .pipe(
      tap(m => console.log(m.type))
    );
  const onclose$ = fromEvent<WebSocket.CloseEvent>(websocket, 'close')
    .pipe(
      tap(m => console.log(m.reason)));
      // const onerror$ = ....
      // etc
  return merge(onmessage$, onclose$)
}

fromEvent(wss, 'connection')
  .pipe(
    map(a => a[0] as WebSocket),
    mergeMap(handleConnection)
  ).subscribe();
}

Hi, I'm trying to access message's data in handleConnection's observables:

import express from 'express'
import { createServer } from 'http'
import { fromEvent, merge } from 'rxjs'
import { map, mergeMap, tap } from 'rxjs/operators'
import { WebSocketServer } from 'ws'

const app = express()
const server = createServer(app)
const wss = new WebSocketServer({ server })

const handleConnection = socket => {
  const onClose$ = fromEvent(socket, 'close').pipe(tap(value => console.log('close', value)))
  const onMessage$ = fromEvent(socket, 'message').pipe(tap(value => console.log('message', value)))
  return merge(onClose$, onMessage$)
}

fromEvent(wss, 'connection')
  .pipe(
    map(value => value[0]),
    mergeMap(handleConnection)
  )
  .subscribe()

...but instead I'm getting ws's event target formatted messages, like so:

close CloseEvent {
  [Symbol(kTarget)]: <ref *1> WebSocket {
    _events: [Object: null prototype] {
      close: [Array],
      error: [Function],
      message: [Function],
      open: [Function]
    },
    _eventsCount: 4,
    _maxListeners: undefined,
    _binaryType: 'nodebuffer',
    _closeCode: 1005,
    _closeFrameReceived: true,
    _closeFrameSent: true,
    _closeMessage: <Buffer >,
    _closeTimer: Timeout {
      _idleTimeout: -1,
      _idlePrev: null,
      _idleNext: null,
      _idleStart: 2493,
      _onTimeout: null,
      _timerArgs: undefined,
      _repeat: null,
      _destroyed: true,
      [Symbol(refed)]: true,
      [Symbol(kHasPrimitive)]: false,
      [Symbol(asyncId)]: 43,
      [Symbol(triggerId)]: 35
    },
    _errorEmitted: false,
    _extensions: {},
    _paused: false,
    _protocol: '',
    _readyState: 3,
    _receiver: Receiver {
      _events: [Object: null prototype] {},
      _writableState: [WritableState],
      _maxListeners: undefined,
      _allowSynchronousEvents: true,
      _binaryType: 'nodebuffer',
      _extensions: {},
      _isServer: true,
      _maxPayload: 104857600,
      _skipUTF8Validation: false,
      _bufferedBytes: 0,
      _buffers: [],
      _compressed: false,
      _payloadLength: 0,
      _mask: <Buffer b0 dc 14 ca>,
      _fragmented: 0,
      _masked: true,
      _fin: true,
      _opcode: 8,
      _totalPayloadLength: 0,
      _messageLength: 0,
      _fragments: [],
      _errored: false,
      _loop: false,
      _state: 0,
      _eventsCount: 0,
      [Symbol(shapeMode)]: false,
      [Symbol(kCapture)]: false,
      [Symbol(websocket)]: [Circular *1]
    },
    _sender: Sender {
      _extensions: {},
      _socket: [Socket],
      _firstFragment: true,
      _compress: false,
      _bufferedBytes: 0,
      _queue: [],
      _state: 0,
      onerror: [Function: senderOnError],
      [Symbol(websocket)]: [Circular *1]
    },
    _socket: Socket {
      connecting: false,
      _hadError: false,
      _parent: null,
      _host: null,
      _closeAfterHandlingError: false,
      _events: [Object],
      _readableState: [ReadableState],
      _writableState: [WritableState],
      allowHalfOpen: true,
      _maxListeners: undefined,
      _eventsCount: 2,
      _sockname: null,
      _pendingData: null,
      _pendingEncoding: '',
      server: [Server],
      _server: [Server],
      parser: null,
      on: [Function (anonymous)],
      addListener: [Function (anonymous)],
      prependListener: [Function: prependListener],
      setEncoding: [Function: socketSetEncoding],
      _paused: false,
      timeout: 0,
      [Symbol(async_id_symbol)]: 35,
      [Symbol(kHandle)]: null,
      [Symbol(lastWriteQueueSize)]: 0,
      [Symbol(timeout)]: null,
      [Symbol(kBuffer)]: null,
      [Symbol(kBufferCb)]: null,
      [Symbol(kBufferGen)]: null,
      [Symbol(shapeMode)]: true,
      [Symbol(kCapture)]: false,
      [Symbol(kSetNoDelay)]: true,
      [Symbol(kSetKeepAlive)]: false,
      [Symbol(kSetKeepAliveInitialDelay)]: 0,
      [Symbol(kBytesRead)]: 229,
      [Symbol(kBytesWritten)]: 131,
      [Symbol(websocket)]: undefined
    },
    _autoPong: true,
    _isServer: true,
    [Symbol(shapeMode)]: false,
    [Symbol(kCapture)]: false
  },
  [Symbol(kType)]: 'close',
  [Symbol(kCode)]: 1005,
  [Symbol(kReason)]: '',
  [Symbol(kWasClean)]: true
}

How can I fix this? Thanks.-

lropero avatar Aug 13 '24 15:08 lropero