spring-cloud-sleuth
spring-cloud-sleuth copied to clipboard
Project Reactor tracing when using aggregation methods
I'm using spring-cloud-sleuth to perform tracing for my data pipeline application with project reactor.
Traces and span works fine when my pipeline works per message, e.g. .map(message -> myFunc(message))
I manage to succeed preforming traces for my data pipeline when it consists of:
- consume from kafka -> 2. transform and enrich message -> 3. produce the enriched message to another kafka topic.
Things get complicated when in my enrichment section (e.g. 2) I need to buffer messages and afterwards use
.flatmap()on them. This is required for business and functional logic such as bulk operations. The buffering makes all the non-first-of-group messages context (trace, span, baggage) to disappear . so the whole buffer receives the first message's context.
consumeFromKafka()
.map(..) // transform kafkaRecore to msg pojo
.bufferTimeout(50, 10secondes) // to List<msg>
- What the correct way to trace this kind of flows? e.g. in general how shall one address this kind of issues when using aggregation methods such as buffer, window etc.
- Is there any way to save the message context before the buffering, and return the context after the buffering?
I'm not sure if this is considered to be a bug / an intentional design / a lack of knowledge. I'll appreciate your help.
With buffering of messages you will have a problem... Imagine that you buffer 10 messages each of them having a different trace id, which one will you pick? What you could do is create yet another trace id and maybe tag those messages with all the existing traces ids? But that's just an idea, there's no easy, existing solution for this. I'm actually leaning towards the option that it's not really possible
@marcingrzejszczak First of all thank you for the quick response!!
I try to understand if that's possible to follow a message id before & after the buffering. After all in my use case I'm buffering just for performing logic such as picking specific messages out of a batch.
I wonder if that's possible to keep track on the MDC.getCopyOfContextMap of a message before & after the buffering (e.g. I don't need to log the buffer. I just want to log the message before the buffer & after it).
Do you think it's possible to prevent Sleuth from propagate MDC content to Reactor aggregation methods, and afterwards return to the context of each message?
Do you think it's possible to prevent Sleuth from propagate MDC content to Reactor aggregation methods, and afterwards return to the context of each message?
That should work already :tm: If it doesn't then it's just a bug in Sleuth & Reactor instrumentation (maybe an unsolvable one). Have you tried going with the manual Reactor instrumentation mode?
@marcingrzejszczak I have not tried to use manual instrumentation because I want seluth to auto instrument all operator's except aggregation operator's.
Do you think it's possible to prevent Sleuth from propagate MDC content to Reactor aggregation methods, and afterwards return to the context of each message?
I will clearly my question in the previous comment, is this kind of behavior should be implemented in reactor sleuth instrumentation as a new instrumentation-type?
I don't know really cause I don't know if that's a common usecase. IMO you should configure it yourself first and if this issue gets more traction we can think of what to do next. Sleuth is feature complete at this point (we're moving towards Spring Observability initiative in Spring Framework 6 and Spring Boot 3).
I am trying to implement this behavior with the reactor instrumentation-type manual and I could find any examples or detailed docs on this kind of instrumentation-type.
MANUAL - wraps every Reactor in the least invasive way without passing of tracing context. It’s up to the user to do it.
This is the only reference in the sleuth docs to reactor manual instrumentation. How can I achieve passing tracing context? I'd be really glad to see some practical examples
Check this section https://docs.spring.io/spring-cloud-sleuth/docs/current/reference/html/integrations.html#sleuth-reactor-integration and the WebFluxSleuthOperators class
I'm using pure Project Reactor (without Spring Integration nor Webflux). As mentioned, I'm doing that for data pipelines use case.
That doesn't matter - most of these methods are working fine without WebFlux
Closing due to age of the question. If you would like us to look at this issue, please comment and we will look at re-opening the issue.