opentelemetry-java-instrumentation
opentelemetry-java-instrumentation copied to clipboard
Instrumentation of Kafka Clients: Add a Span or an attribute to the current span
Describe the bug Really I am not sur if it is a Bug, I am working on instrumentation of Kafka Produce/Receive Message following this article: https://opentelemetry.io/blog/2022/instrument-kafka-clients/ I want to add attribute to the current Span.
Steps to reproduce I have test two methods to instrument kafka Client:
Interceptors: https://opentelemetry.io/blog/2022/instrument-kafka-clients/#using-the-interceptors Wrapping: https://opentelemetry.io/blog/2022/instrument-kafka-clients/#wrapping-the-clients
in two cases I use this code add attributes to current Span:
Span span = Span.current();
span.setAttribute("messaging.user", "My User");
producer.send(new ProducerRecord<>(topic, payload));
My objectif is to add attribute to the current Span
This code is working fine when I make REST HTTP Request but not in case of Kafka CLient
What did you expect to see? The attribute is added to the Span
What did you see instead? Nothing is added
What version are you using?
Environment Compiler: JDK 17 OS: MAC OS Runtime: JDK 17
Hey @axeaneProjects ,
The code that you posted won't work - the Kafka instrumentation creates a producer span inside the producer.send() call, so if you call Span.current() before that it's gonna return a different span (or no span at all).
You can configure the KafkaTelemetry to accept and extra AttributesExtractor that will be called when the producer span is constructed: https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/e8464f696082b2181e04d57cfc48a1e79ea70ac2/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetryBuilder.java#L38-L43
You can implement the AttributesExtractor interface yourself and set whatever attributes you want there.
@mateuszrzeszutek thank you for your response. By this way we can not add dynamically attributes
I have tested by adding the extractor before wrap, it is fine like this:
public void init() { producer = new KafkaProducer<>(props); kafkaTelemetryBuilder = KafkaTelemetry.builder(GlobalOpenTelemetry.get()); AttributeKey<String> MESSAGING_SYSTEM = AttributeKey.stringKey("messaging.issam"); PocAttributesExtractor pocAttributesExtractor = new PocAttributesExtractor(MESSAGING_SYSTEM, "issam"); kafkaTelemetryBuilder.addProducerAttributesExtractors(pocAttributesExtractor); KafkaTelemetry telemetry = kafkaTelemetryBuilder.build(); tracingProducer = telemetry.wrap(producer); }
public void sendMessage() {
for (int index = 0; index < 1; index++) {
sendKafkaMessage("The index is now: " + index, tracingProducer, kafkaTopicName);
}
}
private void sendKafkaMessage(String payload,
Producer<String, String> producer,
String topic) {
logger.info("Sending Kafka message: " + payload);
tracingProducer.send(new ProducerRecord<>(topic, payload));
}
Adding when I send message, the attribute is not added, here you are the code
public void init() {
producer = new KafkaProducer<>(props);
kafkaTelemetryBuilder = KafkaTelemetry.builder(GlobalOpenTelemetry.get());
KafkaTelemetry telemetry = kafkaTelemetryBuilder.build();
tracingProducer = telemetry.wrap(producer);
}
public void sendMessage() {
for (int index = 0; index < 1; index++) {
//Span span = Span.fromContext(Context.current());
AttributeKey<String> MESSAGING_SYSTEM = AttributeKey.stringKey("messaging.issam");
PocAttributesExtractor pocAttributesExtractor = new PocAttributesExtractor(MESSAGING_SYSTEM, "issam");
kafkaTelemetryBuilder.addProducerAttributesExtractors(pocAttributesExtractor);
sendKafkaMessage("The index is now: " + index, tracingProducer, kafkaTopicName);
}
}
@axeaneProjects extractor needs to be added before the KafkaTelemetry instance is built. If you need to pass information to the extractor you could use opentelemetry Context or perhaps add it in a header of ProducerRecord or ThreadLocal.
@mateuszrzeszutek thank you very much, it works with ThreadLocal.
Please I have other question, how I can add a Custom Span, other than produce and receive span.
You can use the opentelemetry sdk to create custom spans. See https://opentelemetry.io/docs/instrumentation/java/manual/#create-spans In your application add dependency to opentelemetry-api (you won't need other dependencies with the agent). When using with javaagent use GlobalOpenTelemetry.get() to get the OpenTelemetry instance.
Another convenient way for creating spans is with @WithSpan annotation, see https://opentelemetry.io/docs/instrumentation/java/automatic/annotations/
Hey @laurit I have tested this code using the documentation:
Tracer tracer = GlobalOpenTelemetry.get().getTracer("tracer");
Context parentContext = Context.current();
Span spanCustom = tracer.spanBuilder("poc (manual span)")
.setParent(parentContext) // Optional. The system automatically configures the settings.
.startSpan();
try (Scope scope = spanCustom.makeCurrent()) {
spanCustom.setAttribute("poc-id", "111");
} finally {
spanCustom.end();
}
OpenTelemetry openTelemetry = GlobalOpenTelemetry.get();
openTelemetry.getPropagators();
I think that the context is not good because the Custom Span is created but not attached to the Produce/Receive Span.
Any help please
@axeaneProjects From the code you pasted it is not possible to discern why it should be connected with produce or receive span, please give a better sample.
@laurit sometimes I want to add functional span after the receive of message.
In cas of REST HTTP, every thing work fine but with kafka clients I can see only produce and receive span. IF I add custom span as showed in above code, the span is not attached to produce/receive span
Any help please
You have to start a span in scope of another one to connect them (that's not the only option, but that's the simplest one). E.g.
try (Scope scope = spanCustom.makeCurrent()) {
// ...
tracingProducer.send(new ProducerRecord<>(topic, payload));
// ...
} finally {
spanCustom.end();
}
will make a span that's a parent of the Kafka producer span. Normally, the consumer/receive side is analogous: you have to start a new span while the consumer span is active -- but the API exposed by the Apache Kafka client makes it impossible to do that with just library instrumentation (it works with the javaagent, because bytecode instrumentation allows us to do a bit more magic around that). If you want to have proper consumer traces, I'd recommend using something like Spring Kafka (for which we have a library instrumentation), or Vertx Kafka or Reactor Kafka (which do not have library instrumentations, but it'd be fairly easy to extract them).
The problem with receive is that the trace ends when the receive method exits. When you call Context.current() after the receive the context won't contain the receive span. As @mateuszrzeszutek mentioned it would be much easier with spring kafka where message is processed in a listener. There you'd have a process span that is active during the call and you could easily add child spans to it. If you wish to add a child span to the receive span you'll have to manually extract the context from the kafka message and use that context as the parent for your span.
Hey @axeaneProjects ,
The code that you posted won't work - the Kafka instrumentation creates a producer span inside the
producer.send()call, so if you callSpan.current()before that it's gonna return a different span (or no span at all).You can configure the
KafkaTelemetryto accept and extraAttributesExtractorthat will be called when the producer span is constructed:https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/e8464f696082b2181e04d57cfc48a1e79ea70ac2/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetryBuilder.java#L38-L43
You can implement the
AttributesExtractorinterface yourself and set whatever attributes you want there.
Hi @mateuszrzeszutek, could you please let me know if there is a way to use this approach (of adding extra AttributesExtractor) along with the interceptors?
Thank you.
Hey @adityasc94 ,
Unfortunately, currently there is no way to do so. Implementing #6291 would unblock that.
@mateuszrzeszutek
it works with the javaagent, because bytecode instrumentation allows us to do a bit more magic around that
Could you please help on how it can be done with java agent?