spring-framework

Support Flux in, Flux out for WebSocketSession in WebFlux

Open bruto0 opened this issue 5 years ago • 2 comments

As of WebFlux 5.2.7, WebSocketClient offers only two execute methods, which confine the programmer's logic to the WebSocketHandler implementation passed to them. However, in some cases it would be useful to expose the received Flux to the outside scope so that it can be composed with other WS sessions (or reactive functions in general). If such a usage pattern is a first-class API method in RSocket, then it has probably been requested often enough.

Example implementation: https://stackoverflow.com/questions/62249016/sending-output-of-one-websocket-client-as-input-to-another
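To illustrate the limitation described above, here is a minimal sketch of how the existing API is typically used. It assumes Spring WebFlux and Reactor are on the classpath; the class and the `logMessages` helper are hypothetical names chosen for illustration. Note that the Flux returned by `session.receive()` is only reachable inside the handler lambda, and the caller gets back nothing but a `Mono<Void>` completion signal.

```java
import java.net.URI;

import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import reactor.core.publisher.Mono;

public class ConfinedHandlerExample {

    // Hypothetical helper: prints every text frame received on the session.
    // All processing has to live inside the handler; the caller only sees
    // the Mono<Void> that signals when the session has ended.
    static Mono<Void> logMessages(WebSocketClient client, URI uri) {
        return client.execute(uri, session ->
                session.receive()
                        .map(WebSocketMessage::getPayloadAsText)
                        .doOnNext(System.out::println)
                        .then());
    }
}
```

Any concrete client, for example `ReactorNettyWebSocketClient`, could be passed in. Composing the received messages with another stream from the outside is not possible with this shape alone.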

bruto0, Jul 03 '20 10:07

Any kind of feedback would be nice, @rstoyanchev.

bruto0, Feb 02 '21 16:02

Yes, it is a reasonable request and probably feasible but something to explore.

rstoyanchev, Feb 04 '21 18:02

It would be very useful, indeed.

I will try to publish a minimal example in the next few days, because I think I have a use case that fits this request: I would like a server to proxy data from an internal service's websocket to a client app over HTTP and Server-Sent Events. I have not found any simple way to "bind" the incoming websocket session.receive() Flux to the outgoing SSE data stream.

I currently use a Sinks.Many to pass data received on the websocket to the SSE stream. This complicates not only the data pipeline but also network connection (stream lifecycle) management, because the websocket session and the SSE stream now have two independent subscriptions glued together by a sink. This is far from ideal.

One idea would be for WebSocketHandler to return a Publisher<V> instead of Mono<Void>, so the developer is free to transform the session receive/send pipeline and hand back any result as the websocket session's data stream.
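The workaround described above might look roughly like the following sketch. It is not the commenter's actual code: the `SinkBridge` class and its method names are hypothetical, and it assumes Spring WebFlux and Reactor on the classpath. It shows the two independent subscriptions, one for the websocket session and one for the downstream consumer, joined only by the sink.

```java
import java.net.URI;

import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class SinkBridge {

    // The "glue": websocket messages are pushed in, SSE subscribers read out.
    private final Sinks.Many<String> sink =
            Sinks.many().multicast().onBackpressureBuffer();

    // Subscription 1: the websocket session, started and stopped on its own.
    // The returned Disposable has to be tracked and disposed manually.
    Disposable connect(WebSocketClient client, URI uri) {
        return client.execute(uri, session ->
                        session.receive()
                                .map(WebSocketMessage::getPayloadAsText)
                                .doOnNext(sink::tryEmitNext) // emit result ignored in this sketch
                                .then())
                .subscribe();
    }

    // Subscription 2: whatever consumes the stream, e.g. a controller method
    // returning this Flux with the text/event-stream media type.
    Flux<String> events() {
        return sink.asFlux();
    }
}
```

Cancelling the Flux returned by `events()` does nothing to the websocket session, and a websocket failure is not propagated to the SSE side unless extra wiring is added. That is the lifecycle mismatch the comment refers to.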

alexismanin, Jan 28 '25 11:01

I've published a minimal reproducible example in this repository. It demonstrates my use case: I connect to a websocket server and forward the received messages to an SSE client.

The project also contains a possible improvement: a custom WebSocketHandler interface that returns an arbitrary stream. A WebSocketClient extension can then return a Flux of user values instead of a Mono<Void>, while still binding the websocket session lifecycle to the subscription of the user's Flux.

That's not necessarily the best option, but I hope it can serve as a basis for discussion and experimentation.
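For readers without access to the repository, one way such an extension could be sketched is shown below. This is an assumption about the general shape, not the code from the linked project: `FluxWebSocketHandler` and `FluxWebSocketClients.execute` are hypothetical names, and only existing Spring WebFlux and Reactor calls (`WebSocketClient.execute`, `Flux.create`) are used underneath.

```java
import java.net.URI;

import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

public final class FluxWebSocketClients {

    // Hypothetical counterpart of WebSocketHandler that yields user values
    // instead of a bare completion signal.
    @FunctionalInterface
    public interface FluxWebSocketHandler<V> {
        Flux<V> handle(WebSocketSession session);
    }

    // Opens the session when the returned Flux is subscribed and closes it
    // when that subscription is cancelled, so there is a single lifecycle.
    public static <V> Flux<V> execute(WebSocketClient client, URI uri,
                                      FluxWebSocketHandler<V> handler) {
        return Flux.<V>create(sink -> {
            Disposable connection = client
                    .execute(uri, session ->
                            handler.handle(session)
                                    .doOnNext(sink::next) // forward values to the outer Flux
                                    .then())
                    .subscribe(ignored -> { }, sink::error, sink::complete);
            // Cancelling the outer Flux disposes the websocket connection.
            sink.onDispose(connection);
        });
    }
}
```

A caller could then write something like `FluxWebSocketClients.execute(client, uri, s -> s.receive().map(WebSocketMessage::getPayloadAsText))` and return the result directly from an SSE endpoint. One caveat of this sketch is that `Flux.create` buffers by default, so backpressure from the downstream consumer is not propagated to the websocket session.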

alexismanin, Feb 04 '25 21:02