dd-trace-java
Kotlin coroutines support
Hi. I can see there is a closed issue already: https://github.com/DataDog/dd-trace-java/issues/652 but it looks like the agent still has issues with coroutine support. See the code/logs below:
import datadog.opentracing.DDSpan
import io.opentracing.util.GlobalTracer
import kotlinx.coroutines.*
import org.slf4j.LoggerFactory

object Main {
    val logger = LoggerFactory.getLogger(Main.javaClass)

    @JvmStatic
    fun main(args: Array<String>) = runBlocking {
        val tracer = GlobalTracer.get()
        tracer.buildSpan("main").startActive(true)
        val span = tracer.activeSpan()
        logger.info("${Thread.currentThread().name} $span")
        coroutineScope {
            repeat(10) {
                launch(Dispatchers.IO) {
                    waitFor(1000)
                }
            }
        }
    }

    suspend fun waitFor(millis: Long) {
        val tracer = GlobalTracer.get()
        val span = tracer.activeSpan()
        logger.info("${Thread.currentThread().name} $span")
        delay(millis)
    }
}
> Task :run
[dd.tracing.agent - 2019-07-26 12:06:40:165 +0200] [main] INFO datadog.trace.agent.ot.DDTraceOTInfo - dd-trace - version: 0.31.0~e9872762
[dd.tracing.agent - 2019-07-26 12:06:40:320 +0200] [main] INFO datadog.trace.agent.ot.DDTracer - New instance: DDTracer-64a9d48c{ serviceName=unnamed-java-app, writer=DDAgentWriter { api=DDApi { tracesUrl=http://localhost:8126/v0.3/traces } }, sampler=datadog.trace.agent.common.sampling.RateByServiceSampler@365a6a43, defaultSpanTags={}}
[dd.tracing.agent - 2019-07-26 12:06:40:333 +0200] [main] INFO datadog.trace.agent.tooling.VersionLogger - dd-trace-ot - version: 0.31.0~e9872762
[dd.tracing.agent - 2019-07-26 12:06:40:333 +0200] [main] INFO datadog.trace.agent.tooling.VersionLogger - dd-trace-api - version: 0.31.0~e9872762
[dd.tracing.agent - 2019-07-26 12:06:40:333 +0200] [main] INFO datadog.trace.agent.tooling.VersionLogger - dd-java-agent - version: 0.31.0~e9872762
[dd.tracing.agent - 2019-07-26 12:06:40:348 +0200] [main] INFO datadog.trace.agent.jmxfetch.JMXFetch - JMXFetch config: null [] [] [] null null {runtime-id=f6a7f3e0-beef-42bb-ae40-901125a3fe94, language=jvm, service=unnamed-java-app} statsd:localhost:8125 System.err INFO
2019-07-26 12:06:41.718 INFO 4149 [ main] Main$main$1 : main DDSpan [ t_id=4386340172421044579, s_id=2509487107151721448, p_id=0] trace=unnamed-java-app/main/main metrics={_sampling_priority_v1=1} tags={language=jvm, runtime-id=f6a7f3e0-beef-42bb-ae40-901125a3fe94, thread.id=1, thread.name=main}, duration_ns=0
2019-07-26 12:06:41.752 INFO 4149 [atcher-worker-1] Main : DefaultDispatcher-worker-1 null
2019-07-26 12:06:41.763 INFO 4149 [atcher-worker-3] Main : DefaultDispatcher-worker-3 null
2019-07-26 12:06:41.763 INFO 4149 [atcher-worker-2] Main : DefaultDispatcher-worker-2 null
2019-07-26 12:06:41.765 INFO 4149 [atcher-worker-4] Main : DefaultDispatcher-worker-4 null
2019-07-26 12:06:41.768 INFO 4149 [atcher-worker-8] Main : DefaultDispatcher-worker-8 null
2019-07-26 12:06:41.767 INFO 4149 [atcher-worker-6] Main : DefaultDispatcher-worker-6 null
2019-07-26 12:06:41.771 INFO 4149 [atcher-worker-7] Main : DefaultDispatcher-worker-7 null
2019-07-26 12:06:41.774 INFO 4149 [atcher-worker-5] Main : DefaultDispatcher-worker-5 null
2019-07-26 12:06:41.775 INFO 4149 [tcher-worker-10] Main : DefaultDispatcher-worker-10 null
2019-07-26 12:06:41.776 INFO 4149 [tcher-worker-12] Main : DefaultDispatcher-worker-12 null
[dd.tracing.agent - 2019-07-26 12:06:42:815 +0200] [Thread-1] INFO datadog.trace.agent.common.writer.DDAgentWriter - Flushing any remaining traces.
As you can see, the parent span is lost in the coroutine. The same issue happens with automatic instrumentation (as seen on our Datadog APM dashboard). Right now only spans created within the same thread are being displayed.
Also, if we create spans inside the coroutine, we expect them to be children of the parent span. But the parent span is not set yet (null in the log), so I assume they are created as parent spans themselves and are either wrongly reported as such or ignored as expired ones.
Do you have a solution for it?
Thanks.
If this is still an issue, would you be able to provide details as to what version of coroutines and dd you used to produce the sample above?
Hi @marcoferrer, thanks for your reply! I created a sample application to demonstrate the issue. Unfortunately it still exists.
https://github.com/ikovalyov/dd-tracing-coroutines-issue
@ikovalyov Thanks for providing a sample application! Our team will take a look into this to investigate and reach out with our findings.
Looking at your example now, I noticed that since you're manually constructing a span, it needs setAsyncPropagation(true) in order to be passed down through the executor instrumentation.
I made the following tweak to your example and got the correct output.
private val logger: Logger = LoggerFactory.getLogger(Main.javaClass)

@JvmStatic
fun main(args: Array<String>) = runBlocking {
    val tracer = GlobalTracer.get()
    tracer.buildSpan("main").startActive(true).use { scope ->
        (scope as TraceScope).setAsyncPropagation(true)
        val span = tracer.activeSpan()
        logger.info("${Thread.currentThread().name} $span")
        coroutineScope {
            repeat(10) {
                launch(Dispatchers.IO) {
                    waitFor(1000)
                }
            }
        }
    }
}
Output:
INFO 69049 [main] Main$main$1 : main DDSpan [ t_id=220956131227001690, s_id=166114299606137339, ....
INFO 69049 [atcher-worker-2] Main : DefaultDispatcher-worker-2 DDSpan [ t_id=220956131227001690, s_id=166114299606137339, ....
INFO 69049 [atcher-worker-3] Main : DefaultDispatcher-worker-3 DDSpan [ t_id=220956131227001690, s_id=166114299606137339, ....
INFO 69049 [atcher-worker-1] Main : DefaultDispatcher-worker-1 DDSpan [ t_id=220956131227001690, s_id=166114299606137339, ....
INFO 69049 [atcher-worker-9] Main : DefaultDispatcher-worker-9 DDSpan [ t_id=220956131227001690, s_id=166114299606137339, ....
INFO 69049 [atcher-worker-8] Main : DefaultDispatcher-worker-8 DDSpan [ t_id=220956131227001690, s_id=166114299606137339, ....
INFO 69049 [atcher-worker-7] Main : DefaultDispatcher-worker-7 DDSpan [ t_id=220956131227001690, s_id=166114299606137339, ....
INFO 69049 [atcher-worker-6] Main : DefaultDispatcher-worker-6 DDSpan [ t_id=220956131227001690, s_id=166114299606137339, ....
INFO 69049 [atcher-worker-5] Main : DefaultDispatcher-worker-5 DDSpan [ t_id=220956131227001690, s_id=166114299606137339, ....
INFO 69049 [atcher-worker-4] Main : DefaultDispatcher-worker-4 DDSpan [ t_id=220956131227001690, s_id=166114299606137339, ....
INFO 69049 [tcher-worker-11] Main : DefaultDispatcher-worker-11 DDSpan [ t_id=220956131227001690, s_id=166114299606137339, ....
I'm not entirely sure if there is a way for the instrumentation to know when a scope should be made async automatically. You could potentially use a helper method to simplify this particular use case:
inline fun Scope.useAsync(block: () -> Unit) {
    (this as TraceScope).use { scope ->
        scope.setAsyncPropagation(true)
        block()
    }
}
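For context, here is how that helper would slot into the earlier example. This is only a sketch; it assumes the same imports as the snippets above, and since useAsync and use are both inline, the suspending coroutineScope call should be allowed inside the block because everything is inlined into the suspending caller:
@JvmStatic
fun main(args: Array<String>) = runBlocking {
    val tracer = GlobalTracer.get()
    // useAsync switches on async propagation before the coroutines are launched,
    // so the active span survives the switch to the Dispatchers.IO threads.
    tracer.buildSpan("main").startActive(true).useAsync {
        coroutineScope {
            repeat(10) {
                launch(Dispatchers.IO) {
                    waitFor(1000)
                }
            }
        }
    }
}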
Thanks, I'll try this approach.
Good catch @marcoferrer. I'm pretty confident that will solve the problem. We want to evaluate better options in a future API refactor to allow us to remove the need to set that flag, but for now it is required.
I'll go ahead and close this. Please reopen if you still have issues with it.
I can confirm this approach worked. Previously we had many spans missing in the DD APM; now we have all the data listed. Unfortunately this method is quite verbose. It would be really nice if the agent were smart enough to automatically set asyncPropagation to true when a suspend function is involved or coroutine builders are called.
As an easier way to make it less verbose, I can also suggest adding an async property to the @Trace annotation, which would tell the agent to create an async scope for the annotated function.
Thanks.
We realize this is not ideal. We have plans to improve it/remove the need for it, but not in the immediate term. Sorry for the trouble.
@tylerbenson startActive is deprecated now. Can you suggest a solution that matches the current Tracer API?
Thanks.
Call start and activate separately.
Hi. Any progress on this issue? @Trace with coroutines is still not working (version 0.70.0).
By the way, this is the current way of doing it:
...

// @Trace -> not working
override suspend fun foo(request: BarRequest): BarResponse {
    val tracer = GlobalTracer.get()
    val span = tracer.buildSpan("fooOperation").start()
    tracer.activateSpan(span).use { scope ->
        // has to be checked: if tracing is not enabled, a NoopScope comes back here
        if (scope is TraceScope) {
            scope.setAsyncPropagation(true)
        }
        // do something
        return ...
    }
}
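To avoid repeating that boilerplate for every operation, one option is a small wrapper around the same pattern. This is only a sketch: traced is a made-up name, the TraceScope import path is assumed, and the explicit span.finish() reflects that closing the scope alone does not finish a manually started span.
import datadog.trace.context.TraceScope // assumed import path
import io.opentracing.util.GlobalTracer

// Hypothetical helper that wraps the manual pattern shown above.
inline fun <T> traced(operationName: String, block: () -> T): T {
    val tracer = GlobalTracer.get()
    val span = tracer.buildSpan(operationName).start()
    try {
        return tracer.activateSpan(span).use { scope ->
            // A NoopScope comes back when tracing is disabled, so check before casting.
            if (scope is TraceScope) {
                scope.setAsyncPropagation(true)
            }
            block()
        }
    } finally {
        // Closing the scope only deactivates it; the span still has to be finished.
        span.finish()
    }
}
With that, the body of foo above shrinks to something like traced("fooOperation") { /* do something */ }.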
any updates on this?
We are using Datadog APM (automatic instrumentation) + Kotlin coroutines. We noticed that a lot of our spans are not being generated and/or sent to Datadog APM. The given solution with manual instrumentation along with scope.setAsyncPropagation(true) seems to work; however, it would be overkill to do this for every single operation. Is there a setting at the global config level that makes the automatic instrumentation work with Kotlin coroutines?
Would love to hear back on @abhaver's point.
@abhaver @anthonyp-usafacts there is something much worse: https://github.com/DataDog/dd-trace-java/issues/1123
+1, also interested in the same use case as @abhaver and @anthonyp-usafacts. This totally kills the instrumentation capability of any endpoint where we return 202 and do processing async via custom coroutine scopes.
At this point I'd consider it a serious limitation that Datadog cannot trace coroutines without additional surgical manipulation at every injection point, given this bug has been open for 2+ years (how?).
Can we please get some eyes on this bug? It really stifles our ability to track async calls.
In case somebody else is struggling with this problem, I want to share my solution. It uses a continuation in combination with the coroutine ThreadContextElement.
import datadog.trace.api.GlobalTracer
import datadog.trace.bootstrap.instrumentation.api.AgentScope
import datadog.trace.bootstrap.instrumentation.api.AgentSpan
import datadog.trace.bootstrap.instrumentation.api.AgentTracer.TracerAPI
import datadog.trace.bootstrap.instrumentation.api.ScopeSource
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.ThreadContextElement
import kotlinx.coroutines.async
import org.slf4j.LoggerFactory
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

/**
 * Runs async with a setup that works with opentracing/datadog.
 * It creates its own span for the async call.
 */
@OptIn(ExperimentalStdlibApi::class)
fun <T> CoroutineScope.openTracingAsync(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val dispatcher = context[CoroutineDispatcher.Key]
    val tracer = GlobalTracer.get() as? TracerAPI
    if (tracer != null) {
        val spanBuilder = tracer.buildSpan("coroutine-async").withTag("dispatcher", dispatcher.toString())
        val span: AgentSpan = spanBuilder.start()!!
        val scope = tracer.activateSpan(span, ScopeSource.MANUAL, true)
        scope.setAsyncPropagation(true)
        val continuation = scope.captureConcurrent()
        scope.close()
        return async(context + ContinuationContextElement(continuation), start) {
            try {
                block()
            } finally {
                span.finish()
                continuation.cancel()
            }
        }
    } else {
        return async(context, start, block)
    }
}
/**
 * This will activate the continuation on the new thread and close it after the work is done.
 */
class ContinuationContextElement(val continuation: AgentScope.Continuation) : ThreadContextElement<AgentScope?> {
    override val key = Key

    override fun updateThreadContext(context: CoroutineContext): AgentScope? {
        val scope = continuation.activate()
        if (scope == null) {
            LOGGER.warn("Failed to activate continuation. APM data will be incomplete!")
        }
        // Return the scope that was just activated; activating the continuation a second time would leak a scope.
        return scope
    }

    override fun restoreThreadContext(context: CoroutineContext, oldState: AgentScope?) {
        oldState?.close()
    }

    companion object {
        object Key : CoroutineContext.Key<ContinuationContextElement>

        private val LOGGER = LoggerFactory.getLogger(ContinuationContextElement::class.java)
    }
}
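For reference, a minimal usage sketch of the extension above (the work inside the block is just a placeholder; only openTracingAsync and ContinuationContextElement come from the snippet):
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // Each openTracingAsync call creates a "coroutine-async" span and carries the
    // active trace onto the worker thread via ContinuationContextElement.
    val results = (1..3).map { i ->
        openTracingAsync(Dispatchers.IO) {
            i * 2 // placeholder for real traced work
        }
    }.map { it.await() }
    println(results)
}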
I was curious if there were any updates on this issue. It makes the usability of DD tracing much more difficult and awkward. The solution posted does work, but as people have said, this should really just work with @Trace.
Here's a solution that is VERY hacky, but at the same time dead simple: https://gist.github.com/monosoul/048e0456851fdc4bafe708d1807c6a87
It uses access to package-private fields in DD tracing-related classes to put the scope stack into the coroutine context. It also guarantees no memory leaks from a ScopeStack being left on a different thread.
@monosoul You have a nice solution here; unfortunately it doesn't work when using the agent to auto-instrument the application, because the agent changes the package of the CoreTracer from "datadog.trace.core" to "datadog.trace.agent.core", making the cast impossible.
@bruno-ortiz ah, yes, you're right. I also noticed that when I tried to use it with the agent, and since then I've changed the solution but forgot to update the gist. The new one is even hackier than the previous one since it relies heavily on reflection, but with proper test coverage you can at least notice if it stops working.
Here's the hackier solution: https://gist.github.com/monosoul/048e0456851fdc4bafe708d1807c6a87 (updated gist) :slightly_smiling_face:
Hello! I'm having some difficulties using this solution in my code. Can somebody provide me with a snippet on how to use this implementation in a "real" situation? Thanks!
@monosoul I tried integrating your hackier solution in my Kotlin application. I added dataDogTracingCoroutineContext to the coroutineContext at the entry point of the request using interceptors. However, the execution failed with the following exception:
2022-12-03T19:52:24.525Z level=ERROR thread=grpc-default-executor-0 logger=io.grpc.internal.SerializingExecutor haystack_trace_id=- dd.service=com.expediagroup.activitiesshopspi.ApplicationKt grpc-client=evans user-agent=grpc-go/1.39.0 message=Exception while executing runnable io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable@16f5d606 exception=java.lang.RuntimeException: java.lang.NoSuchMethodException: datadog.trace.agent.core.scopemanager.ContinuableScopeManager$ScopeStack.<init>()
at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:342) ~[grpc-core-1.49.1.jar:1.49.1]
at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:315) ~[grpc-core-1.49.1.jar:1.49.1]
at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834) ~[grpc-core-1.49.1.jar:1.49.1]
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[grpc-core-1.49.1.jar:1.49.1]
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) ~[grpc-core-1.49.1.jar:1.49.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: java.lang.NoSuchMethodException: datadog.trace.agent.core.scopemanager.ContinuableScopeManager$ScopeStack.<init>()
at java.lang.Class.getConstructor0(Class.java:3349) ~[?:?]
at java.lang.Class.getDeclaredConstructor(Class.java:2553) ~[?:?]
at datadog.trace.core.scopemanager.DDTracerExtensions.newScopeStack(DatadogTracingConfig.kt:72) ~[classes/:?]
at datadog.trace.core.scopemanager.DatadogTracingConfigKt.dataDogTracingCoroutineContext(DatadogTracingConfig.kt:129) ~[classes/:?]
I think I need to include the latest dd-trace-core dependency in my app for the classpath to find that class. However, I see dd-trace-core is not being published as a jar anymore.
Can you provide a real example of how to integrate your solution into a Kotlin app?
P.S. Mine is a Spring Boot based Kotlin app.
@vsingal-p The new version of the Datadog agent went through some package refactoring: some classes got moved and some were changed. At this point I think maintaining that snippet doesn't make much sense; it's an ugly workaround and might break with the next update. If you want to keep using it, you can stick to agent version 0.114.0. I think that was the last one that worked fine with this workaround (that's what we did for now at work).
In the meantime I'll try to contribute a better solution to the Datadog agent itself if I have time for that.
open-telemetry has a nice and simple instrumentation implementation for coroutines; I think it can be combined with the initial workaround snippet I provided here to get working coroutines instrumentation into the DD agent.
Hey everyone, coroutines instrumentation will be available with release 1.7.0. To enable it you will need to either provide the system property dd.integration.kotlin_coroutine.experimental.enabled or the env var DD_INTEGRATION_KOTLIN_COROUTINE_EXPERIMENTAL_ENABLED set to true.
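For example, when attaching the Java agent (the agent path and application jar name here are placeholders; only the property name comes from the comment above):
java -javaagent:/path/to/dd-java-agent.jar \
     -Ddd.integration.kotlin_coroutine.experimental.enabled=true \
     -jar my-app.jar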
Thanks a lot @monosoul and @bantonsson 🙇
A small update: I just tested the instrumentation; unfortunately at the moment it only works with coroutines versions < 1.5.0. I'll try to implement instrumentation for >= 1.5.0 in the coming days. It shouldn't take much time, but there are a few changes necessary to the buildscript of the agent.
:robot: This issue has been addressed in the latest release. See full details in the Release Notes.