Airstream icon indicating copy to clipboard operation
Airstream copied to clipboard

Added websocket event sources

Open ajaychandran opened this issue 4 years ago • 11 comments

I have tested the implementation for binary and text data. Included support for dom.Blob data type, not sure how to test it though.

This example can be added to the Laminar project once the PR is merged and released.

ajaychandran avatar Dec 30 '20 23:12 ajaychandran

Looks like I started with an older version of the target branch. Let me know if you want me to resubmit the PR.

ajaychandran avatar Dec 30 '20 23:12 ajaychandran

@raquo Please review.

ajaychandran avatar Jan 01 '21 09:01 ajaychandran

I have a WebSocket API in laminext, and I took a different approach there (I just added support for blob/arrays inspired by this PR :) ).

The usage looks like this:

      val ws = websocket.forUrl("wss://echo.websocket.org").string
// or
      websocket.forUrl("wss://echo.websocket.org").text(decode, encode)
      websocket.forUrl("wss://echo.websocket.org").blob
      websocket.forUrl("wss://echo.websocket.org").arraybuffer
// or      
      websocket.forUrl("wss://echo.websocket.org").receiveString.sendArray
      websocket.forUrl("wss://echo.websocket.org").receiveBlob.sendText(decode, encode)
// and all other combinations 

and then there's a bunch of binders + a stream and an observer provided:

simplest:

val outgoingEvents: Observable[String] = ???

div(
  ws.connect, //only needed when no other binders are used, like in this example
  child.text <-- ws.received.stream,
  outgoingEvents --> ws.sendObserver
)

other binders:

div(
  ws.send <-- outgoingEvents
)
div(
  ws.connected --> connectedObserver,  // or () => Unit, as usual
  ws.closed --> closedObserver,  
  ws.error --> errorObserver,  
  ws.received --> receivedObserver
)

It buffers the outgoing messages until connected (configurable). A couple of things remain to be done, like the re-connection logic.

The working example can look like this:

val ws = websocket.forUrl("wss://echo.websocket.org").string
val inputElement = input()
div(
  div(inputElement),
  div(  
    button(
      "send",
      onClick.mapTo(inputElement.ref.value) --> ws.sendObserver
    )
  ),
  ws.connect,
  div(text <-- ws.received)
)

yurique avatar Jan 02 '21 04:01 yurique

It's a quite old piece of code, though. Now I'm looking at it and wondering if I could ditch half of it (together with some extra complexity)

yurique avatar Jan 02 '21 04:01 yurique

after some cleanup my implementation looks like this: https://gist.github.com/yurique/baae21c9eea620033e0fc2cf5f370701

  • a couple of files with builders

yurique avatar Jan 02 '21 21:01 yurique

I reviewed this branch, Iurii's gist, and my own private websocket implementation (my own stuff is not very useful because it's too tied to my personal code and the libraries that I use), and after all, I don't think adding websockets to Airstream is the right thing to do now.

The amount of complexity required to make a reasonably safe, resilient and pleasant API is very significant, and I don't have the capacity right now to get to the bottom of it and make the right call. Things that need to be designed, at the very least, are:

  • handling errors and disconnects, and maybe automatic reconnects
  • queing of pending client-to-server messages while the the connection was not yet open or while it was disconnected
  • library needs to be aware that users will most likely use a codec for both inbound and outbound messages, and so should probably support such config

It's not clear to me how the API should be designed in terms of stream lifecycle either.

Iurii's implementation depends on Laminar but provides a seemingly more convenient API. I honestly don't know if that's the way to go or not. I think we'll need to see Iurii's laminext released, presumably with his websockets implementation, and see how that fares, what kind of feedback we get about such an approach to websockets.

Websockets are quite far from Airstream's core functionality, so I'm ok having examples / gists / third party libraries cover that for now.

At any rate, this turned out to be too big of an issue to include in v0.12.0. I'm going to fix a bunch more small stuff and wrap up that release now.

raquo avatar Jan 05 '21 03:01 raquo

This is a very exciting release!

I'll do my best to release laminext asap after 0.12 is out. Almost everything is prepared, just a couple of modules left to document.

yurique avatar Jan 05 '21 03:01 yurique

should this be included as an extension module of Airstream instead? Of course, post 0.12?

ngbinh avatar Jan 06 '21 07:01 ngbinh

Well, that's what Iurii's laminext project will be, right, an extension library for Laminar & Airstream. Ajay's implementation can be published similarly, it doesn't have to be part of Airstream core.

The benefit of having third parties publish these extensions is that I don't have to participate in the design or maintain the code afterwards, and you can have several alternatives on equal standing.

I mean eventually it would be nice to have a canonical websockets wrapper in Airstream itself, but it's a lot of work to do it the way I think it should be done (I didn't realize this initially, or I would have said it upfront), and the cost / benefit ratio isn't there yet. In my view issues like https://github.com/raquo/Airstream/issues/43 are more pressing, and that isn't an easy problem either, that will take a lot of dev time too.

— Best regards, Nikita

On Tue, Jan 5, 2021 at 11:14 PM Binh Nguyen [email protected] wrote:

should this be included as an extension module of Airstream instead? Of course, post 0.12?

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/raquo/Airstream/pull/60#issuecomment-755129878, or unsubscribe https://github.com/notifications/unsubscribe-auth/AAECBMHXAYLUJYQI4C7V63LSYQEWVANCNFSM4VOZ6KEQ .

raquo avatar Jan 06 '21 07:01 raquo

I have redesigned the API to support connection management.

// a duplex connection
// (Observer[Boolean], Observer[String[, EventStream[String])
val (control, receive, transmit) = WebSocketEventStream.text.open("absolute/url")
  • a connection is established when receive is started
  • sending true to control will open a new connection (provided receive is started)
  • sending false to control will close the connection
  • the connection is closed when receive is stopped
  • control can be used in conjunction with a socketErrorObserver (optional constructor parameter) to implement an automatic retry policy.

At any rate, this turned out to be too big of an issue to include in v0.12.0. I'm going to fix a bunch more small stuff and wrap up that release now.

I agree. It would be nice to get some feedback from developers using any of the implementations presented here.

ajaychandran avatar Jan 06 '21 10:01 ajaychandran