natchez

Add method to run code with activated span for interop with Java libraries

Open · mwisnicki opened this issue 6 months ago · 5 comments

With this method in place, I can fix all my Datadog Java instrumentation issues simply by activating the span in Sync.suspend, via a custom Async instance like so:

import cats.data.Kleisli
import cats.effect.kernel.{Async, Sync}
import cats.syntax.all.*
import natchez.*
import scala.concurrent.duration.FiniteDuration

object Tracing {
  given asyncForKleisli[F[_]: Async]: Async[Kleisli[F, Span[F], *]] = TracingAsyncKleisli(Async.asyncForKleisli)

  class TracingAsyncKleisli[F[_]](underlying: Async[Kleisli[F, Span[F], *]]) extends Async[Kleisli[F, Span[F], *]] {
    export underlying.{suspend as _, *}

    override def suspend[A](hint: Sync.Type)(thunk: => A): Kleisli[F, Span[F], A] = {
      // `suspend` is a demarcation point for all FFI from cats-effect to Java/impure code.
      // By wrapping thunk with span activation we ensure correct span for instrumentation of Java libs.
      Kleisli { span =>
        underlying.suspend(hint)(span.unsafeRunWithActivatedSpan(thunk)).run(span)
      }
    }

    // the export on this one does not work because of overloading
    override def sleep(time: FiniteDuration): Kleisli[F, Span[F], Unit] = underlying.sleep(time)
  }
}
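The instance above relies on Span#unsafeRunWithActivatedSpan, the method this PR proposes. Java agents such as Datadog's generally find the parent span through thread-local context. "Activating" a span therefore means installing it in that context for the duration of a synchronous call and then restoring the previous value. The self-contained sketch below models only that contract with a plain ThreadLocal. AgentContext, runWithActivated and JavaLib are hypothetical stand-ins, not natchez or Datadog APIs.

```scala
// Hypothetical model of a Java tracing agent's context storage.
// None of these names are natchez or Datadog APIs.
object AgentContext {
  // Agents typically look up the "active" span in thread-local state.
  private val active = new ThreadLocal[Option[String]] {
    override protected def initialValue(): Option[String] = None
  }

  def activeSpan: Option[String] = active.get()

  // Make `spanId` active only while `thunk` runs on the current thread,
  // then restore whatever was active before, even if `thunk` throws.
  def runWithActivated[A](spanId: String)(thunk: => A): A = {
    val previous = active.get()
    active.set(Some(spanId))
    try thunk
    finally active.set(previous)
  }
}

// Stand-in for an instrumented Java call that parents its spans under
// whichever span is active on the calling thread.
object JavaLib {
  def call(): String =
    AgentContext.activeSpan.fold("no parent span")(id => s"child of $id")
}
```

Calling AgentContext.runWithActivated("span-1")(JavaLib.call()) yields "child of span-1", while a bare JavaLib.call() yields "no parent span". Restoring the previous value, rather than clearing it, keeps nested activations correct.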

And then when lifting resources to Kleisli:

import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.slf4j.Slf4jFactory
import Tracing.asyncForKleisli
given LoggerFactory[Kleisli[F, Span[F], *]] = Slf4jFactory.create // when using log4cats
entrypoint.liftR(routesF)

I suppose I could add this to natchez, but supporting Scala 2 would require a lot of boilerplate there: Scala 2 has no export clauses, so every Async method would have to be forwarded by hand.

Ideally this would be paired with https://github.com/typelevel/natchez/pull/1185.


For ioTrace, something like this should work with tagless-final code, although I haven't tested it:

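import cats.effect.{IO, IOLocal, Resource}
import cats.effect.kernel.{Async, Sync}
import cats.effect.unsafe.IORuntime
import natchez.Span
import scala.concurrent.duration.FiniteDuration

implicit object TracingAsyncIO extends Async[IO] {
  private val underlying = IO.asyncForIO
  export underlying.{suspend as _, *}

  // IOLocal(...) is an IO that allocates a fresh local every time it runs, so it is
  // run once here (JVM only); otherwise withSpan and suspend would each get their own local.
  private val currentSpan: IOLocal[Span[IO]] =
    IOLocal(Span.noop[IO]).unsafeRunSync()(using IORuntime.global)

  // Make `span` the current span while `io` runs, restoring the previous one afterwards.
  def withSpan[A](span: Span[IO])(io: IO[A]): IO[A] = Resource
    .make(currentSpan.getAndSet(span))(previousSpan => currentSpan.set(previousSpan))
    .use(_ => io)

  override def suspend[A](hint: Sync.Type)(thunk: => A): IO[A] =
    currentSpan.get.flatMap(span => underlying.suspend(hint)(span.unsafeRunWithActivatedSpan(thunk)))

  override def sleep(time: FiniteDuration): IO[Unit] = underlying.sleep(time)
}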
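Both instances intercept side effects only at suspend. That works because in cats-effect's Sync, derived operations such as delay and blocking are implemented in terms of suspend. Tagless-final code that goes through the type class therefore hits the override. Code that calls IO.delay directly never consults an Async[IO] instance at all, which is presumably why the IO variant is limited to tagless-final code. The plain-Scala sketch below models just that dispatch. MiniSync, Lazy, LazySync, WrappingSync and Program are hypothetical and deliberately not the cats-effect API.

```scala
// Hypothetical miniature of the Sync hierarchy (NOT the cats-effect API):
// the derived operations are defined via `suspend`.
trait MiniSync[F[_]] {
  def suspend[A](hint: String)(thunk: => A): F[A]
  def delay[A](thunk: => A): F[A] = suspend("delay")(thunk)
  def blocking[A](thunk: => A): F[A] = suspend("blocking")(thunk)
}

// A trivial lazy effect, just enough to run the example.
final case class Lazy[A](run: () => A)

object LazySync extends MiniSync[Lazy] {
  def suspend[A](hint: String)(thunk: => A): Lazy[A] = Lazy(() => thunk)
}

// Overrides only `suspend`, like the custom instances above. `delay` and
// `blocking` are inherited, so they are routed through this override too.
final class WrappingSync[F[_]](underlying: MiniSync[F]) extends MiniSync[F] {
  val seenHints = scala.collection.mutable.ListBuffer.empty[String]

  def suspend[A](hint: String)(thunk: => A): F[A] =
    underlying.suspend(hint) {
      seenHints += hint // a tracing instance would activate the span here
      thunk
    }
}

// Tagless-final code only sees the type class, so it goes through the wrapper.
object Program {
  def run[F[_]](sync: MiniSync[F]): (F[Int], F[String]) =
    (sync.delay(21 * 2), sync.blocking("io done"))
}
```

Nothing is recorded until the returned Lazy values are actually run. At that point both the delay and the blocking call pass through WrappingSync.suspend.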

mwisnicki · Jun 29 '25

For the record, I don't like the idea of a custom Async instance, but the low-level unsafe API addition sounds good to me.

kubukoz · Jun 30 '25

I'm not sure what you mean by a custom Async instance. Could something like this work?

def runWithActivatedSpan[T](run: F[T])(implicit F: Sync[F]): F[T]

(edit: maybe you weren't replying to my message, in which case, please disregard 🙂)

bpholt · Jun 30 '25

@bpholt Yeah, I was referring to the first snippet in the PR's description :)

kubukoz · Jun 30 '25

> For the record, I don't like the idea of a custom Async instance, but the low-level unsafe API addition sounds good to me.

That's why I didn't add it in this PR, but I couldn't find a better way.

mwisnicki · Jun 30 '25

Any chance of having this merged?

mwisnicki · Dec 01 '25