
Distributed tracing is not working on batch consumption of Kafka messages using Spring Cloud Stream


Datadog distributed tracing was working fine when we were consuming events one by one from Kafka using Spring Cloud Stream: the tracer picked up the dd.trace_id available in the Kafka message headers and used it as the trace_id in the consuming service. When we enable batch mode in Spring Cloud Stream (https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.RC1/reference/html/spring-cloud-stream-binder-kafka.html#_consuming_batches), the Datadog agent no longer considers the trace_id available in the Kafka message headers.
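
For reference, batch consumption in Spring Cloud Stream is enabled via the consumer batch-mode property. A minimal sketch, assuming a functional binding named process-in-0 (the binding name is illustrative):

# application.properties
spring.cloud.stream.bindings.process-in-0.consumer.batch-mode=true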

iam-nagarajan opened this issue Feb 25 '20

We're having the same issue, using batch mode in spring-cloud-stream.

We do see kafka.consume traces for the services, but the trace contains nothing beyond the fact that a message was read from the topic. In other words, the trace begins and ends with the message consumption and is usually only single-digit microseconds long.

josephtaylor commented Jun 22 '20

We're facing the exact same issue when using batch listener mode (spring.kafka.listener.type=batch) with spring-kafka. As @josephtaylor mentioned, the traces are incomplete and don't capture anything beyond the fact that a batch was consumed from the topic.

akhilbojedla commented Mar 10 '22

I also encountered the issue described by @josephtaylor and @akhilbojedla (using a spring-kafka batch listener). After some investigation, it looks like this only happens with specific listener method declarations, e.g. when using an argument type other than ConsumerRecords.

dd-trace-java defines advice that initiates a new tracing span for each item of a ConsumerRecords object as it is iterated over (KafkaConsumerInstrumentation.java). After checking how spring-kafka accesses this object, I ran into a method of KafkaMessageListenerContainer that might cause the spans to be created and closed without capturing any of the actual processing: the instrumented iterator is exhausted before any record processing takes place. That method is called conditionally, when the listener method signature contains an argument type other than ConsumerRecords, e.g. List<ConsumerRecord> or ConsumerRecord (when using a batch-to-record adapter).
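
To illustrate the mechanism, here is a simplified sketch of the instrumentation pattern (not the actual dd-trace-java code; the Span type and startSpanFromHeaders are placeholders for the tracer's real API). If spring-kafka drains the iterator into a List before invoking the listener, every span is opened and closed during that copy:

import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;

class TracingIterator<K, V> implements Iterator<ConsumerRecord<K, V>> {

  interface Span { void finish(); } // placeholder span type

  private final Iterator<ConsumerRecord<K, V>> delegate;
  private Span currentSpan; // span for the record most recently returned

  TracingIterator(Iterator<ConsumerRecord<K, V>> delegate) {
    this.delegate = delegate;
  }

  @Override
  public boolean hasNext() {
    return delegate.hasNext();
  }

  @Override
  public ConsumerRecord<K, V> next() {
    // Finish the previous record's span before handing out the next record.
    // When the iterator is drained up front (e.g. copied into a List), all
    // spans open and close here, before any processing runs, which is why
    // they end up only microseconds long.
    if (currentSpan != null) {
      currentSpan.finish();
    }
    ConsumerRecord<K, V> record = delegate.next();
    currentSpan = startSpanFromHeaders(record);
    return record;
  }

  // placeholder: extract the trace context from the record headers and
  // start a consumer span continuing that trace
  private Span startSpanFromHeaders(ConsumerRecord<K, V> record) {
    return () -> { };
  }
}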

Rewriting the listener from

@KafkaListener(topics = {"a-topic"}, groupId = "a-group")
public void onMessage(List<ConsumerRecord<K,V>> records) {
  // processing
}

to

@KafkaListener(topics = {"a-topic"}, groupId = "a-group")
public void onMessage(ConsumerRecords<K,V> records) {
  for (var record : records) {
    // processing
  }
}

seems to fix the problem - span IDs are propagated correctly.

matej-staron commented Jun 01 '23