
Traceid and spanid wrong when using Kotlin Reactor Coroutines

Open akoufa opened this issue 2 years ago • 67 comments

Hello. When using Kotlin Reactor Coroutines like the following:

  return mono {
            val text = "Greetings from Spring Boot!"
            logger.error(text)
            text
        }

then the traceId and spanId that are being included in the log statement above look like the following:

ERROR [00000000000000000000000000000000,0000000000000000] 80834 --- [atcher-worker-1] com.example.demo.HelloController         : Greetings from Spring Boot!

The traceId and spanId in the logs are always all zeros.

This happens only for log statements inside such mono {} blocks. Other log statements outside of these blocks work as expected.

I have created a sample reproducer project here:

https://github.com/akoufa/springbootissues

Just start the Spring Boot app and use the following curl request:

curl --location 'localhost:8080/'

akoufa avatar Feb 21 '23 08:02 akoufa

Since I'm not an expert in Kotlin - we have a test here that proves that this is working (https://github.com/micrometer-metrics/micrometer/blob/main/micrometer-core/src/test/kotlin/io/micrometer/core/instrument/kotlin/AsContextElementKtTests.kt). How does what we do differ from what you do?

marcingrzejszczak avatar Mar 01 '23 10:03 marcingrzejszczak

@marcingrzejszczak To be honest I don't know exactly. I don't use Observation directly but I am using the Reactor Coroutine library like the following:

return mono {
            val text = "Greetings from Spring Boot!"
            logger.error(text)
            text
        }

This mono {...} builder allows using coroutines and suspending functions inside the { } and evaluates to a Mono<T>. But the log statements have the issue I mentioned above. You can check out the reproducer code I have shared; it's pretty plain and simple.

akoufa avatar Mar 01 '23 16:03 akoufa

@marcingrzejszczak @shakuzen The behaviour is very strange. I did some tests and I found the following:

 @GetMapping("/")
    suspend fun index(): String {
        logger.error("Greetings")
        mono {
            "test"
        }.awaitSingle()
        logger.error("Spring Boot!")
        return "Spring Boot"
    }

The first statement logs the trace id and span id as expected. The second statement mostly has zeroed values.

Of course, the difference is that there is a thread switch after the mono { ... } statement, from ctor-http-nio-4 to atcher-worker-1.

akoufa avatar Mar 06 '23 08:03 akoufa

The problem is related to setting the span to null on the scope reset when there is no other span observation to fall back to, and how OtelTracer handles this.

Basically, when a coroutine finishes, the following happens:

  1. Internally Kotlin invokes io.micrometer.core.instrument.kotlin.KotlinObservationContextElement#restoreThreadContext
  2. If there was no prior Observation set, it will invoke io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor#reset.
  3. This eventually gets into io.micrometer.tracing.handler.TracingObservationHandler#onScopeReset, where it ignores any passed context
  4. It calls the current tracer's method withSpan and passes null
  5. If the current tracer is OtelTracer and the span passed into io.micrometer.tracing.otel.bridge.OtelTracer#withSpan is null, then it initializes the current Otel context with the invalid span, which has traceId and spanId set to zeroes (as a side-effect in the OtelSpanInScope constructor)

In the end, the current Otel context is set to the invalid span, even if it was null before, which results in a polluted ThreadLocal variable.

Somewhere in io.micrometer.tracing.otel.bridge.OtelSpanInScope#storedContext, perhaps, some special handling should be added for an invalid span, so it gets reset to null, instead of the invalid value.

I'm not entirely sure what the right approach is here, as I don't fully understand the difference between reset and close.
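
To make the steps above concrete, here is a toy model in plain Kotlin. It is only a sketch: ToySpan, ToyTracer and INVALID are hypothetical names invented for illustration and are not the Micrometer or OpenTelemetry classes. It assumes, as described above, that passing null to withSpan stores the invalid span instead of clearing the thread-local.

```kotlin
// Toy model only: ToySpan, ToyTracer and INVALID are hypothetical stand-ins,
// not the real Micrometer or OpenTelemetry types.
data class ToySpan(val traceId: String, val spanId: String)

// All-zero IDs, mirroring the pattern seen in the log line.
val INVALID = ToySpan("0".repeat(32), "0".repeat(16))

object ToyTracer {
    // null means "nothing was ever stored on this thread".
    private val stored = ThreadLocal<ToySpan?>()

    // Assumed behaviour from the description above: a null span is stored as INVALID.
    fun withSpan(span: ToySpan?) {
        stored.set(span ?: INVALID)
    }

    fun storedOrNull(): ToySpan? = stored.get()
}

fun main() {
    println("before reset: ${ToyTracer.storedOrNull()}")
    ToyTracer.withSpan(null) // what the scope reset ends up calling
    println("after reset:  ${ToyTracer.storedOrNull()}")
}
```

In this model the thread-local goes from empty to holding the all-zero span, so anything that later reads it on the same thread sees zeros rather than "no span".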

vooft avatar Mar 20 '23 13:03 vooft

AFAIR the problem in OTel is that you can't remove a span from the thread local; you can only set it to invalid. OpenTelemetry will never return a null span when there's no current span. Look at the Span interface from OpenTelemetry:

public interface Span extends ImplicitContextKeyed {

  /**
   * Returns the {@link Span} from the current {@link Context}, falling back to a default, no-op
   * {@link Span} if there is no span in the current context.
   */
  static Span current() {
    Span span = Context.current().get(SpanContextKey.KEY);
    return span == null ? getInvalid() : span;
  }

  /**
   * Returns an invalid {@link Span}. An invalid {@link Span} is used when tracing is disabled,
   * usually because there is no OpenTelemetry SDK installed.
   */
  static Span getInvalid() {
    return PropagatedSpan.INVALID;
  }

  // ... remaining members omitted ...
}

So if we null the current span by calling Tracing's tracer.withSpan(null) the best we can do for OTel is to reset this to the default invalid span.
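
A quick way to observe this fallback is sketched below. It assumes only the opentelemetry-api artifact is on the classpath and that no SDK or agent has made a span current on the calling thread.

```kotlin
import io.opentelemetry.api.trace.Span

fun main() {
    // No span has been made current on this thread, so the API
    // falls back to the invalid span instead of returning null.
    val context = Span.current().spanContext
    println("valid=${context.isValid} traceId=${context.traceId} spanId=${context.spanId}")
}
```

Under those assumptions this should report an invalid context with all-zero IDs, the same pattern as in the log line at the top of this issue.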

marcingrzejszczak avatar Mar 20 '23 14:03 marcingrzejszczak

@marcingrzejszczak @vooft Does this mean that the library cannot be used in conjunction with Kotlin Coroutines + Reactor?

akoufa avatar Mar 21 '23 07:03 akoufa

I didn't say that. I'm just replying to @vooft's comment.

marcingrzejszczak avatar Mar 21 '23 11:03 marcingrzejszczak

@marcingrzejszczak I am trying to use observationRegistry.asContextElement(), but it seems to lose my logging context. Here is a small snippet of how I am using it:

logger.info("has context")
CoroutineScope(Dispatchers.Default).launch(errorHandler) {
	withContext(observationRegistry.asContextElement()) {
		logger.info("has no context")
	}
}
logger.info("also has no context")

I am using spring boot and injecting observationRegistry, not sure if that is proper usage.

Also, I can get the "has no context" line to log properly if I replace withContext(observationRegistry.asContextElement()) { with withContext(MDCContext(context)) {, where val context = MDC.getCopyOfContextMap() is set before CoroutineScope. However, that last logger.info always loses context.

msosa avatar Apr 20 '23 01:04 msosa

Hi @marcingrzejszczak. We have the same problem as @akoufa, but with the coroutines router (we are using Brave instead of OTel). If we use the standard router and provide the coroutine context as suggested, both logs (before and after the delay) contain traceId and spanId.

@Configuration(proxyBeanMethods = false)
class RoutesConfiguration(@Autowired private val observationRegistry: ObservationRegistry) {
	private val logger = KotlinLogging.logger {}

	// traceId&spanId are logged only before delay
	@Bean
	fun notWorkingCoRouter() = coRouter {
		GET("/log1") {
			logger.info("before")
			delay(500)
			logger.info { "after" }
			ServerResponse.ok().buildAndAwait()
		}
	}

	// traceId&spanId are logged before and after delay
	@Bean
	fun workingRouter() = router {
		GET("/log2") {
			mono(observationRegistry.asContextElement()) {
				logger.info("before")
				delay(500)
				logger.info { "after" }
				ServerResponse.ok().buildAndAwait()
			}
		}
	}
}

Do you think this issue can be fixed, or should we use some kind of workaround?

qavid avatar Apr 20 '23 09:04 qavid

Is there any workaround that does not include injecting the observationRegistry to all controllers?

And is there any plan to fix this issue?

tomfi avatar May 16 '23 14:05 tomfi

I don't have enough Kotlin knowledge. If there's anyone that can help us out with this then please be my guest and comment

marcingrzejszczak avatar May 16 '23 15:05 marcingrzejszczak

@marcingrzejszczak in general the problem is that when a coroutine opens, the context might be lost in the child coroutines.

for example:

@RestController
class FooRestController {

  @GetMapping
  suspend fun foo(): Foo {
    logger.info { "before" }
    delay(500) // Kotlin decides to switch to coroutine#2
    logger.info { "after" } // we lose the observation context
  }

  @GetMapping
  fun foo(): Mono<Foo> {
    return mono(observationRegistry.asContextElement()) {
        logger.info { "before" }
        delay(500) // Kotlin decides to switch to coroutine#2
        logger.info { "after" } // we do not lose the context, because the parent coroutine was started with the observation context and it is being propagated
    }
  }
}

If it were possible to somehow add observationRegistry.asContextElement() at the place that initializes the coroutine, it should be propagated downstream.

Another option is somehow to hook between the MDC context, Reactor context, and Coroutine context. An example of how it has been done for MDC can be found with this official library: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-slf4j/

With this library, another working example of proper propagation would be:

@RestController
class FooRestController {

  @GetMapping
  suspend fun foo(): Foo {
    logger.info { "before" }
    delay(500) // Kotlin decides to switch to coroutine#2
    logger.info { "after" } // we lose the observation context
  }

  @GetMapping
  suspend fun foo(): Foo {
    return withContext(MDCContext()) {
        logger.info { "before" }
        delay(500) // Kotlin decides to switch to coroutine#2
        logger.info { "after" } // we do not lose the context, because kotlinx-coroutines-slf4j handles the binding between MDC and the coroutine context
    }
  }
}

I am not sure where exactly it should be fixed, but in theory, if we had an entry point to add the observation context as a coroutine context at the place where Spring initializes the coroutine scope for the controller's suspend function, it should work as expected.
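
For readers unfamiliar with the mechanism, both MDCContext and Micrometer's element are coroutine context elements that re-install a thread-local whenever the coroutine runs on a thread. Below is a minimal sketch of that idea using kotlinx.coroutines' ThreadContextElement. The traceId thread-local, TraceIdElement and readAfterDelay are hypothetical names used for illustration only, not part of any of the libraries discussed here.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ThreadContextElement
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical stand-in for the thread-local that MDC or a tracer reads from.
val traceId = ThreadLocal<String?>()

// Hypothetical element: installs `value` whenever the coroutine runs on a thread.
class TraceIdElement(private val value: String?) : ThreadContextElement<String?> {
    companion object Key : CoroutineContext.Key<TraceIdElement>

    override val key: CoroutineContext.Key<TraceIdElement> get() = Key

    // Called each time the coroutine starts or resumes on a thread.
    override fun updateThreadContext(context: CoroutineContext): String? {
        val previous = traceId.get()
        traceId.set(value)
        return previous
    }

    // Called each time the coroutine suspends or completes on that thread.
    override fun restoreThreadContext(context: CoroutineContext, oldState: String?) {
        traceId.set(oldState)
    }
}

suspend fun readAfterDelay(): String? {
    delay(10) // the coroutine may resume on a different worker thread
    return traceId.get()
}

fun main(): Unit = runBlocking {
    val lost = withContext(Dispatchers.Default) { readAfterDelay() }
    val kept = withContext(Dispatchers.Default + TraceIdElement("abc123")) { readAfterDelay() }
    println("without element: $lost")
    println("with element: $kept")
}
```

The point of the sketch is that the value only survives the suspension when an element carrying it is part of the coroutine context, which is why the place where the coroutine is started matters so much.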

tomfi avatar May 16 '23 16:05 tomfi

FYI, on the Spring side, for Controllers, the coroutine entry point is provided by InvocableHandlerMethod class.

For Web MVC:

https://github.com/spring-projects/spring-framework/blob/e5ee369e70e0a8ca04437f502649b11171a1c741/spring-web/src/main/java/org/springframework/web/method/support/InvocableHandlerMethod.java#L201

Its doInvoke(...) checks if the function is a Kotlin suspend function and, if so, ultimately calls CoroutinesUtils.invokeSuspendingFunction(...)

Notably, it does not give a CoroutineContext object, so the coroutine is started with no context.

For WebFlux:

https://github.com/spring-projects/spring-framework/blob/c3e28728ce5c11655831cbc20f828e956e1c42a1/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/InvocableHandlerMethod.java#L133

Similar to Web MVC, it calls CoroutinesUtils.invokeSuspendingFunction(...) without giving a CoroutineContext.

Maybe Spring should take whatever is in Reactor Context at this point and copy it into Coroutine Context?

EDIT: I don't have a suggested workaround, a fix probably needs to happen in Spring itself.

EDIT 2: There are two InvocableHandlerMethod implementations, one for Web MVC and one for WebFlux.

calebdelnay avatar May 16 '23 17:05 calebdelnay

I think we need help from Sebastien. Let us summon our Kotlin Guru @sdeleuze!

marcingrzejszczak avatar May 17 '23 10:05 marcingrzejszczak

@marcingrzejszczak @sdeleuze

If it helps, I played with Spring's code, and the only thing that worked for me was getting the observationContext and using it in org.springframework.core.CoroutinesUtils#invokeSuspendingFunction:

CoroutineContext combinedContext;
if (observationRegistry != null) {
    combinedContext = KCoroutineUtils.asCoroutineContext(observationRegistry)
        .plus(context);
} else {
    combinedContext = context;
}

Mono<Object> mono = MonoKt.mono(combinedContext, (scope, continuation) ->
                KCallables.callSuspend(function, getSuspendedFunctionArgs(method, target, args), continuation))
        .filter(result -> !Objects.equals(result, Unit.INSTANCE))
        .onErrorMap(InvocationTargetException.class, InvocationTargetException::getTargetException);

KCoroutineUtils

object KCoroutineUtils {
    @JvmStatic
    fun asCoroutineContext(observationRegistry: ObservationRegistry): CoroutineContext {
        return observationRegistry.asContextElement()
    }
}

For the POC I just took the ObservationRegistry from the application context in org.springframework.web.reactive.result.method.InvocableHandlerMethod#invoke:

if (isSuspendingFunction) {
    ObservationRegistry observationRegistry = exchange.getApplicationContext().getBean(ObservationRegistry.class);
    value = CoroutinesUtils.invokeSuspendingFunction(method, getBean(), observationRegistry, args);
}

If we find a way to inject the observationRegistry at this point, everything will work as intended.

I also tried to use captureContext, and I saw there is an observation inside the Reactor context, but I could not make it propagate properly to the coroutines.

tomfi avatar May 22 '23 14:05 tomfi

@marcingrzejszczak Do we have any update on this or a suggestion for a temporary workaround on our side? Thank you

akoufa avatar Jun 06 '23 09:06 akoufa

Be aware there are a bunch of Coroutines context issues still open on Spring Framework, and this area needs refinements to support all the use cases. See for example https://github.com/spring-projects/spring-framework/issues/26977, https://github.com/spring-projects/spring-framework/issues/27522 and https://github.com/spring-projects/spring-framework/issues/27010.

Seamless context propagation is expected when doing Reactor to Coroutines conversion (this is handled on the Coroutines Reactor integration side), but the cases where a new Coroutines context is created, or where an existing one should be reused, likely require some refinements.

We probably need to plan a discussion between me and @marcingrzejszczak and/or @bclozel about the design of such a refined integration.

sdeleuze avatar Jun 21 '23 09:06 sdeleuze

Any updates regarding this issue?

tomfi avatar Jul 19 '23 13:07 tomfi

We're updating the issue when there are updates. We need to discuss things with @sdeleuze but there were other priorities so far

marcingrzejszczak avatar Jul 19 '23 14:07 marcingrzejszczak

Hey, I am starting to work on that and may ask the Kotlin team for guidance on the best way to fix it. This kind of issue seems to impact both Micrometer and Spring as soon as kotlinx.coroutines.reactor.mono is involved.

sdeleuze avatar Aug 22 '23 10:08 sdeleuze

So the challenge was to pass the CoroutineContext despite the fact that kotlinx.coroutines.reactor.mono is not a suspending function (it cannot capture the context, so the context has to be passed explicitly, as documented).
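
As an illustration of that point, here is a small sketch that assumes kotlinx-coroutines-reactor is on the classpath. The requestId thread-local is a hypothetical stand-in for whatever a tracer or MDC would store; it is not part of Micrometer or Spring.

```kotlin
import kotlinx.coroutines.asContextElement
import kotlinx.coroutines.delay
import kotlinx.coroutines.reactor.mono

// Hypothetical thread-local standing in for tracing or MDC state.
val requestId = ThreadLocal<String?>()

fun main() {
    requestId.set("req-1")

    // mono { } is a plain function: it has no way to pick up state from its caller,
    // so the block runs on a dispatcher thread where the thread-local was never set.
    val lost: String? = mono { delay(10); requestId.get() }.block()

    // Passing a context element explicitly carries the value into the coroutine.
    val kept: String? = mono(requestId.asContextElement()) { delay(10); requestId.get() }.block()

    println("without explicit context: $lost")
    println("with explicit context: $kept")
}
```

Only the second call is expected to see the value, because the element captured on the calling thread is handed to mono explicitly.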

I made progress on the Spring Framework side, for now focusing on the functional router DSL:

  • I think I have found a way to pass the CoroutineContext defined in a filter to the suspending request handler, see this related comment with a draft commit.
  • I am also considering adding a fun context(provider: suspend (ServerRequest) -> CoroutineContext) method to the router DSL in Spring Framework 6.1 to provide a custom CoroutineContext for various use cases, see https://github.com/spring-projects/spring-framework/issues/27010#issuecomment-1695796770.
  • The Reactive variant of awaitSingle() will be replaced by the Mono variant, see https://github.com/spring-projects/spring-framework/issues/31127
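
For illustration, the coRouter from the earlier comment might be wired to such a context(...) method roughly as follows. This is an untested sketch: it assumes the method is available with the signature quoted above, the class and bean names are made up, and whether trace IDs actually show up depends on the current observation being visible when the provider runs.

```kotlin
import io.micrometer.core.instrument.kotlin.asContextElement
import io.micrometer.observation.ObservationRegistry
import kotlinx.coroutines.delay
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.function.server.ServerResponse
import org.springframework.web.reactive.function.server.buildAndAwait
import org.springframework.web.reactive.function.server.coRouter

// Hypothetical configuration class, named for illustration only.
@Configuration(proxyBeanMethods = false)
class TracedRoutesConfiguration(private val observationRegistry: ObservationRegistry) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @Bean
    fun tracedCoRouter() = coRouter {
        // Assumption: context(...) exists with the signature quoted above.
        context { observationRegistry.asContextElement() }
        GET("/log1") {
            logger.info("before")
            delay(500)
            logger.info("after")
            ServerResponse.ok().buildAndAwait()
        }
    }
}
```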

I will try to refine the annotation-based WebFlux support as well.

I am not sure what needs to be done on the Micrometer side, if anything, but hopefully this progress on the Spring Framework side will help. Feedback on those draft commits is welcome; otherwise I will move forward on the Spring Framework side and drop a comment here when the various improvements are available in Spring Framework 6.1.0-SNAPSHOT.

sdeleuze avatar Aug 28 '23 21:08 sdeleuze

With the 3 issues above fixed, 6.1.0-SNAPSHOT Spring Framework builds should now contain everything needed from a Spring Framework POV for CoroutineContext propagation in WebFlux functional DSL.

If somebody could build and share a reproducer with Spring coRouter and Micrometer, that would allow me and the Micrometer team to provide guidelines, identify potential refinements and validate that those recent Spring changes are useful and work as expected.

sdeleuze avatar Aug 29 '23 16:08 sdeleuze

I have just added the last part of Spring Framework 6.1 CoroutineContext refinement: it is now possible to propagate it via CoWebFilter. https://github.com/spring-projects/spring-framework/issues/27522

sdeleuze avatar Sep 07 '23 10:09 sdeleuze

> I have just added the last part of Spring Framework 6.1 CoroutineContext refinement: it is now possible to propagate it via CoWebFilter. spring-projects/spring-framework#27522

Thanks for these new features. However, I'm not sure how this will fix the fact that we have to call withContext manually in each controller method, like in @tomfi's example:

@RestController
class FooRestController {

  @GetMapping
  suspend fun foo(): Foo {
    return withContext(MDCContext()) {
        logger.info { "before" }
        delay(500)
        logger.info { "after" }
    }
  }
}

This is not very intuitive and quite invasive. Also, this used to work in Spring Boot 2 by the way, I don't know if this might help or not.

grassehh avatar Sep 20 '23 10:09 grassehh

@grassehh

> However, I'm not sure how this will fix the fact we have to call withContext manually into each controller method

I updated the dependency to Spring Framework 6.1 and registered the CoWebFilter with the following code. Please note that this is not the exact code used in my project, but a simplified version. The key point is that I'm using withContext in CoWebFilter to add MDCContext(). It seems to be working as intended for now, but I'll need to take a closer look. I hope this helps.

@Bean
fun coroutineWebFilter(): WebFilter {
    return object : CoWebFilter() {
        override suspend fun filter(exchange: ServerWebExchange, chain: CoWebFilterChain) 
        = withContext(MDCContext()) {
            chain.filter(exchange)
        }
    }
}

styx7659 avatar Sep 20 '23 13:09 styx7659

Hello, I just wanted to share the sample I created (which I shared already in other issues). I thought it could be useful for you to try : https://github.com/grassehh/spring-boot-3-tracing-coroutine In this sample the point is to ensure context propagation works in most scenarios in a Spring Webflux + Kotlin Coroutines environment. Basically at the moment I test 3 routes to test 3 scenarios:

  • Simple behavior where all the code is inside the Controller's coroutine scope
  • Adding some delay to simulate coroutine function suspension
  • Launching a new coroutine inside the Controller code

I tried to add the most basic Spring features, such as a Security filter and WebClient calls to an external API, and I will probably add some controller advice to test context propagation in exception handling.

grassehh avatar Sep 22 '23 15:09 grassehh

It also fails when calling a coroutine from Java, as vanilla as possible: Servlet environment + WebClient / HTTP client called from inside a Kotlin suspend function.

Hooks.enableAutomaticContextPropagation() does not seem to do anything. WebClient/RestTemplate with and without mono works without the hook or the capture.

@GetMapping("/")
public String index() {
  logger.info("==index=="); // traced
  MonoKt
    .mono((CoroutineContext) Dispatchers.getIO(), (scope, continuation) -> someSuspendFunction(continuation))  //suspend content not traced
    .doOnEach(it -> logger.info("test")) // not traced
    .contextCapture() // does not help
    .block();

  return "ok";
}

//id("org.springframework.boot") version "3.1.4"
//id("io.spring.dependency-management") version "1.1.3"

dependencies {
  implementation("org.springframework.boot:spring-boot-starter-actuator")
  implementation("org.springframework.boot:spring-boot-starter-web")
  implementation("org.springframework.boot:spring-boot-starter-webflux")
  implementation("io.micrometer:micrometer-tracing-bridge-brave")
  implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.7.3")
}

Not sure if it's the same problem. Read around and couldn't figure out how to pass in MDCContext() or ReactorContext. The function observationRegistry.asContextElement() described above is not available for me, so not sure what to think :)

krodyrobi avatar Oct 03 '23 17:10 krodyrobi

@grassehh here are the changes I made to your repo - basically just upgrading versions to get to Spring Framework 6.1 snapshots and the code in https://github.com/micrometer-metrics/tracing/issues/174#issuecomment-1727691192. With that I was able to get tracing context in MDC in controllers using coroutines: https://github.com/grassehh/spring-boot-3-tracing-coroutine/compare/main...shakuzen:CoWebFilter?expand=1

I did not look more into the logbook stuff.

shakuzen avatar Oct 12 '23 09:10 shakuzen

Could you have a look at micrometer-metrics/tracing#403? It still seems to happen for calls originating from Java. I'm not sure it's the exact same issue, but it seems like it.

I'm using the latest snapshot and passing in MDCContext, but the HTTP client still does not forward baggage headers; logs do work, though.

krodyrobi avatar Oct 13 '23 19:10 krodyrobi

> @grassehh here are the changes I made to your repo - basically just upgrading versions to get to Spring Framework 6.1 snapshots and the code in #174 (comment). With that I was able to get tracing context in MDC in controllers using coroutines: https://github.com/grassehh/spring-boot-3-tracing-coroutine/compare/main...shakuzen:CoWebFilter?expand=1
>
> I did not look more into the logbook stuff.

I confirm this works fine, but only if I use withContext(observationRegistry.asContextElement()) inside the CoWebFilter :) If I use withContext(MDCContext()) the myBaggageController is not logged when calling the /suspend route.
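
For anyone landing here, a sketch of the filter with observationRegistry.asContextElement() in place of MDCContext() might look like the following. It assumes Spring Framework 6.1 (for CoWebFilter) and Micrometer's Kotlin extension asContextElement(); the class and bean names are made up for illustration, and this exact code has not been verified against the sample project.

```kotlin
import io.micrometer.core.instrument.kotlin.asContextElement
import io.micrometer.observation.ObservationRegistry
import kotlinx.coroutines.withContext
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.server.CoWebFilter
import org.springframework.web.server.CoWebFilterChain
import org.springframework.web.server.ServerWebExchange
import org.springframework.web.server.WebFilter

// Hypothetical configuration class, named for illustration only.
@Configuration(proxyBeanMethods = false)
class ObservationFilterConfiguration {

    @Bean
    fun observationCoWebFilter(observationRegistry: ObservationRegistry): WebFilter =
        object : CoWebFilter() {
            override suspend fun filter(exchange: ServerWebExchange, chain: CoWebFilterChain) {
                // Run the rest of the chain, including suspend controller methods,
                // with the observation element in the coroutine context.
                withContext(observationRegistry.asContextElement()) {
                    chain.filter(exchange)
                }
            }
        }
}
```

Registering it once as a bean avoids injecting the ObservationRegistry into every controller, which was the concern raised earlier in this thread.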

grassehh avatar Nov 16 '23 17:11 grassehh