Kafka consumer doesn't use context from ConsumerRecord Headers

Open · gbiv opened this issue 3 years ago • 4 comments

I'm using Alpakka in a Scala service to subscribe to topics. I'm seeing that the ConsumerRecords being processed in my Flow have the correct header values for x-datadog-trace-id and x-datadog-parent-id; however, the values for mdc.dd.span_id and mdc.dd.trace_id are completely different from those in the headers for the same call.

[Screenshot: Screen Shot 2022-02-16 at 1.09.15 PM]

This causes any calls to other services (via gRPC, for example) to appear as new traces rather than continuations of the originating trace. I've tried extracting the header values and then setting the appropriate values as Metadata on my gRPC client calls, but I haven't found enough information/API to tie things together properly.

I've just updated to 0.95.0 and am seeing the same behavior. It does sound similar to the behavior described in this reported issue, but since I'm using Scala and Alpakka I'm reporting it separately. Is this the expected behavior, i.e. that a new trace is created for a kafka.consume operation?

gbiv avatar Feb 16 '22 19:02 gbiv

Hi Gerrick,

The trace and span ids recorded in the ConsumerRecord are only extracted and used as the context at the point where we create the kafka.consume span; before that point, the thread's MDC and global tracer will not reflect the record's context because it has yet to be extracted.

For the client API we try to create kafka.consume spans at the point where applications iterate over records rather than during internal processing because we want to capture the consume time from the application's perspective. For the streaming API we try to create kafka.consume spans just before the stream's processor node calls back into the application, for the same reason.

Depending on where you've added your custom instrumentation, it's possible that you'll see trace and span ids in the record that aren't reflected in the MDC or global tracer - for example, before the kafka.consume span is created, but also after that span has finished, because finishing it resets the context back to the original value. If that is the case then you'd need to extract the trace and span ids yourself from the record - see the sketch at the end of this comment.

If your custom instrumentation is expected to be inside the kafka.consume span then we'd need more details about where you've added that to investigate further.
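For reference, here's a minimal sketch of what that manual extraction can look like using the OpenTracing API registered by dd-java-agent (the class, method, and span names here are illustrative, not from our docs):

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMapAdapter;
import io.opentracing.util.GlobalTracer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

class RecordProcessor {
    void process(ConsumerRecord<String, String> record) {
        // Copy the record headers (x-datadog-trace-id, x-datadog-parent-id, ...)
        // into a plain map so the tracer can extract a SpanContext from them.
        Map<String, String> carrier = new HashMap<>();
        for (Header header : record.headers()) {
            carrier.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
        }

        Tracer tracer = GlobalTracer.get();
        SpanContext parent = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(carrier));

        // Build and activate a span that continues the trace carried by the record,
        // so downstream calls made on this thread join the same trace.
        Span span = tracer.buildSpan("process.record").asChildOf(parent).start();
        try (Scope scope = tracer.activateSpan(span)) {
            // ... application processing ...
        } finally {
            span.finish();
        }
    }
}
```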

mcculls avatar Feb 16 '22 23:02 mcculls

Hi Stuart, thanks for the info here - it makes sense of what I'm seeing now. I thought it would be worth getting an example going that illustrates what I'm trying to do and haven't yet been able to achieve.

I've forked the Alpakka kafka-to-websocket sample and made changes to integrate the dd-java-agent. This app essentially demonstrates the setup I have in my services, i.e. a Kafka consumer that then sends a message over a websocket. In my service the consumer sends a message over gRPC, but the result seems to be the same.

If I run that app and then send a message to Kafka via the /push endpoint, I get three distinct traces in Datadog: a single trace that contains the kafka.produce and kafka.consume spans, a separate trace for the websocket call (/events), and another one for the HTTP router GET (/push).

What I expected is that all of these spans would be under the same trace. Is that something I can achieve with manual instrumentation in the services?

gbiv avatar Feb 17 '22 19:02 gbiv

thanks for the example - I see that Alpakka-kafka builds on akka-streams, which we don't yet support (we also don't have support for websocket propagation)

you should be able to use manual instrumentation to propagate the trace: https://docs.datadoghq.com/tracing/setup_overview/custom_instrumentation/java/#adding-spans - you can use that to get the current trace details from inside the consumer; you'd then need to decide how to propagate them over the websocket (either as headers or as part of the payload) and then build and activate a span on the other side using the trace details you just sent - roughly like the sketch below
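a rough sketch of the sending side (the map name and the commented-out send call are placeholders for however your app frames its websocket messages):

```java
import java.util.HashMap;
import java.util.Map;

import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMapAdapter;
import io.opentracing.util.GlobalTracer;

class WebsocketSendExample {
    void send(String payload) {
        // Capture the active trace context into a map that can be serialized
        // into the websocket message, as headers or as part of the payload.
        Map<String, String> traceHeaders = new HashMap<>();
        Tracer tracer = GlobalTracer.get();
        Span active = tracer.activeSpan();
        if (active != null) {
            tracer.inject(active.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(traceHeaders));
        }
        // sendOverWebsocket(payload, traceHeaders); // placeholder for your transport
    }
}
```

on the receiving side you'd reverse this: read the map out of the message, call tracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(map)), and pass the result to asChildOf when building and activating the span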

mcculls avatar Feb 18 '22 12:02 mcculls

Got it! TL;DR: get the tracer and use an implementation of TextMap to extract the SpanContext to use as the parent span.

What I didn't understand was how to create a SpanContext from the information already available in the ConsumerRecord headers. I eventually found this issue, which led me to the kinesis example, which gave me the answer I was looking for.

I think it would be useful to mention that example (or something similar) in the documentation around manually creating spans. Once I found that example, it was pretty straightforward.
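In case it helps anyone else landing here, the extract side ends up looking roughly like this (my own sketch, not copied from the kinesis example; the class name is mine):

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import io.opentracing.propagation.TextMap;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

// Extract-only TextMap carrier backed by the Kafka record headers.
final class KafkaHeadersExtractAdapter implements TextMap {
    private final Map<String, String> map = new HashMap<>();

    KafkaHeadersExtractAdapter(Headers headers) {
        for (Header header : headers) {
            map.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
        }
    }

    @Override
    public Iterator<Map.Entry<String, String>> iterator() {
        return map.entrySet().iterator();
    }

    @Override
    public void put(String key, String value) {
        throw new UnsupportedOperationException("carrier is extract-only");
    }
}
```

Passing an instance of that to tracer.extract(Format.Builtin.TEXT_MAP, ...) returns the SpanContext to hand to asChildOf when building the span.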

gbiv avatar Feb 23 '22 14:02 gbiv