fs2-grpc icon indicating copy to clipboard operation
fs2-grpc copied to clipboard

668 aspect oriented middleware

Open ValdemarGr opened this issue 1 year ago • 16 comments

Initial draft for #668

Example output for TestServiceFs2Grpc
package hello.world

import _root_.cats.syntax.all._

trait TestServiceFs2Grpc[F[_], A] {
  def noStreaming(request: hello.world.TestMessage, ctx: A): F[hello.world.TestMessage]
  def clientStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): F[hello.world.TestMessage]
  def serverStreaming(request: hello.world.TestMessage, ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage]
  def bothStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage]
}

object TestServiceFs2Grpc extends _root_.fs2.grpc.GeneratedCompanion[TestServiceFs2Grpc] {
  
  def mkClientFull[F[_]: _root_.cats.effect.Async, Dom[_], Cod[_], A](
    dispatcher: _root_.cats.effect.std.Dispatcher[F],
    channel: _root_.io.grpc.Channel,
    clientAspect: _root_.fs2.grpc.client.ClientAspect[F, Dom, Cod, A],
    clientOptions: _root_.fs2.grpc.client.ClientOptions
  )(implicit
    dom0: Dom[hello.world.TestMessage],
    cod0: Cod[hello.world.TestMessage]
  ): TestServiceFs2Grpc[F, A] = new TestServiceFs2Grpc[F, A] {
    def noStreaming(request: hello.world.TestMessage, ctx: A): F[hello.world.TestMessage] =
      clientAspect.visitUnaryToUnary[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_NO_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
        request,
        (req, m) => _root_.fs2.grpc.client.Fs2ClientCall[F](channel, hello.world.TestServiceGrpc.METHOD_NO_STREAMING, dispatcher, clientOptions).flatMap(_.unaryToUnaryCall(req, m))
      )
    def clientStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): F[hello.world.TestMessage] =
      clientAspect.visitStreamingToUnary[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
        request,
        (req, m) => _root_.fs2.grpc.client.Fs2ClientCall[F](channel, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING, dispatcher, clientOptions).flatMap(_.streamingToUnaryCall(req, m))
      )
    def serverStreaming(request: hello.world.TestMessage, ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage] =
      clientAspect.visitUnaryToStreaming[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
        request,
        (req, m) => _root_.fs2.Stream.eval(_root_.fs2.grpc.client.Fs2ClientCall[F](channel, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING, dispatcher, clientOptions)).flatMap(_.unaryToStreamingCall(req, m))
      )
    def bothStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage] =
      clientAspect.visitStreamingToStreaming[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
        request,
        (req, m) => _root_.fs2.Stream.eval(_root_.fs2.grpc.client.Fs2ClientCall[F](channel, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING, dispatcher, clientOptions)).flatMap(_.streamingToStreamingCall(req, m))
      )
  }
  
  def mkClientTrivial[F[_]: _root_.cats.effect.Async, A](
    dispatcher: _root_.cats.effect.std.Dispatcher[F],
    channel: _root_.io.grpc.Channel,
    clientAspect: _root_.fs2.grpc.client.ClientAspect[F, _root_.fs2.grpc.shared.Trivial, _root_.fs2.grpc.shared.Trivial, A],
    clientOptions: _root_.fs2.grpc.client.ClientOptions
  ) = 
    mkClientFull[F, _root_.fs2.grpc.shared.Trivial, _root_.fs2.grpc.shared.Trivial, A](
      dispatcher,
      channel,
      clientAspect,
      clientOptions
    )
  
  protected def serviceBindingFull[F[_]: _root_.cats.effect.Async, Dom[_], Cod[_], A](
    dispatcher: _root_.cats.effect.std.Dispatcher[F],
    serviceImpl: TestServiceFs2Grpc[F, A],
    serviceAspect: _root_.fs2.grpc.server.ServiceAspect[F, Dom, Cod, A],
    serverOptions: _root_.fs2.grpc.server.ServerOptions
  )(implicit
    dom0: Dom[hello.world.TestMessage],
    cod0: Cod[hello.world.TestMessage]
  ) = {
    _root_.io.grpc.ServerServiceDefinition
      .builder(hello.world.TestServiceGrpc.SERVICE)
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_NO_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[F](dispatcher, serverOptions).unaryToUnaryCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) => 
          serviceAspect.visitUnaryToUnary[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_NO_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
            r,
            (r, m) => serviceImpl.noStreaming(r, m)
          )
        }
      )
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[F](dispatcher, serverOptions).streamingToUnaryCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) => 
          serviceAspect.visitStreamingToUnary[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
            r,
            (r, m) => serviceImpl.clientStreaming(r, m)
          )
        }
      )
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[F](dispatcher, serverOptions).unaryToStreamingCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) => 
          serviceAspect.visitUnaryToStreaming[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
            r,
            (r, m) => serviceImpl.serverStreaming(r, m)
          )
        }
      )
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[F](dispatcher, serverOptions).streamingToStreamingCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) => 
          serviceAspect.visitStreamingToStreaming[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
            r,
            (r, m) => serviceImpl.bothStreaming(r, m)
          )
        }
      )
      .build()
  }
  
  protected def serviceBindingTrivial[F[_]: _root_.cats.effect.Async, A](
    dispatcher: _root_.cats.effect.std.Dispatcher[F],
    serviceImpl: TestServiceFs2Grpc[F, A],
    serviceAspect: _root_.fs2.grpc.server.ServiceAspect[F, _root_.fs2.grpc.shared.Trivial, _root_.fs2.grpc.shared.Trivial, A],
    serverOptions: _root_.fs2.grpc.server.ServerOptions
  ) = 
    serviceBindingFull[F, _root_.fs2.grpc.shared.Trivial, _root_.fs2.grpc.shared.Trivial, A](
      dispatcher,
      serviceImpl,
      serviceAspect,
      serverOptions
    )

}

ValdemarGr avatar Sep 07 '23 13:09 ValdemarGr

@ValdemarGr I think this looks promising. Should mkClientFull and mkServiceFull be on GeneratedCompanion and trivial implemented in GeneratedCompanion in terms of that?

ahjohannessen avatar Sep 13 '23 10:09 ahjohannessen

@ValdemarGr I think this looks promising. Should mkClientFull and mkServiceFull be on GeneratedCompanion and trivial implemented in GeneratedCompanion in terms of that?

I don't think it is possible to declare a generic implementation of mkClientFull and mkServiceFull on GeneratedCompanion, since you need to request Dom and Cod typeclasses for each request/response, which varies by the specific grpc service.

Update

I think it can be achieved by using an abstract type and some typeclass derivation gymnastics. The solution becomes messy however.

//GeneratedCompanion.scala
trait GeneratedCompanion[Service[*[_], _]] {
  type Doms[Dom[_]]
  type Cods[Cod[_]]

  implicit final def serviceCompanion: GeneratedCompanion[Service] = this
...
}

// TestServiceFs2Grpc.scala
object TestServiceFs2Grpc {
  ...
  case class Doms[Dom[_]](
    dom0: Dom[hello.world.TestMessage]
  )
  
  object Doms {
    implicit def typeclassInstance[Dom[_]](implicit 
      dom0: Dom[hello.world.TestMessage]
    ): Doms[Dom] = Doms(
      dom0
    )
  }
  
  case class Cods[Cod[_]](
    cod0: Cod[hello.world.TestMessage]
  )
  
  object Cods {
    implicit def typeclassInstance[Cod[_]](implicit 
      cod0: Cod[hello.world.TestMessage]
    ): Cods[Cod] = Cods(
      cod0
    )
  }
}

Alternatively we could relax the solution by removing Dom and Cod, then we could restore the *Full methods in GeneratedCompanion?

From my observation, users usually parse the protobuf structures into more idiomatic scala datatypes so maybe the Dom and Cod typeclass usecase is not of that much value?

ValdemarGr avatar Sep 13 '23 13:09 ValdemarGr

I have also introduced a G to both server aspect and client aspect to facilitate running the service in another effect than the one that the dispatcher is defined in.

A usecase for this is for instance passing authorization

type UserId = String

type Auth[F[_]] = cats.mtl.Ask[F, UserId]

type Authed[A] = Kleisli[IO, UserId, A]

...

trait MyService[F[_]] {
  def myRequest(req: MyRequest, ctx: Metadata): F[MyResponse]
}

val myServiceAspect = new ServiceAspect[Authed, IO, Trivial, Trivial, Metadata] {
  def visitUnaryToUnary[Req, Res](
    callCtx: ServerCallContext[Req, Res, Dom, Cod],
    req: Req,
    next: (Req, Metadata) => Authed[Res]
  ): IO[Res] =
    extractAuth(callCtx.metadata).flatMap(auth => next(req, callCtx.metadata).run(auth))
}

def makeMyService[F[_]](implicit auth: Auth[F]) = new MyService[F] {
  def myRequest(req: MyRequest, ctx: Metadata): F[MyResponse] =
    auth.ask[UserId].flatMap(userId => handleThings(userId, req))
}

Dispatcher[IO].use{ d =>
  MyService.serviceTrivial[Authed, IO, Metadata](
    d,
    makeMyService[Authed],
    myServiceAspect,
    ServerOption.default
  )
}

ValdemarGr avatar Sep 13 '23 14:09 ValdemarGr

I have also explored a solution that removed the Dom and Cod typeclasses in search of a simpler solution that still solves the issues regarding tracing and auth. What are your thoughts?

Example output for TestServiceFs2Grpc
package hello.world

import _root_.cats.syntax.all._

trait TestServiceFs2Grpc[F[_], A] {
  def noStreaming(request: hello.world.TestMessage, ctx: A): F[hello.world.TestMessage]
  def clientStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): F[hello.world.TestMessage]
  def serverStreaming(request: hello.world.TestMessage, ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage]
  def bothStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage]
}

object TestServiceFs2Grpc extends _root_.fs2.grpc.GeneratedCompanion[TestServiceFs2Grpc] {
  
  def mkClientFull[F[_], G[_]: _root_.cats.effect.Async, A](
    dispatcher: _root_.cats.effect.std.Dispatcher[G],
    channel: _root_.io.grpc.Channel,
    clientAspect: _root_.fs2.grpc.client.ClientAspect[F, G, A],
    clientOptions: _root_.fs2.grpc.client.ClientOptions
  ): TestServiceFs2Grpc[F, A] = new TestServiceFs2Grpc[F, A] {
    def noStreaming(request: hello.world.TestMessage, ctx: A): F[hello.world.TestMessage] =
      clientAspect.visitUnaryToUnary[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_NO_STREAMING),
        request,
        (req, m) => _root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_NO_STREAMING, dispatcher, clientOptions).flatMap(_.unaryToUnaryCall(req, m))
      )
    def clientStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): F[hello.world.TestMessage] =
      clientAspect.visitStreamingToUnary[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING),
        request,
        (req, m) => _root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING, dispatcher, clientOptions).flatMap(_.streamingToUnaryCall(req, m))
      )
    def serverStreaming(request: hello.world.TestMessage, ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage] =
      clientAspect.visitUnaryToStreaming[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING),
        request,
        (req, m) => _root_.fs2.Stream.eval(_root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING, dispatcher, clientOptions)).flatMap(_.unaryToStreamingCall(req, m))
      )
    def bothStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage] =
      clientAspect.visitStreamingToStreaming[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING),
        request,
        (req, m) => _root_.fs2.Stream.eval(_root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING, dispatcher, clientOptions)).flatMap(_.streamingToStreamingCall(req, m))
      )
  }
  
  protected def serviceBindingFull[F[_], G[_]: _root_.cats.effect.Async, A](
    dispatcher: _root_.cats.effect.std.Dispatcher[G],
    serviceImpl: TestServiceFs2Grpc[F, A],
    serviceAspect: _root_.fs2.grpc.server.ServiceAspect[F, G, A],
    serverOptions: _root_.fs2.grpc.server.ServerOptions
  ) = {
    _root_.io.grpc.ServerServiceDefinition
      .builder(hello.world.TestServiceGrpc.SERVICE)
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_NO_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).unaryToUnaryCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) => 
          serviceAspect.visitUnaryToUnary[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_NO_STREAMING),
            r,
            (r, m) => serviceImpl.noStreaming(r, m)
          )
        }
      )
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).streamingToUnaryCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) => 
          serviceAspect.visitStreamingToUnary[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING),
            r,
            (r, m) => serviceImpl.clientStreaming(r, m)
          )
        }
      )
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).unaryToStreamingCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) => 
          serviceAspect.visitUnaryToStreaming[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING),
            r,
            (r, m) => serviceImpl.serverStreaming(r, m)
          )
        }
      )
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).streamingToStreamingCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) => 
          serviceAspect.visitStreamingToStreaming[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING),
            r,
            (r, m) => serviceImpl.bothStreaming(r, m)
          )
        }
      )
      .build()
  }
}

ValdemarGr avatar Sep 15 '23 13:09 ValdemarGr

From my observation, users usually parse the protobuf structures into more idiomatic scala datatypes so maybe the Dom and Cod typeclass usecase is not of that much value?

In our codebases we often convert protobuf structures into more idiomatic scala datatypes, either by hand or have typemappers in ScalaPB for request/response payload types.

What does the removal of Dom and Cod impact on missing features?

I think your latest changes are less intrusive and if they solve the issue with regards to tracing and auth, then it is perhaps better with this simpler solution?

ahjohannessen avatar Sep 15 '23 14:09 ahjohannessen

What does the removal of Dom and Cod impact on missing features?

I don't think any features that people need (from scanning the related issues) are impacted by removing these.

I think your latest changes are less intrusive and if they solve the issue with regards to tracing and auth, then it is perhaps better with this simpler solution?

Yes let's move forward with this. I'll make sure the tests are green and construct tests that use the new features to implement tracing and auth.

ValdemarGr avatar Sep 15 '23 14:09 ValdemarGr

Yes let's move forward with this. I'll make sure the tests are green and construct tests that use the new features to implement tracing and auth.

Sound like a plan 👍

ahjohannessen avatar Sep 15 '23 14:09 ahjohannessen

@ValdemarGr Perhaps getting more eyes from typelevel on the PR would be a good idea?

ahjohannessen avatar Sep 19 '23 08:09 ahjohannessen

@ValdemarGr Perhaps getting more eyes from typelevel on the PR would be a good idea?

Yes that would be awesome.

ValdemarGr avatar Sep 19 '23 12:09 ValdemarGr

@rossabaker @armanbilge @fiadliel What do you guys think about this change? :)

ahjohannessen avatar Sep 19 '23 13:09 ahjohannessen

quite excited about this change, hoping it will make reusable otel4s integration easy 🙏

lacarvalho91 avatar Oct 09 '23 21:10 lacarvalho91

Is there any update on this PR's progress? Is something blocking it or does it need reviewing?

bcarter97 avatar Jan 03 '24 13:01 bcarter97

Is there any update on this PR's progress? Is something blocking it or does it need reviewing?

I think the last event in this PR was tagging some typelevel maintainers for reviewing the code.

ValdemarGr avatar Jan 03 '24 13:01 ValdemarGr

@ahjohannessen is there anyone else that can be asked to review this? Would be great to progress this

lacarvalho91 avatar Feb 13 '24 19:02 lacarvalho91

I haven't used gRPC in a long time, so I don't have strong opinions here.

rossabaker avatar Feb 13 '24 19:02 rossabaker

Has there been any progress on this? This functionality would be very useful for observe-ability functionality.

MattLangsenkamp avatar May 18 '24 20:05 MattLangsenkamp