alpakka icon indicating copy to clipboard operation
alpakka copied to clipboard

RSocket connector

Open patriknw opened this issue 9 years ago • 9 comments

RSocket is interesting, especially for integration between microservices (implemented with different technology stacks).

reactivesocket-java API is based on Reactive Streams Publisher and Subscriber and could therefore be used directly with Akka Streams.

However, it might be beneficial to provide a more idiomatic API on top of reactivesocket-java.

The reactivesocket-java API is: https://github.com/ReactiveSocket/reactivesocket-java/blob/0.5.x/reactivesocket-core/src/main/java/io/reactivesocket/ReactiveSocket.java (note that requestSubscription is being removed)

The same interface is used on both client and server side.

A Scala API wrapper might look like:

def fireAndForget(payload: Payload): Future[Done]
def requestResponse(payload: Payload): Future[Payload]
def requestStream(payload: Payload): Source[Payload]
def requestChannel(payloads: Source[Payload]): Source[Payload]

patriknw avatar Nov 09 '16 11:11 patriknw

So far, I'm not convinced that such a wrapper API would add much value compared to using the reactivesocket-java API directly with Akka Streams Publisher/Subscriber conversions. I'll dig deeper.

patriknw avatar Nov 09 '16 11:11 patriknw

After looking at it a first time I think that on the server side we could provide a nice SocketAcceptor integration, something closer to how the Tcp streams api looks server side perhaps, client side maybe not so much need.

johanandren avatar Nov 09 '16 14:11 johanandren

It seems that first attempts of implementing this were stopped. I am interested to provide this kind of alpakka implementation - do you have any new thoughts how should it look like or would be great to make some initial proposal by me?

maciekciolek avatar Feb 13 '17 22:02 maciekciolek

@maciekciolek That would be awesome. It would be great if you could start by sketching out the API and we can review that as the first step. Take a look at the prototype at https://github.com/akka/alpakka/pull/26/files

patriknw avatar Feb 14 '17 06:02 patriknw

@patriknw I went deeply into #26, #69 prototypes and the implementation of ReactiveSocket. I can see two ways how Alpakka can support ReactiveSocket:

  1. We can implement just Akka Streams wrapper (abstract class with method overrides) which will allow developers to use ReactiveSocket with Akka Streams easily but it will be just raw wrapper.
  2. We can do it like akka-http - I can imagine approach as follow:
object ReactiveSockets {

  def bindAndHandle(
                     handler: Flow[ReactiveRequest, ReactiveResponse, NotUsed],
                     transport: TransportServer
         )(implicit fm: Materializer) = ???
}

where ReactiveRequest is type wrapper over proper ReactiveSocket`s interaction model. ReactiveResponse is type representing proper response.

With this approach we can offer DSL for Payload (un-)marshallers and so one (like akka-http - the sky is the limit).

There is one problem, it will be hard to implement it with current ReactiveStream API, as it's AbstractReactiveSocket requires to return Publisher in place of method invocation. It will be needed to open API of io.reactivesocket.ServerReactiveSocket (PR to java implementation) and then implement this socket like Akka`s Tcp binding.

It seems that Alpakka is not the best place for such implementation and I can implement first approach, but maybe in future Akka will be interested in this solution? What is your opinion?

maciekciolek avatar Feb 20 '17 23:02 maciekciolek

I vote for alternative 1. My thinking around this is that we should embrace ReactiveSocket and not hide it too much. At some places we provide easier-to-use constructs/wrappers.

patriknw avatar Feb 21 '17 09:02 patriknw

@maciekciolek I checked this area a bit the last days and find it very interesting indeed. Are you still interested in making the contribution?

aisven avatar Jan 30 '19 13:01 aisven

The Lightbend blog about RSocket in Cloudflow shows how to bring Akka Streams and RSocket together: https://www.lightbend.com/blog/implementing-rsocket-ingress-in-cloudflow-part-2-advanced-features

ennru avatar Aug 10 '20 11:08 ennru

Can this effort leverage msocket ?

They use a portable-akka implementation that allows extending the akka programming model to the client (`scala.js included)

https://github.com/tmtsoftware/msocket/blob/master/example/example-service-api/src/main/scala/csw/example/api/ExampleApi.scala

Note: Source in the apis

trait ExampleApi {
  // these are requestResponse style APIs that are only supported by transports that have implicit timeouts
  def hello(name: String): Future[String]
  def randomBag(): Future[Bag]

  // this looks like requestResponse style API but is implemented on top of streaming API with explicit timeout
  // because that timeout could be much larger than the implicit timeout of the transport
  def square(number: Int): Future[Int]

  // these are requestStream style APIs
  def helloStream(name: String): Source[String, Subscription]
  def getNumbers(divisibleBy: Int): Source[Int, Subscription]
  def randomBagStream(): Source[Bag, Subscription]
}
/**
 * Transport agnostic message protocol is defined as a Scala ADT
 * For each API method, there needs to be a matching message in this ADT
 * For example, Hello(name) is the message for hello(name) method in the API
 */
object ExampleProtocol {
  // these messages are used for requestResponse interaction model
  sealed trait ExampleRequest
  case class Hello(name: String) extends ExampleRequest
  case object RandomBag          extends ExampleRequest

  // these messages are used for requestStream interaction model
  sealed trait ExampleStreamRequest
  case class HelloStream(name: String)    extends ExampleStreamRequest
  case class Square(number: Int)          extends ExampleStreamRequest
  case class GetNumbers(divisibleBy: Int) extends ExampleStreamRequest
  case object RandomBagStream             extends ExampleStreamRequest
}

SemanticBeeng avatar Sep 25 '21 12:09 SemanticBeeng