fs2-grpc
fs2-grpc copied to clipboard
668 aspect oriented middleware
Initial draft for #668
Example output for TestServiceFs2Grpc
package hello.world
import _root_.cats.syntax.all._
/** Service algebra for the four RPC shapes of `TestService`.
  *
  * `F` is the effect in which calls are expressed and `A` is the type of the
  * per-call context value that accompanies every request.
  */
trait TestServiceFs2Grpc[F[_], A] {
/** Single request message in, single response message out. */
def noStreaming(request: hello.world.TestMessage, ctx: A): F[hello.world.TestMessage]
/** Stream of request messages in, single response message out. */
def clientStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): F[hello.world.TestMessage]
/** Single request message in, stream of response messages out. */
def serverStreaming(request: hello.world.TestMessage, ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage]
/** Stream of request messages in, stream of response messages out. */
def bothStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage]
}
/** Companion for [[TestServiceFs2Grpc]]: builds aspect-wrapped clients and server bindings.
  *
  * `Dom` and `Cod` are type constructors for which an instance must be available for
  * `hello.world.TestMessage`; one instance of each is placed in the call context handed to
  * the aspect on every call. (Presumably `Dom` is the request-side and `Cod` the
  * response-side type class -- confirm against `ClientCallContext` / `ServerCallContext`.)
  */
object TestServiceFs2Grpc extends _root_.fs2.grpc.GeneratedCompanion[TestServiceFs2Grpc] {

  /** Creates a client whose every call is routed through `clientAspect`.
    *
    * Each method wraps the caller's `ctx` and the gRPC method descriptor in a
    * `ClientCallContext` and hands the aspect a continuation that creates an
    * `Fs2ClientCall` on `channel` and performs the actual call.
    *
    * @param dispatcher    dispatcher passed to every `Fs2ClientCall`
    * @param channel       gRPC channel the calls are made on
    * @param clientAspect  aspect that intercepts each of the four call shapes
    * @param clientOptions options passed to every `Fs2ClientCall`
    * @param dom0          `Dom` instance for `TestMessage`
    * @param cod0          `Cod` instance for `TestMessage`
    */
  def mkClientFull[F[_]: _root_.cats.effect.Async, Dom[_], Cod[_], A](
      dispatcher: _root_.cats.effect.std.Dispatcher[F],
      channel: _root_.io.grpc.Channel,
      clientAspect: _root_.fs2.grpc.client.ClientAspect[F, Dom, Cod, A],
      clientOptions: _root_.fs2.grpc.client.ClientOptions
  )(implicit
      dom0: Dom[hello.world.TestMessage],
      cod0: Cod[hello.world.TestMessage]
  ): TestServiceFs2Grpc[F, A] = new TestServiceFs2Grpc[F, A] {
    def noStreaming(request: hello.world.TestMessage, ctx: A): F[hello.world.TestMessage] =
      clientAspect.visitUnaryToUnary[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_NO_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
        request,
        (req, m) => _root_.fs2.grpc.client.Fs2ClientCall[F](channel, hello.world.TestServiceGrpc.METHOD_NO_STREAMING, dispatcher, clientOptions).flatMap(_.unaryToUnaryCall(req, m))
      )

    def clientStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): F[hello.world.TestMessage] =
      clientAspect.visitStreamingToUnary[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
        request,
        (req, m) => _root_.fs2.grpc.client.Fs2ClientCall[F](channel, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING, dispatcher, clientOptions).flatMap(_.streamingToUnaryCall(req, m))
      )

    // The streaming-response variants lift the call creation into a Stream first.
    def serverStreaming(request: hello.world.TestMessage, ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage] =
      clientAspect.visitUnaryToStreaming[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
        request,
        (req, m) => _root_.fs2.Stream.eval(_root_.fs2.grpc.client.Fs2ClientCall[F](channel, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING, dispatcher, clientOptions)).flatMap(_.unaryToStreamingCall(req, m))
      )

    def bothStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage] =
      clientAspect.visitStreamingToStreaming[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
        request,
        (req, m) => _root_.fs2.Stream.eval(_root_.fs2.grpc.client.Fs2ClientCall[F](channel, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING, dispatcher, clientOptions)).flatMap(_.streamingToStreamingCall(req, m))
      )
  }

  /** [[mkClientFull]] with both `Dom` and `Cod` fixed to `fs2.grpc.shared.Trivial`. */
  def mkClientTrivial[F[_]: _root_.cats.effect.Async, A](
      dispatcher: _root_.cats.effect.std.Dispatcher[F],
      channel: _root_.io.grpc.Channel,
      clientAspect: _root_.fs2.grpc.client.ClientAspect[F, _root_.fs2.grpc.shared.Trivial, _root_.fs2.grpc.shared.Trivial, A],
      clientOptions: _root_.fs2.grpc.client.ClientOptions
  ): TestServiceFs2Grpc[F, A] =
    mkClientFull[F, _root_.fs2.grpc.shared.Trivial, _root_.fs2.grpc.shared.Trivial, A](
      dispatcher,
      channel,
      clientAspect,
      clientOptions
    )

  /** Builds the gRPC service definition for `serviceImpl`, routing every call through
    * `serviceAspect`.
    *
    * One handler is registered per method. Each handler builds a `ServerCallContext` from
    * the handler's second argument and the method descriptor, then lets the aspect decide
    * how to invoke the implementation.
    *
    * @param dispatcher    dispatcher passed to every `Fs2ServerCallHandler`
    * @param serviceImpl   implementation the aspect ultimately delegates to
    * @param serviceAspect aspect that intercepts each of the four call shapes
    * @param serverOptions options passed to every `Fs2ServerCallHandler`
    * @param dom0          `Dom` instance for `TestMessage`
    * @param cod0          `Cod` instance for `TestMessage`
    */
  protected def serviceBindingFull[F[_]: _root_.cats.effect.Async, Dom[_], Cod[_], A](
      dispatcher: _root_.cats.effect.std.Dispatcher[F],
      serviceImpl: TestServiceFs2Grpc[F, A],
      serviceAspect: _root_.fs2.grpc.server.ServiceAspect[F, Dom, Cod, A],
      serverOptions: _root_.fs2.grpc.server.ServerOptions
  )(implicit
      dom0: Dom[hello.world.TestMessage],
      cod0: Cod[hello.world.TestMessage]
  ): _root_.io.grpc.ServerServiceDefinition = {
    _root_.io.grpc.ServerServiceDefinition
      .builder(hello.world.TestServiceGrpc.SERVICE)
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_NO_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[F](dispatcher, serverOptions).unaryToUnaryCall[hello.world.TestMessage, hello.world.TestMessage] { (r, m) =>
          serviceAspect.visitUnaryToUnary[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_NO_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
            r,
            // Distinct names: the continuation receives whatever the aspect passes on,
            // which need not be the handler's own (r, m).
            (req, ctx) => serviceImpl.noStreaming(req, ctx)
          )
        }
      )
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[F](dispatcher, serverOptions).streamingToUnaryCall[hello.world.TestMessage, hello.world.TestMessage] { (r, m) =>
          serviceAspect.visitStreamingToUnary[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
            r,
            (req, ctx) => serviceImpl.clientStreaming(req, ctx)
          )
        }
      )
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[F](dispatcher, serverOptions).unaryToStreamingCall[hello.world.TestMessage, hello.world.TestMessage] { (r, m) =>
          serviceAspect.visitUnaryToStreaming[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
            r,
            (req, ctx) => serviceImpl.serverStreaming(req, ctx)
          )
        }
      )
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[F](dispatcher, serverOptions).streamingToStreamingCall[hello.world.TestMessage, hello.world.TestMessage] { (r, m) =>
          serviceAspect.visitStreamingToStreaming[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
            r,
            (req, ctx) => serviceImpl.bothStreaming(req, ctx)
          )
        }
      )
      .build()
  }

  /** [[serviceBindingFull]] with both `Dom` and `Cod` fixed to `fs2.grpc.shared.Trivial`. */
  protected def serviceBindingTrivial[F[_]: _root_.cats.effect.Async, A](
      dispatcher: _root_.cats.effect.std.Dispatcher[F],
      serviceImpl: TestServiceFs2Grpc[F, A],
      serviceAspect: _root_.fs2.grpc.server.ServiceAspect[F, _root_.fs2.grpc.shared.Trivial, _root_.fs2.grpc.shared.Trivial, A],
      serverOptions: _root_.fs2.grpc.server.ServerOptions
  ): _root_.io.grpc.ServerServiceDefinition =
    serviceBindingFull[F, _root_.fs2.grpc.shared.Trivial, _root_.fs2.grpc.shared.Trivial, A](
      dispatcher,
      serviceImpl,
      serviceAspect,
      serverOptions
    )
}
@ValdemarGr I think this looks promising. Should `mkClientFull` and `mkServiceFull` be on `GeneratedCompanion`, and trivial implemented in `GeneratedCompanion` in terms of that?
@ValdemarGr I think this looks promising. Should `mkClientFull` and `mkServiceFull` be on `GeneratedCompanion`, and trivial implemented in `GeneratedCompanion` in terms of that?
I don't think it is possible to declare a generic implementation of `mkClientFull` and `mkServiceFull` on `GeneratedCompanion`, since you need to request `Dom` and `Cod` typeclasses for each request/response type, and those vary by the specific gRPC service.
Update
I think it can be achieved by using an abstract type and some typeclass derivation gymnastics. The solution becomes messy however.
//GeneratedCompanion.scala
/** Sketch of the shared companion base for generated services.
  *
  * `Doms` and `Cods` are abstract type members, each parameterised by a type constructor,
  * left for the concrete generated companion to define.
  */
trait GeneratedCompanion[Service[*[_], _]] {
// Abstract per-service type, parameterised by the `Dom` type constructor.
type Doms[Dom[_]]
// Abstract per-service type, parameterised by the `Cod` type constructor.
type Cods[Cod[_]]
// Exposes this companion itself as an implicit value.
implicit final def serviceCompanion: GeneratedCompanion[Service] = this
...
}
// TestServiceFs2Grpc.scala
/** Sketch of the per-service definitions of `Doms` and `Cods` for `TestService`. */
object TestServiceFs2Grpc {
...
/** Holds the `Dom` instance for `hello.world.TestMessage`. */
case class Doms[Dom[_]](
dom0: Dom[hello.world.TestMessage]
)
object Doms {
// Assembles a `Doms[Dom]` from the implicit `Dom[TestMessage]` in scope, so that
// requesting an implicit `Doms[Dom]` in turn requires that instance.
implicit def typeclassInstance[Dom[_]](implicit
dom0: Dom[hello.world.TestMessage]
): Doms[Dom] = Doms(
dom0
)
}
/** Holds the `Cod` instance for `hello.world.TestMessage`. */
case class Cods[Cod[_]](
cod0: Cod[hello.world.TestMessage]
)
object Cods {
// Assembles a `Cods[Cod]` from the implicit `Cod[TestMessage]` in scope.
implicit def typeclassInstance[Cod[_]](implicit
cod0: Cod[hello.world.TestMessage]
): Cods[Cod] = Cods(
cod0
)
}
}
Alternatively, we could relax the solution by removing `Dom` and `Cod`; then we could restore the `*Full` methods in `GeneratedCompanion`?
From my observation, users usually parse the protobuf structures into more idiomatic Scala datatypes, so maybe the `Dom` and `Cod` typeclass use case is not of that much value?
I have also introduced a `G` to both the server aspect and the client aspect to facilitate running the service in a different effect from the one the dispatcher is defined in.
A use case for this is, for instance, passing authorization:
// Identifier of the authenticated user; a plain String in this example.
type UserId = String
// Capability to read the current UserId from within F.
type Auth[F[_]] = cats.mtl.Ask[F, UserId]
// Effect that needs a UserId supplied before it can run in IO.
type Authed[A] = Kleisli[IO, UserId, A]
...
/** Example service with a single RPC; `ctx` is the `Metadata` accompanying the request. */
trait MyService[F[_]] {
def myRequest(req: MyRequest, ctx: Metadata): F[MyResponse]
}
// Aspect bridging the service effect (Authed) and the dispatcher effect (IO): it derives
// the auth value from the call's metadata and supplies it to the Authed computation
// produced by `next`, yielding a plain IO.
// NOTE(review): only the unary-to-unary case is shown; the other visit* methods are
// presumably required as well -- confirm against the ServiceAspect definition.
val myServiceAspect = new ServiceAspect[Authed, IO, Trivial, Trivial, Metadata] {
  def visitUnaryToUnary[Req, Res](
      // The aspect is instantiated with Trivial for both type classes, so the context
      // must use the same arguments; `Dom` and `Cod` are not bound in this scope.
      callCtx: ServerCallContext[Req, Res, Trivial, Trivial],
      req: Req,
      next: (Req, Metadata) => Authed[Res]
  ): IO[Res] =
    extractAuth(callCtx.metadata).flatMap(auth => next(req, callCtx.metadata).run(auth))
}
/** Builds a `MyService` in any effect `F` that can provide the current `UserId`.
  *
  * The handler asks for the user id and passes it, with the request, to `handleThings`.
  * NOTE(review): `flatMap` on `F` needs a `FlatMap`/`Monad` instance in scope, which
  * `Ask` alone may not supply -- confirm when turning this sketch into real code.
  */
def makeMyService[F[_]](implicit auth: Auth[F]): MyService[F] = new MyService[F] {
  def myRequest(req: MyRequest, ctx: Metadata): F[MyResponse] =
    auth.ask[UserId].flatMap(userId => handleThings(userId, req))
}
// Wire everything together: the dispatcher runs in IO while the service itself is
// written against Authed; the aspect converts between the two.
Dispatcher[IO].use { d =>
  MyService.serviceTrivial[Authed, IO, Metadata](
    d,
    makeMyService[Authed],
    myServiceAspect,
    // The options type is `ServerOptions` (plural), as in the generated code.
    ServerOptions.default
  )
}
I have also explored a solution that removes the `Dom` and `Cod` typeclasses, in search of a simpler design that still solves the issues regarding tracing and auth. What are your thoughts?
Example output for TestServiceFs2Grpc
package hello.world
import _root_.cats.syntax.all._
/** Service algebra for the four RPC shapes of `TestService`.
  *
  * `F` is the effect in which calls are expressed and `A` is the type of the
  * per-call context value that accompanies every request.
  */
trait TestServiceFs2Grpc[F[_], A] {
/** Single request message in, single response message out. */
def noStreaming(request: hello.world.TestMessage, ctx: A): F[hello.world.TestMessage]
/** Stream of request messages in, single response message out. */
def clientStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): F[hello.world.TestMessage]
/** Single request message in, stream of response messages out. */
def serverStreaming(request: hello.world.TestMessage, ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage]
/** Stream of request messages in, stream of response messages out. */
def bothStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage]
}
/** Companion for [[TestServiceFs2Grpc]]: builds aspect-wrapped clients and server bindings.
  *
  * Two effects are involved: `F` is the effect of the service algebra, while `G` is the
  * effect of the dispatcher and of the underlying fs2-grpc calls and handlers. The aspect
  * is parameterised by both.
  */
object TestServiceFs2Grpc extends _root_.fs2.grpc.GeneratedCompanion[TestServiceFs2Grpc] {

  /** Creates a client in `F` whose every call is routed through `clientAspect`.
    *
    * Each method wraps the caller's `ctx` and the gRPC method descriptor in a
    * `ClientCallContext` and hands the aspect a continuation that creates an
    * `Fs2ClientCall[G]` on `channel` and performs the actual call.
    *
    * @param dispatcher    dispatcher (in `G`) passed to every `Fs2ClientCall`
    * @param channel       gRPC channel the calls are made on
    * @param clientAspect  aspect that intercepts each of the four call shapes
    * @param clientOptions options passed to every `Fs2ClientCall`
    */
  def mkClientFull[F[_], G[_]: _root_.cats.effect.Async, A](
      dispatcher: _root_.cats.effect.std.Dispatcher[G],
      channel: _root_.io.grpc.Channel,
      clientAspect: _root_.fs2.grpc.client.ClientAspect[F, G, A],
      clientOptions: _root_.fs2.grpc.client.ClientOptions
  ): TestServiceFs2Grpc[F, A] = new TestServiceFs2Grpc[F, A] {
    def noStreaming(request: hello.world.TestMessage, ctx: A): F[hello.world.TestMessage] =
      clientAspect.visitUnaryToUnary[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_NO_STREAMING),
        request,
        (req, m) => _root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_NO_STREAMING, dispatcher, clientOptions).flatMap(_.unaryToUnaryCall(req, m))
      )

    def clientStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): F[hello.world.TestMessage] =
      clientAspect.visitStreamingToUnary[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING),
        request,
        (req, m) => _root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING, dispatcher, clientOptions).flatMap(_.streamingToUnaryCall(req, m))
      )

    // The streaming-response variants lift the call creation into a Stream first.
    def serverStreaming(request: hello.world.TestMessage, ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage] =
      clientAspect.visitUnaryToStreaming[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING),
        request,
        (req, m) => _root_.fs2.Stream.eval(_root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING, dispatcher, clientOptions)).flatMap(_.unaryToStreamingCall(req, m))
      )

    def bothStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage] =
      clientAspect.visitStreamingToStreaming[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING),
        request,
        (req, m) => _root_.fs2.Stream.eval(_root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING, dispatcher, clientOptions)).flatMap(_.streamingToStreamingCall(req, m))
      )
  }

  /** Builds the gRPC service definition for `serviceImpl`, routing every call through
    * `serviceAspect`.
    *
    * One handler (in `G`) is registered per method. Each handler builds a
    * `ServerCallContext` from the handler's second argument and the method descriptor,
    * then lets the aspect decide how to invoke the `F`-based implementation.
    *
    * @param dispatcher    dispatcher (in `G`) passed to every `Fs2ServerCallHandler`
    * @param serviceImpl   implementation the aspect ultimately delegates to
    * @param serviceAspect aspect that intercepts each of the four call shapes
    * @param serverOptions options passed to every `Fs2ServerCallHandler`
    */
  protected def serviceBindingFull[F[_], G[_]: _root_.cats.effect.Async, A](
      dispatcher: _root_.cats.effect.std.Dispatcher[G],
      serviceImpl: TestServiceFs2Grpc[F, A],
      serviceAspect: _root_.fs2.grpc.server.ServiceAspect[F, G, A],
      serverOptions: _root_.fs2.grpc.server.ServerOptions
  ): _root_.io.grpc.ServerServiceDefinition = {
    _root_.io.grpc.ServerServiceDefinition
      .builder(hello.world.TestServiceGrpc.SERVICE)
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_NO_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).unaryToUnaryCall[hello.world.TestMessage, hello.world.TestMessage] { (r, m) =>
          serviceAspect.visitUnaryToUnary[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_NO_STREAMING),
            r,
            // Distinct names: the continuation receives whatever the aspect passes on,
            // which need not be the handler's own (r, m).
            (req, ctx) => serviceImpl.noStreaming(req, ctx)
          )
        }
      )
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).streamingToUnaryCall[hello.world.TestMessage, hello.world.TestMessage] { (r, m) =>
          serviceAspect.visitStreamingToUnary[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING),
            r,
            (req, ctx) => serviceImpl.clientStreaming(req, ctx)
          )
        }
      )
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).unaryToStreamingCall[hello.world.TestMessage, hello.world.TestMessage] { (r, m) =>
          serviceAspect.visitUnaryToStreaming[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING),
            r,
            (req, ctx) => serviceImpl.serverStreaming(req, ctx)
          )
        }
      )
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).streamingToStreamingCall[hello.world.TestMessage, hello.world.TestMessage] { (r, m) =>
          serviceAspect.visitStreamingToStreaming[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING),
            r,
            (req, ctx) => serviceImpl.bothStreaming(req, ctx)
          )
        }
      )
      .build()
  }
}
From my observation, users usually parse the protobuf structures into more idiomatic scala datatypes so maybe the Dom and Cod typeclass usecase is not of that much value?
In our codebases we often convert protobuf structures into more idiomatic scala datatypes, either by hand or have typemappers in ScalaPB for request/response payload types.
What impact does the removal of `Dom` and `Cod` have in terms of missing features?
I think your latest changes are less intrusive and if they solve the issue with regards to tracing and auth, then it is perhaps better with this simpler solution?
What impact does the removal of `Dom` and `Cod` have in terms of missing features?
I don't think any features that people need (from scanning the related issues) are impacted by removing these.
I think your latest changes are less intrusive and if they solve the issue with regards to tracing and auth, then it is perhaps better with this simpler solution?
Yes let's move forward with this. I'll make sure the tests are green and construct tests that use the new features to implement tracing and auth.
Yes let's move forward with this. I'll make sure the tests are green and construct tests that use the new features to implement tracing and auth.
Sounds like a plan 👍
@ValdemarGr Perhaps getting more eyes from typelevel on the PR would be a good idea?
@ValdemarGr Perhaps getting more eyes from typelevel on the PR would be a good idea?
Yes that would be awesome.
@rossabaker @armanbilge @fiadliel What do you guys think about this change? :)
Quite excited about this change; hoping it will make a reusable otel4s integration easy 🙏
Is there any update on this PR's progress? Is something blocking it or does it need reviewing?
Is there any update on this PR's progress? Is something blocking it or does it need reviewing?
I think the last event in this PR was tagging some typelevel maintainers for reviewing the code.
@ahjohannessen is there anyone else that can be asked to review this? Would be great to progress this
I haven't used gRPC in a long time, so I don't have strong opinions here.
Has there been any progress on this? This functionality would be very useful for observability.