Kamon with Akka Streams loses tags and token

Open maciejbak85 opened this issue 7 months ago • 3 comments

While working with Akka Streams and Kamon, where I propagate context, I realized that when using a Source from Akka Streams it loses tags and token. Here is the PoC code:

package repro

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import kamon.Kamon
import org.slf4j.LoggerFactory

import scala.concurrent.Future

object Main extends App {
  Kamon.init()

  private val log = LoggerFactory.getLogger(getClass)

  implicit val system: ActorSystem = ActorSystem("Repro")
  implicit val mat: Materializer = Materializer(system)
  implicit val ec: scala.concurrent.ExecutionContext = system.dispatcher

  def withFakeSecurityContext[T](body: => T): T = {
    val ctx = Kamon
      .currentContext()
      .withTag("user.login", "[email protected]")
      .withTag("tenant", "qwerty")
      .withTag("auth.token", "fake-jwt-signature")

    Kamon.runWithContext(ctx)(body)
  }

  val stream =
    Source(1 to 1)
      .map { i =>
        log.info(
          s"[map-1] ctx ${Kamon.currentContext()}"
        )
        i
      }
      .async // <-- explicit async boundary
      .map { i =>
        log.info(
          s"[map-2] ctx ${Kamon.currentContext()}"
        )
        i
      }
      .runWith(Sink.ignore) // materialised here, outside withFakeSecurityContext

  withFakeSecurityContext {
    log.info(s"[before] ctx ${Kamon.currentContext()}")
    stream
  }

  withFakeSecurityContext {
    println(s"[Future1] ctx ${Kamon.currentContext()}")
    Future {
      log.info(s"[Future2] ctx ${Kamon.currentContext()}")
    }
    log.info(s"[Future3] ctx ${Kamon.currentContext()}")
  }

  val graph =
    Source(1 to 1)
      .map(i => { log.info(s"[map-1-v2] ctx ${Kamon.currentContext()}"); i })
      .async
      .map(i => { log.info(s"[map-2-v2] ctx ${Kamon.currentContext()}"); i })

  val stream2 =
    withFakeSecurityContext {
      log.info(s"[before2-v2] ctx ${Kamon.currentContext()}")
      graph.runWith(Sink.ignore) // ← materialise here, tags & span present
    }

  stream.onComplete(_ => {
    system.terminate()
    Kamon.stop()
  })
}


build.sbt

ThisBuild / scalaVersion := "2.13.15"

lazy val root = (project in file("."))
  .settings(
    name := "kamon-context-loss-repro",
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-actor" % "2.6.21",
      "com.typesafe.akka" %% "akka-stream" % "2.6.21",
      "io.kamon" %% "kamon-bundle" % "2.7.7",
      "ch.qos.logback" % "logback-classic" % "1.3.15"
    )
  )

and log output:

Initializing Kamon Telemetry v2.7.7 / Kanela v1.0.18

13:19:57.186 [main] INFO repro.Main$ -- [before] ctx Context{Entries{span=Span.Empty},Tags{tenant=qwerty,[email protected],auth.token=fake-jwt-signature}}
[Future1] ctx Context{Entries{span=Span.Empty},Tags{tenant=qwerty,[email protected],auth.token=fake-jwt-signature}}
13:19:57.190 [main] INFO repro.Main$ -- [Future3] ctx Context{Entries{span=Span.Empty},Tags{tenant=qwerty,[email protected],auth.token=fake-jwt-signature}}
13:19:57.191 [Repro-akka.actor.default-dispatcher-5] INFO repro.Main$ -- [Future2] ctx Context{Entries{span=Span.Empty},Tags{tenant=qwerty,[email protected],auth.token=fake-jwt-signature}}
13:19:57.191 [main] INFO repro.Main$ -- [before2-v2] ctx Context{Entries{span=Span.Empty},Tags{tenant=qwerty,[email protected],auth.token=fake-jwt-signature}}
13:19:57.192 [Repro-akka.actor.default-dispatcher-7] INFO repro.Main$ -- [map-1] ctx Context{Entries{span=Span.Empty},Tags{}}
13:19:57.192 [Repro-akka.actor.default-dispatcher-8] INFO repro.Main$ -- [map-1-v2] ctx Context{Entries{span=Span.Empty},Tags{tenant=qwerty,[email protected],auth.token=fake-jwt-signature}}
13:19:57.192 [Repro-akka.actor.default-dispatcher-6] INFO repro.Main$ -- [map-2] ctx Context{Entries{span=Span.Empty},Tags{}}
13:19:57.192 [Repro-akka.actor.default-dispatcher-5] INFO repro.Main$ -- [map-2-v2] ctx Context{Entries{span=Span.Empty},Tags{tenant=qwerty,[email protected],auth.token=fake-jwt-signature}}

maciejbak85, Jul 10 '25 11:07

Does it happen with 2.8.0-beta.1 also?

mkurz, Jul 10 '25 11:07

@mkurz yes

Initializing Kamon Telemetry v2.8.0-beta.1 / Kanela v2.0.0-beta.3

10:17:48.273 [main] INFO  repro.Main$ - [before] ctx Context{Entries{span=Span.Empty},Tags{tenant=qwerty,[email protected],auth.token=fake-jwt-signature}}
[Future1] ctx Context{Entries{span=Span.Empty},Tags{tenant=qwerty,[email protected],auth.token=fake-jwt-signature}}
10:17:48.278 [Repro-akka.actor.default-dispatcher-6] INFO  repro.Main$ - [map-1] ctx Context{Entries{span=Span.Empty},Tags{}}
10:17:48.278 [main] INFO  repro.Main$ - [Future3] ctx Context{Entries{span=Span.Empty},Tags{tenant=qwerty,[email protected],auth.token=fake-jwt-signature}}
10:17:48.278 [Repro-akka.actor.default-dispatcher-5] INFO  repro.Main$ - [Future2] ctx Context{Entries{span=Span.Empty},Tags{tenant=qwerty,[email protected],auth.token=fake-jwt-signature}}
10:17:48.278 [Repro-akka.actor.default-dispatcher-7] INFO  repro.Main$ - [map-2] ctx Context{Entries{span=Span.Empty},Tags{}}
10:17:48.278 [main] INFO  repro.Main$ - [before2-v2] ctx Context{Entries{span=Span.Empty},Tags{tenant=qwerty,[email protected],auth.token=fake-jwt-signature}}
10:17:48.279 [Repro-akka.actor.default-dispatcher-8] INFO  repro.Main$ - [map-1-v2] ctx Context{Entries{span=Span.Empty},Tags{tenant=qwerty,[email protected],auth.token=fake-jwt-signature}}
10:17:48.280 [Repro-akka.actor.default-dispatcher-7] INFO  repro.Main$ - [map-2-v2] ctx Context{Entries{span=Span.Empty},Tags{tenant=qwerty,[email protected],auth.token=fake-jwt-signature}}
[INFO] [07/11/2025 10:17:48.290] [Repro-akka.actor.default-dispatcher-9] [CoordinatedShutdown(akka://Repro)] Running CoordinatedShutdown with reason [ActorSystemTerminateReason]

maciejbak85, Jul 11 '25 08:07

By the way, in my production code, in the place where I call Source.futureSource { someStuff }, I had to wrap the body in Kamon.runWithContext(kamonCtx)(..), i.e.:

Kamon.runWithContext(kamonCtx) {
  myCode
}

With that it works, of course. At least that is a workaround for now.
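For anyone hitting the same thing, here is a minimal sketch of how I read that workaround. The names `FutureSourceWorkaround`, `someStuff` and `guardedSource` are made up for illustration, and `someStuff` is only a stand-in for the real production logic. It assumes the context was captured earlier with `Kamon.currentContext()` at a point where the tags were still present, and it only uses `Kamon.runWithContext` and `Source.futureSource` as they appear elsewhere in this thread.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import kamon.Kamon
import kamon.context.Context

import scala.concurrent.Future

object FutureSourceWorkaround {

  // Hypothetical stand-in for the production logic ("someStuff" / "myCode").
  // It reads the current context synchronously, on the calling thread.
  def someStuff(): Future[Source[Int, NotUsed]] = {
    println(s"[someStuff] ctx ${Kamon.currentContext()}")
    Future.successful(Source(1 to 3))
  }

  // kamonCtx is assumed to have been captured with Kamon.currentContext()
  // where the tags were still set (for example at the request boundary).
  def guardedSource(kamonCtx: Context): Source[Int, Future[NotUsed]] =
    Source.futureSource {
      // Re-apply the captured context explicitly around the body,
      // instead of relying on it being propagated automatically.
      Kamon.runWithContext(kamonCtx) {
        someStuff()
      }
    }
}
```

Note that this only guarantees the context for code that runs synchronously inside the `runWithContext` block. Whether later stages of the stream see it still depends on the instrumentation, which is the subject of this issue.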

maciejbak85, Jul 11 '25 10:07