Add method to run code with activated span for interop with Java libraries
With this method in place I can fix all my datadog java instrumentation issues by simply activating span on Sync.suspend by providing custom instance like so:
import cats.data.Kleisli
import cats.effect.kernel.Async
import cats.syntax.all.*
import natchez.*
import scala.concurrent.duration.FiniteDuration
object Tracing {
given asyncForKleisli[F[_]: Async]: Async[Kleisli[F, Span[F], *]] = TracingAsyncKleisli(Async.asyncForKleisli)
class TracingAsyncKleisli[F[_]](underlying: Async[Kleisli[F, Span[F], *]]) extends Async[Kleisli[F, Span[F], *]] {
export underlying.{suspend as _, *}
override def suspend[A](hint: Sync.Type)(thunk: => A): Kleisli[F, Span[F], A] = {
// `suspend` is a demarcation point for all FFI from cats-effect to Java/impure code.
// By wrapping thunk with span activation we ensure correct span for instrumentation of Java libs.
Kleisli { span =>
underlying.suspend(hint)(span.unsafeRunWithActivatedSpan(thunk)).run(span)
}
}
// the export on this one does not work because of overloading
override def sleep(time: FiniteDuration): Kleisli[F, Span[F], Unit] = underlying.sleep(time)
}
}
And then when lifting resources to Kleisli:
import Tracing.asyncForKleisli
given LoggerFactory[Kleisli[F, Span[F], *]] = Slf4jFactory.create // when using log4cats
entrypoint.liftR(routesF)
I suppose I could add this to natchez but supporting scala 2 would require a lot of boilerplate code in natchez.
Ideally paired with https://github.com/typelevel/natchez/pull/1185.
For ioTrace something like this should work (with tagless final code) although I haven't tested it:
implicit object TracingAsyncIO extends Async[IO] {
private val underlying = IO.asyncForIO
export underlying.{suspend as _, *}
private val currentSpan = IOLocal(Span.noop[IO])
def withSpan[A](span: Span[IO])(io: IO[A]): IO[A] = Resource
.make(currentSpan.flatMap(_.getAndSet(span)))(previousSpan => currentSpan.flatMap(_.set(previousSpan)))
.use(_ => io)
override def suspend[A](hint: Sync.Type)(thunk: => A): IO[A] =
currentSpan.flatMap(_.get).flatMap(span => IO.suspend(hint)(span.unsafeRunWithActivatedSpan(thunk)))
override def sleep(time: FiniteDuration): IO[Unit] = underlying.sleep(time)
}
for the record, I don't like the idea of a custom Async instance, but the low-level unsafe API addition sounds good to me.
I'm not sure what you mean by a custom Async instance. Could something like this work?
def runWithActivatedSpan[T](run: F[T])(implicit F: Sync[F]): F[T]
(edit: maybe you weren't replying to my message, in which case, please disregard 🙂)
@bpholt yeah I was referring to the first snippet in the PR's description :)
for the record, I don't like the idea of a custom
Asyncinstance, but the low-level unsafe API addition sounds good to me.
That's why I didn't add it in this PR but I couldn't find a better way.
Any chance of having this merged?