dd-trace-java
Kafka consumer doesn't use context from ConsumerRecord Headers
I'm using Alpakka in a Scala service to subscribe to topics. I'm seeing that the `ConsumerRecord`s being processed in my Flow have the correct header values for `x-datadog-trace-id` and `x-datadog-parent-id`; however, the values for `mdc.dd.span_id` and `mdc.dd.trace_id` are completely different from those in the headers for the same call.
This causes any calls to other services (via gRPC, for example) to appear as new traces rather than continuations of the originating trace. I've tried extracting the header values and then setting appropriate values as `Metadata` on my gRPC client calls, but I haven't found sufficient information/API to tie things together properly.
I've just updated to 0.95.0 and am seeing the same behavior. It does sound similar to the behavior described in this reported issue, but I'm using Scala and Alpakka, so I'm reporting it separately.

Is this the expected behavior, i.e. that a new trace is created for a `kafka.consume` operation?
Hi Gerrick,
The trace and span ids recorded in the `ConsumerRecord` are only extracted and used as the context at the point where we create the `kafka.consume` span. Before that point, the thread's MDC and global tracer will not reflect the record's context, because it has yet to be extracted.
For the client API we try to create `kafka.consume` spans at the point where applications iterate over records, rather than during internal processing, because we want to capture the consume time from the application's perspective. For the streaming API we try to create `kafka.consume` spans just before the stream's processor node calls back into the application, for the same reason.
Depending on where you've added your custom instrumentation, it's possible that you'll see trace and span ids in the record that aren't yet reflected in the MDC or global tracer: for example, before the `kafka.consume` span is created, but also after that span has finished, because finishing it resets the context back to the original value. If that is the case, you'd need to extract the trace and span ids yourself from the record.
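A minimal sketch of that manual extraction, assuming the record's headers are exposed as a map from header name to UTF-8 bytes (the class and helper names here are illustrative, not part of dd-trace-java):

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Illustrative helper: pulls the Datadog ids out of Kafka-style headers.
public class DatadogIds {
    static final String TRACE_ID_HEADER = "x-datadog-trace-id";
    static final String PARENT_ID_HEADER = "x-datadog-parent-id";

    /** Decodes one header value as UTF-8, or returns null when it is absent. */
    static String header(Map<String, byte[]> headers, String name) {
        byte[] value = headers.get(name);
        return value == null ? null : new String(value, StandardCharsets.UTF_8);
    }

    static String traceId(Map<String, byte[]> headers) {
        return header(headers, TRACE_ID_HEADER);
    }

    static String parentId(Map<String, byte[]> headers) {
        return header(headers, PARENT_ID_HEADER);
    }
}
```

In a real consumer you'd read these values from the record's `Headers` iterator rather than a plain map, but the decoding step is the same.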
If your custom instrumentation is expected to run inside the `kafka.consume` span, then we'd need more details about where you've added it to investigate further.
Hi Stuart, thanks for the info here. That explains what I'm seeing now. I thought it would be worth putting together an example that illustrates what I'm trying to do and have been unable to achieve.
I've forked the Alpakka kafka-to-websocket sample and made changes to integrate the `dd-java-agent`. This app essentially demonstrates the setup I have in my services, i.e. a Kafka consumer that then sends a message over a websocket. In my service the consumer sends a message over gRPC, but the result seems to be the same.
If I run that app and then send a message to Kafka via the `/push` endpoint, I get three distinct traces in Datadog: a single trace containing the `kafka.produce` and `kafka.consume` spans, a separate trace for the websocket call (`/events`), and another one for the HTTP router GET (`/push`).
What I expected is that all of these spans would be under the same trace. Is that something I can achieve with manual instrumentation in the services?
Thanks for the example. I see that Alpakka-kafka builds on akka-streams, which we don't yet support (we also don't have support for websocket propagation).
You should be able to use manual instrumentation to propagate the trace: https://docs.datadoghq.com/tracing/setup_overview/custom_instrumentation/java/#adding-spans. You can use that to get the current trace details from inside the consumer. You'd then need to decide how to propagate them over the websocket (whether in headers or as part of the payload), and then build and activate a span on the other side using the trace details you just sent.
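The payload option can be sketched with a toy envelope. The format and class name below are made up for illustration; in a real service the ids would come from the tracer's inject call on the sender and feed its extract call on the receiver:

```java
// Toy envelope: carries Datadog trace ids alongside the message body so the
// websocket receiver can continue the trace. Format and names are illustrative.
public class TraceEnvelope {
    final String traceId;
    final String parentId;
    final String body;

    TraceEnvelope(String traceId, String parentId, String body) {
        this.traceId = traceId;
        this.parentId = parentId;
        this.body = body;
    }

    /** Sender side: flatten into a single websocket text frame. */
    String serialize() {
        return traceId + "|" + parentId + "|" + body;
    }

    /** Receiver side: recover the ids so a child span can be built from them. */
    static TraceEnvelope parse(String frame) {
        // Limit of 3 keeps any '|' characters inside the body intact.
        String[] parts = frame.split("\\|", 3);
        return new TraceEnvelope(parts[0], parts[1], parts[2]);
    }
}
```

A header-based approach would carry the same two ids as websocket handshake headers instead of embedding them in each frame.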
Got it!
TL;DR: Get the tracer and use an implementation of `TextMap` to extract the `Context` to use as the parent span.
What I didn't understand was how to create a `Context` from the information already available in the `ConsumerRecord` `Headers`. I eventually found this issue, which led me to the kinesis example that gave me the answer I was looking for.
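For reference, the shape of the carrier step, sketched with plain collections: copy the record headers into a `String`-to-`String` map, which is the form a `TextMap` carrier iterates over when the tracer extracts the context. The `toCarrier` helper below is illustrative; the actual extract call comes from the tracer API shown in the kinesis example:

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class HeaderCarrier {
    /**
     * Copies Kafka-style headers (name -> UTF-8 bytes) into the String map
     * shape that a TextMap carrier exposes during context extraction.
     */
    static Map<String, String> toCarrier(Map<String, byte[]> headers) {
        Map<String, String> carrier = new HashMap<>();
        for (Map.Entry<String, byte[]> e : headers.entrySet()) {
            carrier.put(e.getKey(), new String(e.getValue(), StandardCharsets.UTF_8));
        }
        return carrier;
    }
}
```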
I think it would be useful to mention that example (or something similar) in the documentation around manually creating spans. Once I found the example, it was pretty straightforward.