opentelemetry-java-instrumentation icon indicating copy to clipboard operation
opentelemetry-java-instrumentation copied to clipboard

Instrumentation of Kafka Clients: Add a Span or an attribute to the current span

Open axeaneProjects opened this issue 2 years ago • 13 comments

Describe the bug Really I am not sur if it is a Bug, I am working on instrumentation of Kafka Produce/Receive Message following this article: https://opentelemetry.io/blog/2022/instrument-kafka-clients/ I want to add attribute to the current Span.

Steps to reproduce I have test two methods to instrument kafka Client:

Interceptors: https://opentelemetry.io/blog/2022/instrument-kafka-clients/#using-the-interceptors Wrapping: https://opentelemetry.io/blog/2022/instrument-kafka-clients/#wrapping-the-clients

in two cases I use this code add attributes to current Span:

    Span span = Span.current();
    span.setAttribute("messaging.user", "My User");
    producer.send(new ProducerRecord<>(topic, payload));

My objectif is to add attribute to the current Span

This code is working fine when I make REST HTTP Request but not in case of Kafka CLient

What did you expect to see? The attribute is added to the Span

What did you see instead? Nothing is added

What version are you using? <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> 3.5.1 <groupId>io.opentelemetry.instrumentation</groupId> <artifactId>opentelemetry-kafka-clients-2.6</artifactId> 1.21.0-alpha <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-exporter-otlp</artifactId> 1.21.0 <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId> 1.21.0-alpha

Environment Compiler: JDK 17 OS: MAC OS Runtime: JDK 17

axeaneProjects avatar Aug 17 '23 08:08 axeaneProjects

Hey @axeaneProjects ,

The code that you posted won't work - the Kafka instrumentation creates a producer span inside the producer.send() call, so if you call Span.current() before that it's gonna return a different span (or no span at all).

You can configure the KafkaTelemetry to accept and extra AttributesExtractor that will be called when the producer span is constructed: https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/e8464f696082b2181e04d57cfc48a1e79ea70ac2/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetryBuilder.java#L38-L43 You can implement the AttributesExtractor interface yourself and set whatever attributes you want there.

mateuszrzeszutek avatar Aug 17 '23 14:08 mateuszrzeszutek

@mateuszrzeszutek thank you for your response. By this way we can not add dynamically attributes

I have tested by adding the extractor before wrap, it is fine like this:

public void init() { producer = new KafkaProducer<>(props); kafkaTelemetryBuilder = KafkaTelemetry.builder(GlobalOpenTelemetry.get()); AttributeKey<String> MESSAGING_SYSTEM = AttributeKey.stringKey("messaging.issam"); PocAttributesExtractor pocAttributesExtractor = new PocAttributesExtractor(MESSAGING_SYSTEM, "issam"); kafkaTelemetryBuilder.addProducerAttributesExtractors(pocAttributesExtractor); KafkaTelemetry telemetry = kafkaTelemetryBuilder.build(); tracingProducer = telemetry.wrap(producer); }

public void sendMessage() {
    for (int index = 0; index < 1; index++) {
        sendKafkaMessage("The index is now: " + index, tracingProducer, kafkaTopicName);
    }
}

private  void sendKafkaMessage(String payload,
                                     Producer<String, String> producer,
                                     String topic)  {

    logger.info("Sending Kafka message: " + payload);
    tracingProducer.send(new ProducerRecord<>(topic, payload));
}

Adding when I send message, the attribute is not added, here you are the code

public void init() {
    producer = new KafkaProducer<>(props);
    kafkaTelemetryBuilder =  KafkaTelemetry.builder(GlobalOpenTelemetry.get());
    KafkaTelemetry telemetry = kafkaTelemetryBuilder.build();
    tracingProducer = telemetry.wrap(producer);
}

public void sendMessage() {
    for (int index = 0; index < 1; index++) {
        //Span span = Span.fromContext(Context.current());
        AttributeKey<String> MESSAGING_SYSTEM = AttributeKey.stringKey("messaging.issam");
        PocAttributesExtractor pocAttributesExtractor = new PocAttributesExtractor(MESSAGING_SYSTEM, "issam");
        kafkaTelemetryBuilder.addProducerAttributesExtractors(pocAttributesExtractor);
        sendKafkaMessage("The index is now: " + index, tracingProducer, kafkaTopicName);
    }
}

axeaneProjects avatar Aug 17 '23 15:08 axeaneProjects

@axeaneProjects extractor needs to be added before the KafkaTelemetry instance is built. If you need to pass information to the extractor you could use opentelemetry Context or perhaps add it in a header of ProducerRecord or ThreadLocal.

laurit avatar Aug 17 '23 17:08 laurit

@mateuszrzeszutek thank you very much, it works with ThreadLocal.

Please I have other question, how I can add a Custom Span, other than produce and receive span.

axeaneProjects avatar Aug 17 '23 18:08 axeaneProjects

You can use the opentelemetry sdk to create custom spans. See https://opentelemetry.io/docs/instrumentation/java/manual/#create-spans In your application add dependency to opentelemetry-api (you won't need other dependencies with the agent). When using with javaagent use GlobalOpenTelemetry.get() to get the OpenTelemetry instance. Another convenient way for creating spans is with @WithSpan annotation, see https://opentelemetry.io/docs/instrumentation/java/automatic/annotations/

laurit avatar Aug 18 '23 05:08 laurit

Hey @laurit I have tested this code using the documentation:

      Tracer tracer = GlobalOpenTelemetry.get().getTracer("tracer");
        Context parentContext = Context.current();

        Span spanCustom = tracer.spanBuilder("poc (manual span)")
                .setParent(parentContext) // Optional. The system automatically configures the settings.
                .startSpan();

        try (Scope scope = spanCustom.makeCurrent()) {
            spanCustom.setAttribute("poc-id", "111");
        } finally {
            spanCustom.end();
        }
        OpenTelemetry openTelemetry = GlobalOpenTelemetry.get();
        openTelemetry.getPropagators();

I think that the context is not good because the Custom Span is created but not attached to the Produce/Receive Span.

Any help please

axeaneProjects avatar Aug 18 '23 08:08 axeaneProjects

@axeaneProjects From the code you pasted it is not possible to discern why it should be connected with produce or receive span, please give a better sample.

laurit avatar Aug 18 '23 08:08 laurit

@laurit sometimes I want to add functional span after the receive of message.

In cas of REST HTTP, every thing work fine but with kafka clients I can see only produce and receive span. IF I add custom span as showed in above code, the span is not attached to produce/receive span

Any help please

axeaneProjects avatar Aug 18 '23 10:08 axeaneProjects

You have to start a span in scope of another one to connect them (that's not the only option, but that's the simplest one). E.g.

try (Scope scope = spanCustom.makeCurrent()) {
    // ...
    tracingProducer.send(new ProducerRecord<>(topic, payload));
    // ...
} finally {
    spanCustom.end();
}

will make a span that's a parent of the Kafka producer span. Normally, the consumer/receive side is analogous: you have to start a new span while the consumer span is active -- but the API exposed by the Apache Kafka client makes it impossible to do that with just library instrumentation (it works with the javaagent, because bytecode instrumentation allows us to do a bit more magic around that). If you want to have proper consumer traces, I'd recommend using something like Spring Kafka (for which we have a library instrumentation), or Vertx Kafka or Reactor Kafka (which do not have library instrumentations, but it'd be fairly easy to extract them).

mateuszrzeszutek avatar Aug 18 '23 10:08 mateuszrzeszutek

The problem with receive is that the trace ends when the receive method exits. When you call Context.current() after the receive the context won't contain the receive span. As @mateuszrzeszutek mentioned it would be much easier with spring kafka where message is processed in a listener. There you'd have a process span that is active during the call and you could easily add child spans to it. If you wish to add a child span to the receive span you'll have to manually extract the context from the kafka message and use that context as the parent for your span.

laurit avatar Aug 18 '23 10:08 laurit

Hey @axeaneProjects ,

The code that you posted won't work - the Kafka instrumentation creates a producer span inside the producer.send() call, so if you call Span.current() before that it's gonna return a different span (or no span at all).

You can configure the KafkaTelemetry to accept and extra AttributesExtractor that will be called when the producer span is constructed:

https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/e8464f696082b2181e04d57cfc48a1e79ea70ac2/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetryBuilder.java#L38-L43

You can implement the AttributesExtractor interface yourself and set whatever attributes you want there.

Hi @mateuszrzeszutek, could you please let me know if there is a way to use this approach (of adding extra AttributesExtractor) along with the interceptors?

Thank you.

adityasc94 avatar Sep 20 '23 15:09 adityasc94

Hey @adityasc94 ,

Unfortunately, currently there is no way to do so. Implementing #6291 would unblock that.

mateuszrzeszutek avatar Sep 21 '23 15:09 mateuszrzeszutek

@mateuszrzeszutek

it works with the javaagent, because bytecode instrumentation allows us to do a bit more magic around that

Could you please help on how it can be done with java agent?

kokikathir avatar Apr 29 '24 04:04 kokikathir