opentelemetry-java-instrumentation
Akka/Pekko after loses thread context propagation
Describe the bug
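The after pattern does not propagate the current OpenTelemetry context: the block passed to after runs with an invalid span instead of the span that was current when after was called. This happens with every execution context I tried (global, parasitic, opportunistic, and the actor system dispatcher). To make it work, the execution context must be wrapped manually with the captured Context.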
Steps to reproduce
https://github.com/wsargent/akka-after-loses-otel-thread-context
package org.example.application

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Span
import io.opentelemetry.context.Context
import org.apache.pekko.Done
import org.apache.pekko.actor.ActorSystem
import org.slf4j.LoggerFactory

import scala.language.reflectiveCalls
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext, Future}

object Main {
  private val logger = LoggerFactory.getLogger(getClass)
  private val openTelemetry = GlobalOpenTelemetry.get()
  private val tracer = openTelemetry.getTracer("example")

  // https://tersesystems.com/blog/2024/06/20/executioncontext.parasitic-and-friends/
  private val opportunisticExecutionContext = (scala.concurrent.ExecutionContext: {def opportunistic: scala.concurrent.ExecutionContextExecutor}).opportunistic

  def main(args: Array[String]): Unit = {
    implicit val actorSystem = ActorSystem("example")
    implicit val ec = ExecutionContext.global
    val f = for {
      _ <- operation("wrapping")
      _ <- operation("global")
      _ <- operation("parasitic")
      _ <- operation("opportunistic")
      _ <- operation("dispatcher")
    } yield actorSystem.terminate()
    Await.result(f, 30.seconds)
  }

  def operation(mode: String)(implicit actorSystem: ActorSystem): Future[Done] = {
    traceSync(s"root $mode") {
      val expectedSpan = Span.current()
      logger.info(s"mode: We expect ${expectedSpan}")
      val afterExecutionContext = defineExecutionContext(mode)
      org.apache.pekko.pattern.after(1.second, actorSystem.scheduler) {
        val actualSpan = Span.current()
        Future.successful {
          if (!expectedSpan.equals(actualSpan)) {
            logger.error(s"$mode: Unexpected $actualSpan")
          } else {
            logger.info(s"$mode: Reached delayed with $actualSpan")
          }
          Done
        }
      }(afterExecutionContext)
    }
  }

  def traceSync[A](traceName: String)(block: => A): A = {
    val span = tracer.spanBuilder(traceName).startSpan()
    assert(span.isRecording, "No-op span, you must run this class with the java agent so it instruments correctly!")
    try {
      val scope = span.makeCurrent()
      try {
        block
      } finally {
        scope.close()
      }
    } finally {
      span.end()
    }
  }

  def defineExecutionContext(mode: String)(implicit system: ActorSystem): ExecutionContext = {
    val dispatcher = system.classicSystem.dispatcher
    mode match {
      case "wrapping" =>
        val context = Context.current()
        new ExecutionContext {
          override def execute(runnable: Runnable): Unit = dispatcher.execute(context.wrap(runnable))
          override def reportFailure(cause: Throwable): Unit = dispatcher.reportFailure(cause)
        }
      case "global" =>
        ExecutionContext.global
      case "parasitic" =>
        ExecutionContext.parasitic
      case "opportunistic" =>
        opportunisticExecutionContext
      case _ =>
        dispatcher
    }
  }
}
Expected behavior
The after pattern carries over the existing span context.
Actual behavior
100.123.234.53 ❱ ./bin/akka-after-loses-thread-context
OpenJDK 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended
129 INFO trace_id= span_id= [main] io.opentelemetry.javaagent.tooling.VersionLogger - opentelemetry-javaagent - version: 2.4.0
678 INFO trace_id=8746ecc707c7eb642df61a1933b396e9 span_id=1a8c4711d7702a04 [main] org.example.application.Main$ - mode: We expect ApplicationSpan{agentSpan=SdkSpan{traceId=8746ecc707c7eb642df61a1933b396e9, spanId=1a8c4711d7702a04, parentSpanContext=ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}, name=root wrapping, kind=INTERNAL, attributes=AttributesMap{data={thread.name=main, thread.id=1}, capacity=128, totalAddedValues=2}, status=ImmutableStatusData{statusCode=UNSET, description=}, totalRecordedEvents=0, totalRecordedLinks=0, startEpochNanos=1720292847324051943, endEpochNanos=0}}
680 INFO trace_id= span_id= [main] io.opentelemetry.exporter.logging.LoggingSpanExporter - 'root wrapping' : 8746ecc707c7eb642df61a1933b396e9 1a8c4711d7702a04 INTERNAL [tracer: example:] AttributesMap{data={thread.name=main, thread.id=1}, capacity=128, totalAddedValues=2}
1711 INFO trace_id=8746ecc707c7eb642df61a1933b396e9 span_id=1a8c4711d7702a04 [example-pekko.actor.default-dispatcher-5] org.example.application.Main$ - wrapping: Reached delayed with ApplicationSpan{agentSpan=SdkSpan{traceId=8746ecc707c7eb642df61a1933b396e9, spanId=1a8c4711d7702a04, parentSpanContext=ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}, name=root wrapping, kind=INTERNAL, attributes=AttributesMap{data={thread.name=main, thread.id=1}, capacity=128, totalAddedValues=2}, status=ImmutableStatusData{statusCode=UNSET, description=}, totalRecordedEvents=0, totalRecordedLinks=0, startEpochNanos=1720292847324051943, endEpochNanos=1720292847338923884}}
1715 INFO trace_id=8746ecc707c7eb642df61a1933b396e9 span_id=7246937d638eba1a [scala-execution-context-global-26] org.example.application.Main$ - mode: We expect ApplicationSpan{agentSpan=SdkSpan{traceId=8746ecc707c7eb642df61a1933b396e9, spanId=7246937d638eba1a, parentSpanContext=ImmutableSpanContext{traceId=8746ecc707c7eb642df61a1933b396e9, spanId=1a8c4711d7702a04, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, name=root global, kind=INTERNAL, attributes=AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}, status=ImmutableStatusData{statusCode=UNSET, description=}, totalRecordedEvents=0, totalRecordedLinks=0, startEpochNanos=1720292848374104426, endEpochNanos=0}}
1715 INFO trace_id=8746ecc707c7eb642df61a1933b396e9 span_id=1a8c4711d7702a04 [scala-execution-context-global-26] io.opentelemetry.exporter.logging.LoggingSpanExporter - 'root global' : 8746ecc707c7eb642df61a1933b396e9 7246937d638eba1a INTERNAL [tracer: example:] AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}
2730 ERROR trace_id= span_id= [scala-execution-context-global-26] org.example.application.Main$ - global: Unexpected PropagatedSpan{ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}}
2731 INFO trace_id=4a46e96a6cd3bb1fe64335281a6bb229 span_id=3f967afae96d6627 [scala-execution-context-global-26] org.example.application.Main$ - mode: We expect ApplicationSpan{agentSpan=SdkSpan{traceId=4a46e96a6cd3bb1fe64335281a6bb229, spanId=3f967afae96d6627, parentSpanContext=ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}, name=root parasitic, kind=INTERNAL, attributes=AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}, status=ImmutableStatusData{statusCode=UNSET, description=}, totalRecordedEvents=0, totalRecordedLinks=0, startEpochNanos=1720292849389997315, endEpochNanos=0}}
2731 INFO trace_id= span_id= [scala-execution-context-global-26] io.opentelemetry.exporter.logging.LoggingSpanExporter - 'root parasitic' : 4a46e96a6cd3bb1fe64335281a6bb229 3f967afae96d6627 INTERNAL [tracer: example:] AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}
3747 ERROR trace_id= span_id= [example-scheduler-1] org.example.application.Main$ - parasitic: Unexpected PropagatedSpan{ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}}
3748 INFO trace_id=f0c1bc85c8635a598f51244982df7750 span_id=5e10398ab14c17ee [scala-execution-context-global-26] org.example.application.Main$ - mode: We expect ApplicationSpan{agentSpan=SdkSpan{traceId=f0c1bc85c8635a598f51244982df7750, spanId=5e10398ab14c17ee, parentSpanContext=ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}, name=root opportunistic, kind=INTERNAL, attributes=AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}, status=ImmutableStatusData{statusCode=UNSET, description=}, totalRecordedEvents=0, totalRecordedLinks=0, startEpochNanos=1720292850407256495, endEpochNanos=0}}
3748 INFO trace_id= span_id= [scala-execution-context-global-26] io.opentelemetry.exporter.logging.LoggingSpanExporter - 'root opportunistic' : f0c1bc85c8635a598f51244982df7750 5e10398ab14c17ee INTERNAL [tracer: example:] AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}
4767 ERROR trace_id= span_id= [scala-execution-context-global-26] org.example.application.Main$ - opportunistic: Unexpected PropagatedSpan{ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}}
4768 INFO trace_id=077ce825f5698598d8ca24c433f812f5 span_id=0b8912756c432c1d [scala-execution-context-global-26] org.example.application.Main$ - mode: We expect ApplicationSpan{agentSpan=SdkSpan{traceId=077ce825f5698598d8ca24c433f812f5, spanId=0b8912756c432c1d, parentSpanContext=ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}, name=root dispatcher, kind=INTERNAL, attributes=AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}, status=ImmutableStatusData{statusCode=UNSET, description=}, totalRecordedEvents=0, totalRecordedLinks=0, startEpochNanos=1720292851427164157, endEpochNanos=0}}
4768 INFO trace_id= span_id= [scala-execution-context-global-26] io.opentelemetry.exporter.logging.LoggingSpanExporter - 'root dispatcher' : 077ce825f5698598d8ca24c433f812f5 0b8912756c432c1d INTERNAL [tracer: example:] AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}
5788 ERROR trace_id= span_id= [example-pekko.actor.default-dispatcher-5] org.example.application.Main$ - dispatcher: Unexpected PropagatedSpan{ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}}
[INFO] [07/06/2024 12:07:32.469] [scala-execution-context-global-26] [CoordinatedShutdown(pekko://example)] Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
Javaagent or library instrumentation version
1.39.0
Environment
JDK:
openjdk version "17.0.7" 2023-04-18
OpenJDK Runtime Environment Temurin-17.0.7+7 (build 17.0.7+7)
OpenJDK 64-Bit Server VM Temurin-17.0.7+7 (build 17.0.7+7, mixed mode, sharing)
OS:
Linux devserver 6.5.0-41-generic #41~22.04.2-Ubuntu SMP PREEMPT_DYNAMIC Mon Jun 3 11:32:55 UTC 2 x86_64 x86_64 x86_64 GNU/Linux
Additional context
https://github.com/wsargent/opentelemetry-with-scala-futures
I think this is because the by-name Scheduler methods create a new Runnable and pass the function in. As far as I can tell, after goes through scheduleOnce, which follows the same pattern as schedule:
final def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => Unit)(
    implicit
    executor: ExecutionContext): Cancellable =
  schedule(initialDelay, interval, new Runnable { override def run(): Unit = f })
https://github.com/apache/pekko/blob/main/actor/src/main/scala/org/apache/pekko/actor/Scheduler.scala#L389
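To illustrate the suspected mechanism without Pekko or the agent, here is a minimal sketch that uses only the JDK. The ThreadLocal named current is a hypothetical stand-in for OpenTelemetry's thread-bound context, and capture is an illustrative helper rather than a library API. A task handed to a scheduler runs on the scheduler's thread, so it only sees the caller's value if that value was captured when the task was created.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

object ThreadLocalLossDemo {
  // Hypothetical stand-in for a thread-bound tracing context.
  private val current = new ThreadLocal[String] {
    override def initialValue(): String = "none"
  }

  /** Captures the caller's value now and restores it around `body` when the task runs later. */
  def capture[A](body: => A): Callable[A] = {
    val saved = current.get()
    new Callable[A] {
      override def call(): A = {
        val previous = current.get()
        current.set(saved)
        try body finally current.set(previous)
      }
    }
  }

  /** Returns (value seen by a plain task, value seen by a captured task). */
  def run(): (String, String) = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    current.set("root-span")
    try {
      // Plain task: created here, but reads the thread-local on the scheduler thread.
      val plain = scheduler.schedule(
        new Callable[String] { override def call(): String = current.get() },
        10L, TimeUnit.MILLISECONDS)
      // Captured task: carries the caller's value across the thread hop.
      val wrapped = scheduler.schedule(capture(current.get()), 10L, TimeUnit.MILLISECONDS)
      (plain.get(), wrapped.get())
    } finally {
      current.remove()
      scheduler.shutdown()
    }
  }
}
```

Calling ThreadLocalLossDemo.run() should show the plain task seeing the default value while the captured task sees the caller's value, which mirrors the difference between the "wrapping" mode and the other modes in the reproduction.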
I think I am having an issue with the after call as well. Our trace_ids randomly disappear, and I suspect some of those after calls cause it.
> To make it work, manual wrapping must be used.
Can you elaborate how you manage to do this with after?
@Dogacel check out wsargent/opentelemetry-with-scala-futures
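As a rough sketch of what manual wrapping around after could look like (not taken from the linked repository), the helper below captures Context.current() at the call site and makes it current again while the delayed block is evaluated. The names ContextAwareAfter and afterWithContext are hypothetical. It assumes opentelemetry-api and pekko-actor are on the classpath.

```scala
import io.opentelemetry.context.Context
import org.apache.pekko.actor.Scheduler
import org.apache.pekko.pattern.after

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration

object ContextAwareAfter {
  /** Hypothetical helper: like `after`, but restores the caller's OpenTelemetry context. */
  def afterWithContext[T](delay: FiniteDuration, scheduler: Scheduler)(value: => Future[T])(
      implicit ec: ExecutionContext): Future[T] = {
    // Captured on the calling thread, before the scheduler takes over.
    val context = Context.current()
    after(delay, scheduler) {
      // Runs later on another thread; re-attach the captured context for this block only.
      val scope = context.makeCurrent()
      try value finally scope.close()
    }
  }
}
```

Note that the captured context is only current while the block itself is evaluated. Callbacks attached to the returned Future run wherever their own execution context puts them, so they may need the same treatment.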
Thank you.
I wonder if it is possible to fix this with an update to the auto-instrumentation library? It is hard to catch usages of all those scheduler actions in code.
Side note: we might need to instrument all schedule and scheduleOnce calls, as well as CircuitBreaker calls.
https://github.com/search?q=repo%3Aapache%2Fpekko%20%22new%20Runnable%22&type=code
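Until the agent covers these call sites, one application-side option is to wrap the Runnable before handing it to the scheduler. The sketch below is an assumption about how that could look for scheduleOnce, not the approach taken by the instrumentation. ContextAwareScheduling and scheduleOnceWithContext are hypothetical names, while Context.wrap(Runnable) and Scheduler.scheduleOnce(delay, runnable) are existing APIs.

```scala
import io.opentelemetry.context.Context
import org.apache.pekko.actor.{Cancellable, Scheduler}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

object ContextAwareScheduling {
  /** Hypothetical helper: schedules `f` once, running it with the caller's context. */
  def scheduleOnceWithContext(scheduler: Scheduler, delay: FiniteDuration)(f: => Unit)(
      implicit ec: ExecutionContext): Cancellable = {
    val task: Runnable = new Runnable { override def run(): Unit = f }
    // wrap() captures the context that is current here and re-attaches it inside run().
    scheduler.scheduleOnce(delay, Context.current().wrap(task))
  }
}
```

This only helps for call sites you control. Scheduler usage inside Pekko itself, such as CircuitBreaker, would still need support from the agent.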
I am pretty new to OTel instrumentation, so I don't have a deep understanding of how we might want to do it, but I can try once I have some spare time.
For reference, I have created a unit test in this commit:
https://github.com/dogacel/opentelemetry-java-instrumentation/commit/b6160e70e1d9f8ffe3a25f6bbad882c9a847d64d
I am not entirely sure how I should create the instrumentation feature.
Resolved with https://github.com/open-telemetry/opentelemetry-java-instrumentation/pull/12359 and https://github.com/open-telemetry/opentelemetry-java-instrumentation/pull/12373