dd-trace-java
Distributed tracing is not working on batch consumption of Kafka messages using Spring Cloud Stream
Datadog distributed tracing was working fine when we were consuming events one by one from Kafka using Spring Cloud Stream: the agent picked up the dd.trace_id available in the Kafka message header and used it as the trace_id in the current service. When we enable batch mode in Spring Cloud Stream (https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.RC1/reference/html/spring-cloud-stream-binder-kafka.html#_consuming_batches), the Datadog agent no longer considers the trace_id available in the Kafka message headers.
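For illustration, a minimal sketch of the kind of batch consumer where we see this (the binding name "consume" is a placeholder; header names follow the default Datadog propagation style):

import java.util.List;
import java.util.function.Consumer;
import org.springframework.context.annotation.Bean;

// enabled with:
//   spring.cloud.stream.bindings.consume-in-0.consumer.batch-mode=true
@Bean
public Consumer<List<byte[]>> consume() {
    return payloads -> payloads.forEach(payload -> {
        // every record on the wire carries the Datadog propagation headers
        // (x-datadog-trace-id etc.), but in batch mode the agent does not
        // continue that trace for the processing here
        // ... processing
    });
}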
We're having the same issue, using batch mode in spring-cloud-stream.
We do have kafka.consume traces appearing for the services, but there is no trace data beyond the fact that a message was read from the topic. In other words, the trace begins and ends with the message consumption and is usually single-digit microseconds long.
We're facing the exact same issue when using batch listener mode (spring.kafka.listener.type=batch) with spring-kafka. As @josephtaylor mentioned, the traces are incomplete and don't go beyond the fact that a consumption happened from a topic.
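For reference, our setup is roughly the following (a minimal sketch; topic, group, and types are placeholders):

import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

// application.properties: spring.kafka.listener.type=batch
@KafkaListener(topics = "some-topic", groupId = "some-group")
public void onMessage(List<ConsumerRecord<String, String>> records) {
    // the upstream trace context from the record headers is not active here
}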
I also encountered the issue described by @josephtaylor and @akhilbojedla (using a spring-kafka batch listener). After some investigation, it looks like this only happens with specific listener method declarations, e.g. when using an argument type other than ConsumerRecords.
dd-trace-java defines advice that initiates a new tracing span for each ConsumerRecords item as it is iterated over (KafkaConsumerInstrumentation.java). After checking how spring-kafka accesses this object, I ran into a method of KafkaMessageListenerContainer that might cause the spans to be created and closed without capturing any of the actual processing: the instrumented iterator is exhausted before any record processing takes place. It is conditionally called if the listener method signature contains an argument type other than ConsumerRecords, e.g. List<ConsumerRecord> or ConsumerRecord (when using a batch-to-record adapter).
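To make the failure mode concrete, here is an illustrative sketch (not the actual spring-kafka source) of the adaptation step I mean: the container drains the instrumented ConsumerRecords into a plain list before invoking the listener, so every advice-created span opens and closes during the copy, not around the processing:

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

static <K, V> List<ConsumerRecord<K, V>> drain(ConsumerRecords<K, V> records) {
    List<ConsumerRecord<K, V>> recordList = new ArrayList<>();
    for (ConsumerRecord<K, V> record : records) { // instrumented iterator
        recordList.add(record); // a span starts and ends around this step
    }
    return recordList; // the listener then runs outside any of those spans
}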
Rewriting the listener from
@KafkaListener(topics = {"a-topic"}, groupId = "a-group")
public void onMessage(List<ConsumerRecord<K, V>> records) {
    // processing
}
to
@KafkaListener(topics = {"a-topic"}, groupId = "a-group")
public void onMessage(ConsumerRecords<K, V> records) {
    for (var record : records) {
        // processing
    }
}
seems to fix the problem - span IDs are propagated correctly.
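As a quick check of the workaround, logging the active span inside the loop should now show the upstream trace id (a sketch assuming the OpenTracing API is on the classpath and dd-trace-java has registered itself as the GlobalTracer):

import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;

@KafkaListener(topics = {"a-topic"}, groupId = "a-group")
public void onMessage(ConsumerRecords<K, V> records) {
    for (var record : records) {
        Span active = GlobalTracer.get().activeSpan();
        if (active != null) {
            // matches the producer's trace id when the signature is ConsumerRecords
            System.out.println("active trace id: " + active.context().toTraceId());
        }
        // processing
    }
}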