natchez icon indicating copy to clipboard operation
natchez copied to clipboard

http4s module

Open tbrown1979 opened this issue 6 years ago • 16 comments

Hey @tpolecat would you be open to a PR adding an http4s module for this project? Wrapping a Client, middleware for a server, maybe other things? If so, I'll try to get something created soonish

tbrown1979 avatar Jun 04 '19 03:06 tbrown1979

Hey! I'd definitely like to do this but I'm not quite sure what the end-user API is going to look like yet so things may be changing underneath you. If you're ok with that I would welcome an http4s module.

I'm curious, which back end are you going to use? I have been focusing on Honeycomb.

tpolecat avatar Jun 04 '19 03:06 tpolecat

@tpolecat I totally understand. To be honest, I had started something very similar to this project internally at my company but didn't have the time to fully flesh it out. I was essentially going to port the http4s stuff I had created over to this. It's not a lot of code so any changes shouldn't be too rough :)

I'd most likely be using Jaeger as my backend, which I see already has a module!

tbrown1979 avatar Jun 04 '19 05:06 tbrown1979

Ok I'll see if I can get a handle on the API this week. I'll open PRs for everything so we can discuss changes. Thanks for your interest!

tpolecat avatar Jun 04 '19 05:06 tpolecat

I'm very interested in this, mainly from the PoV that when using the Klieisli tracer there is no derived instance for ConcurrentEffect available, and I'm not sure I want to get into handcrafting one inside an EntryPoint closure.

Do either of you have a different method for injecting a Trace which will work when a ConcurrentEffect instance is required?

janstenpickle avatar Jul 09 '19 10:07 janstenpickle

I'm going to answer my own question here in case anyone is wondering about the same thing:

When constructing routes or a http app where F is a Kleisli, in this case Kleisli[F, Span[F], ?], it is possible to call translate and pass natural transformations from F to Kleisli[F, Span[F], ?] and from Kleisli[F, Span[F], ?] to F. See the http4s syntax here.

The natural transformations will have to be constructed inside the EntryPoint closure, but it will mean that the routes will be usable by a Http4s server, without having to create a type class instance of ConcurrentEffect for Kleisli[F, Span[F], ?].

Using a bastardised version of the example module, this is what this method would look like:

import org.http4s.syntax.all
...

val fk = new (F ~> Kleisli[F, Span[F], ?]) {
   def apply(fa: F[A]): Kleisli[F, Span[F], ?] = Kleisli.liftF(fa)
}

entryPoint[F].use { ep =>
  ep.root("root").use { span =>
    val gk = new (Kleisli[F, Span[F], ?] ~> F) {
      def apply(fa: Kleisli[F, Span[F], ?]): F[A] = fa.run(span)
    }

   val routes: HttpRoutes[Kleisli[F, Span[F], ?]] = ???
   val transformedRoutes: HttpRoutes[F] = routes.transform(gk)(fk)
  }
}

Once the API is stabilised I could create a proper example of this if people are interested.

janstenpickle avatar Jul 09 '19 15:07 janstenpickle

Hi @janstenpickle. I have a concern regarding your example. Root span is being created during the start of the application and then it passed to every request. As a result, only child route can be created inside a route handler. Thus in Jaeger will be only one big trace with tons of sub-traces.

I've implemented a middleware that creates a new span per request:

Example
import cats.data.{Kleisli, OptionT}
import cats.effect._
import cats.implicits._
import cats.~>
import io.jaegertracing.Configuration.{ReporterConfiguration, SamplerConfiguration}
import natchez.jaeger.Jaeger
import natchez.{EntryPoint, Span, TraceValue}
import org.http4s._
import org.http4s.dsl.Http4sDsl
import org.http4s.syntax.all._

object NatchezHttp4sExample extends IOApp {

  type Traced[F[_], A] = Kleisli[F, Span[F], A]

  class Api[F[_]: Sync] extends Http4sDsl[F] {

    def routes: Http[OptionT[Traced[F, ?], ?], F] = tracedRoutes {
      case GET -> Root / path =>
        Kleisli { span =>
          span.span("child-span").use { s =>
            s.put("path" -> TraceValue.StringValue(path)) >> Ok("It works")
          }
        }
    }

    def tracedRoutes(pf: PartialFunction[Request[F], Traced[F, Response[F]]]): Http[OptionT[Traced[F, ?], ?], F] =
      Kleisli { req =>
        OptionT(Sync[Traced[F, ?]].suspend(pf.lift(req).sequence))
      }
  }

  def withApiSpan[F[_]: Sync](ep: EntryPoint[F]): OptionT[Traced[F, ?], ?] ~> OptionT[F, ?] = {
    val gk: Traced[F, ?] ~> F = new (Traced[F, ?] ~> F) {
      def apply[A](fa: Kleisli[F, Span[F], A]): F[A] =
        ep.root("api").use(fa.run)
    }

    new (OptionT[Traced[F, ?], ?] ~> OptionT[F, ?]) {
      override def apply[A](fa: OptionT[Traced[F, ?], A]): OptionT[F, A] =
        fa.mapK(gk)
    }
  }

  override def run(args: List[String]): IO[ExitCode] =
    entryPoint[IO].use { ep =>
      val api    = new Api[IO]
      val routes = api.routes.mapK(withApiSpan[IO](ep)).orNotFound

      val requests = List(
        Request[IO](Method.GET, uri"/path1"),
        Request[IO](Method.GET, uri"/path2"),
        Request[IO](Method.GET, uri"/path3")
      )

      requests.traverse(routes.run).as(ExitCode.Success)

      //BlazeServerBuilder[IO].withHttpApp(routes).resource.use(_ => IO.never) // Real use case
    }

  def entryPoint[F[_]: Sync]: Resource[F, EntryPoint[F]] =
    Jaeger.entryPoint[F]("http4s-example") { c =>
      Sync[F].delay {
        c.withSampler(SamplerConfiguration.fromEnv)
          .withReporter(ReporterConfiguration.fromEnv)
          .getTracer
      }
    }
  
}
Jaeger dashboard

image

Personally, I don't like this approach for several reasons:

  1. natchez.Trace typeclass is ignored and all child spans are being created directly through the Span[F];
  2. Ugly routes syntax;

In another project, I've been using a different solution based on ApplicativeHandle from Cats MTL. I've described span as an ADT and it was a part of the effect.

Example
import cats.effect._
import cats.implicits._
import cats.mtl._
import cats.mtl.implicits._
import io.{opentracing => ot}
import natchez.{Kernel, TraceValue}

sealed trait Span
object Span {
  final case object Empty extends Span
  final case class Defined(tracer: ot.Tracer, otSpan: ot.Span) extends Span {
    def kernel[F[_]: Sync]: F[Kernel]                           = ???
    def put[F[_]: Sync](fields: (String, TraceValue)*): F[Unit] = ???
    def span[F[_]: Sync](name: String): Resource[F, Defined]    = ???
  }
}

trait EntryPoint[F[_]] {
  def root(name: String): Resource[F, Span.Defined]
  def continue(name: String, kernel: Kernel): Resource[F, Span.Defined]
}

trait Tracer[F[_]] {
  def current: F[Span]
  def rootSpan[A](name: String)(fa: F[A]): F[A]
  def childSpan[A](name: String)(fa: F[A]): F[A]
  def put(fields: (String, TraceValue)*): F[Unit]
}

object Tracer {
  def apply[F[_]](implicit ev: Tracer[F]): Tracer[F] = ev

  def create[F[_]: Sync: ApplicativeLocal[?[_], Span]](ep: EntryPoint[F]): Tracer[F] =
    new Tracer[F] {
      override def current: F[Span] = ApplicativeLocal[F, Span].ask

      override def rootSpan[A](name: String)(fa: F[A]): F[A] =
        ep.root(name).use(context => fa.scope(context: Span))

      override def childSpan[A](name: String)(fa: F[A]): F[A] =
        current.flatMap {
          case Span.Empty      => fa
          case s: Span.Defined => s.span(name).use(context => fa.scope(context: Span))
        }

      override def put(fields: (String, TraceValue)*): F[Unit] =
        current.flatMap {
          case Span.Empty      => Sync[F].unit
          case s: Span.Defined => s.put[F](fields: _*)
        }

    }
}

class Service[F[_]: Sync: ApplicativeLocal[?[_], Span]: Tracer] {
  def foo(): F[Unit] =
    Tracer[F].rootSpan("rootSpan") {
      Tracer[F].put("key" -> "value") >> Sync[F].delay(println("It works"))
    }
}

object NatchezCatsMtl extends IOApp {
  
  override def run(args: List[String]): IO[ExitCode] = {
    type Effect[A] = Kleisli[IO, Span, A]

    entryPoint[Effect]
      .use { ep =>
        implicit val tracer: Tracer[Effect] = Tracer.create[Effect](ep)
        val service                         = new Service[Kleisli[IO, Span, ?]]

        service.foo()
      }
      .run(Span.Empty)
      .as(ExitCode.Success)
  }

  def entryPoint[F[_]]: Resource[F, EntryPoint[F]] = ???
}

This one much more flexible and can be easily integrated with Http4s. Is there is another simple way to create a root span inside the application?

WDYT @tpolecat ?

iRevive avatar Aug 29 '19 12:08 iRevive

Thanks for your comments, I will try to have a look soon but I'm super busy today.

tpolecat avatar Aug 29 '19 13:08 tpolecat

Thanks @tpolecat, no rush!

@iRevive you're quite right, thanks for the correction! I actually did a similar thing to your first example but I forgot to update this issue. I really like your MTL approach!

janstenpickle avatar Aug 29 '19 13:08 janstenpickle

@janstenpickle thanks! The obvious downside of the MTL approach that it requires an additional dependency. I figured out that Trace can be described without ApplicativeHandle.

Trace as Kleisli
object Trace {
  def apply[F[_]](implicit ev: Trace[F]): Trace[F] = ev

  def fromKleisli[F[_]: Sync](ep: EntryPoint[F]): Trace[Kleisli[F, Span, ?]] =
    new Trace[Kleisli[F, Span, ?]] {
      type Eff[A] = Kleisli[F, Span, A]

      override def current: Eff[Span] = Kleisli.ask

      override def rootSpan[A](name: String)(fa: Eff[A]): Eff[A] =
        ep.root(name).mapK(Kleisli.liftK[F, Span]).use[A](context => Kleisli.local({_: Span => context: Span})(fa))

      override def childSpan[A](name: String)(fa: Eff[A]): Eff[A] =
        current.flatMap {
          case Span.Empty      =>
            fa

          case s: Span.Defined =>
            s.span(name).mapK(Kleisli.liftK[F, Span]).use[A](context => Kleisli.local({_: Span => context: Span})(fa))
        }

      override def put(fields: (String, TraceValue)*): Eff[Unit] =
        current.flatMap {
          case Span.Empty      => Sync[Eff].unit
          case s: Span.Defined => s.put[Eff](fields: _*)
        }

    }
}

object NatchezCatsMtl extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = {
    type Effect[A] = Kleisli[IO, Span, A]

    entryPoint[IO]
      .use { ep =>
        implicit val trace: Trace[Effect] = Trace.fromKleisli[IO](ep)
        val service                       = new Service[Kleisli[IO, Span, ?]]

        service.foo().run(Span.Empty)
      }
      .as(ExitCode.Success)
  }

  def entryPoint[F[_]]: Resource[F, EntryPoint[F]] = ???
}

This implementation has two differences from the natchez library:

  1. Trace can create a root span;
  2. Span represented as ADT: Empty and Defined.

The downside of ADT is that you cannot guarantee at the compile time that root span is already created. For example, Trace[F].childSpan("child")(fa) will not have any effect if root span wasn't created at the bottom of the call stack.

iRevive avatar Aug 30 '19 09:08 iRevive

Here is what I came up with. This lets you write your routes with a Trace constraint and then lift them into your normal F. The interesting bit is in def app where we say ep.liftT to lift our Trace-demanding routes into F, which doesn't have a Trace instance. This works by inferring Kleisli[F, Span[F], ?] as the type argument when we call routes. Slightly sneaky.

import cats.effect._
import cats.implicits._
import io.jaegertracing.Configuration._
import natchez._
import natchez.http4s.implicits._
import natchez.jaeger.Jaeger
import org.http4s._
import org.http4s.dsl.Http4sDsl
import org.http4s.implicits._
import org.http4s.server._
import org.http4s.server.blaze.BlazeServerBuilder

object Http4sExample extends IOApp {

  // This is what we want to write: routes in F[_]: ...: Trace
  def routes[F[_]: Sync: Trace]: HttpRoutes[F] = {
    object dsl extends Http4sDsl[F]; import dsl._
    HttpRoutes.of[F] {
      case GET -> Root / "hello" / name =>
        Trace[F].put("woot" -> 42) *>
        Trace[F].span("responding") {
          Ok(s"Hello, $name.")
        }
    }
  }

  // Normal constructor for an HttpApp in F *without* a Trace constraint.
  def app[F[_]: Sync: Bracket[?[_], Throwable]](ep: EntryPoint[F]): HttpApp[F] =
    Router("/" -> ep.liftT(routes)).orNotFound // <-- Lifted routes

  // Normal server resource
  def server[F[_]: ConcurrentEffect: Timer](routes: HttpApp[F]): Resource[F, Server[F]] =
    BlazeServerBuilder[F]
      .bindHttp(8080, "localhost")
      .withHttpApp(routes)
      .resource

  // Normal EntryPoint resource
  def entryPoint[F[_]: Sync]: Resource[F, EntryPoint[F]] =
    Jaeger.entryPoint[F]("natchez-example") { c =>
      Sync[F].delay {
        c.withSampler(SamplerConfiguration.fromEnv)
          .withReporter(ReporterConfiguration.fromEnv)
          .getTracer
      }
    }

  // Main method instantiates F to IO
  def run(args: List[String]): IO[ExitCode] =
    entryPoint[IO].map(app(_)).flatMap(server(_)).use(_ => IO.never).as(ExitCode.Success)

}

The implementation is a little janky, may be able to simplify. But it works.

package natchez.http4s

import cats.~>
import cats.data.{ Kleisli, OptionT }
import cats.effect.Bracket
import natchez.{ EntryPoint, Kernel, Span }
import org.http4s.HttpRoutes

object implicits {

  // Given an entry point and HTTP Routes in Kleisli[F, Span[F], ?] return routes in F. A new span
  // is created with the URI path as the name, either as a continuation of the incoming trace, if
  // any, or as a new root. This can likely be simplified, I just did what the types were saying
  // and it works so :shrug:
  private def liftT[F[_]: Bracket[?[_], Throwable]](
    entryPoint: EntryPoint[F])(
    routes:     HttpRoutes[Kleisli[F, Span[F], ?]]
  ): HttpRoutes[F] =
    Kleisli { req =>
      type G[A]  = Kleisli[F, Span[F], A]
      val lift   = λ[F ~> G](fa => Kleisli(_ => fa))
      val kernel = Kernel(req.headers.toList.map(h => (h.name.value -> h.value)).toMap)
      val spanR  = entryPoint.continueOrElseRoot(req.uri.path, kernel)
      OptionT {
        spanR.use { span =>
          val lower = λ[G ~> F](_(span))
          routes.run(req.mapK(lift)).mapK(lower).map(_.mapK(lower)).value
        }
      }
    }

  implicit class EntryPointOps[F[_]](self: EntryPoint[F]) {
    def liftT(routes: HttpRoutes[Kleisli[F, Span[F], ?]])(
      implicit ev: Bracket[F, Throwable]
    ): HttpRoutes[F] =
      implicits.liftT(self)(routes)
  }

}

Hitting the endpoint yields a trace like

image

I think this provides exactly what I want. WDYT?

tpolecat avatar Aug 31 '19 21:08 tpolecat

@tpolecat LGTM.
This approach covers all my use cases.

One little thing. I would like to have a bit more control over headers passed to Kernel: val kernel = Kernel(req.headers.toList.map(h => (h.name.value -> h.value)).toMap). req.headers can expose an authentication credentials/api token/etc.

The Logger middleware from Http4s provides a way to redact headers:

def httpRoutes[F[_]: Concurrent](
      logHeaders: Boolean,
      logBody: Boolean,
      redactHeadersWhen: CaseInsensitiveString => Boolean = Headers.SensitiveHeaders.contains,
      logAction: Option[String => F[Unit]] = None
  )(httpRoutes: HttpRoutes[F]): HttpRoutes[F] = 

Is this applicable here?

iRevive avatar Sep 01 '19 11:09 iRevive

@tpolecat how would you handle situation when routes depends on ConcurrentEffect since there is no Effect for Kleisli?

pshemass avatar Jan 25 '20 03:01 pshemass

You're out of luck in that case, although you could pass the span explicitly.

ConcurrentEffect is a ridiculously tight constraint so hopefully it's not needed much. One known exception is the http4s Blaze client. If you're in this situation you might try the Ember client.

tpolecat avatar Jan 30 '20 04:01 tpolecat

Blaze client is my case actually :) other place that I found static files/webjar service in Htt4s.

I will try to pass Span explicitly for the time being.

pshemass avatar Jan 30 '20 06:01 pshemass

@tpolecat @pshemass I'm facing the Effect problem for a service dependency that doesn't even need tracing. Could you give an example of providing the span explicitly?

soujiro32167 avatar May 05 '21 13:05 soujiro32167

Ended up with this:

  def kleisliConcurrentEffect[F[_]](
    root: Span[F]
  )(implicit F: Concurrent[Kleisli[F, Span[F], *]], FF: ConcurrentEffect[F]): ConcurrentEffect[Kleisli[F, Span[F], *]] =
    new ConcurrentEffect[Kleisli[F, Span[F], *]] {

      override def runCancelable[A](fa: Kleisli[F, Span[F], A])(cb: Either[Throwable, A] => IO[Unit]): SyncIO[Kleisli[F, Span[F], Unit]] =
        FF.runCancelable(fa.run(root))(cb).map(Kleisli.liftF)

      override def runAsync[A](fa: Kleisli[F, Span[F], A])(cb: Either[Throwable, A] => IO[Unit]): SyncIO[Unit] =
         FF.runAsync(fa.run(root))(cb)

// All other implementation from Concurrent[Kleisli[F, Span[F], *]]
}

implicit val a = kleisliConcurrentEffect[IO](NoopSpan[IO]()) // since this is for a dependency that doesn't need tracing

soujiro32167 avatar May 05 '21 14:05 soujiro32167