
Kafka consumer keeps using the same operationId when an exception is thrown

Open Questlog opened this issue 2 years ago • 3 comments

Expected behavior

When consuming events with Kafka, Application Insights should show Java exceptions at the same level as the rest of the process, just like in this screenshot, but with an exception between the Google calls:

(screenshot)

Actual behavior

When the exception happens, the trace tree is missing some parts and has incorrect indentation:

(screenshot)

Event consumptions from other traces don't show up at all:

(screenshot)

But they show up in the trace where the exception happened:

(screenshot)

To Reproduce

Create a kafka consumer like this: azure/azure-event-hubs-for-kafka

Let that consumer throw an exception.

Sample Application

I set up a sample application in the following repo. It contains the issue details, a workaround and instructions to reproduce: appinsights-otel-kafka-eventhub

System information

  • SDK Version: 3.2.11 and 3.3.0-BETA (commit 4e7fe08696)
  • OS type and version: Docker - amd64 - eclipse-temurin:17-jre-alpine
  • Using spring-boot: Yes
  • Additional relevant libraries (from the pom.xml of the sample application):
    <applicationinsights-agent.version>3.2.11</applicationinsights-agent.version>
    <opentelemetry-api.version>1.13.0</opentelemetry-api.version>
    <kafka-clients.version>3.1.0</kafka-clients.version>
    <applicationinsights-spring-boot-starter.version>2.6.4</applicationinsights-spring-boot-starter.version>

Workaround

A workaround is to extract the "traceparent" header from the event inside the consumer and make it the current trace context.

After a restart, the "good" traces now look like this. Note the wrong indentation:

(screenshot)

When the exception gets thrown, the trace looks a bit better. The exception is out of place because its parent is missing:

(screenshot)

The subsequent traces also look better, but they are missing the queue time information:

(screenshot)

Polling without the workaround:

  private void pollMessages(Consumer<Long, String> kafkaConsumer) {
    try {
      while (true) {
        try {
          final ConsumerRecords<Long, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
          for(ConsumerRecord<Long, String> cr : consumerRecords) {
            log.error("Consumer Record:({}, {}, {}, {})", cr.key(), cr.value(), cr.partition(), cr.offset());

            log.info("Headers:");
            for (Header header : cr.headers()) {
              log.info("Key: {}, Value: {}", header.key(), new String(header.value()));
            }
            
            exampleEventConsumer.accept(cr.value());
          }
          kafkaConsumer.commitAsync();
        } catch (Exception e) {
          log.error("got some error: ", e);
        }
      }
    } catch (CommitFailedException e) {
      log.error("CommitFailedException", e);
    } finally {
      kafkaConsumer.close();
    }
  }

With the workaround:

  private void pollMessages(Consumer<Long, String> kafkaConsumer) {
    try {
      while (true) {
        try {
          final ConsumerRecords<Long, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
          for(ConsumerRecord<Long, String> cr : consumerRecords) {
            log.error("Consumer Record:({}, {}, {}, {})", cr.key(), cr.value(), cr.partition(), cr.offset());

            log.info("Headers:");
            for (Header header : cr.headers()) {
              log.info("Key: {}, Value: {}", header.key(), new String(header.value()));
            }
            // with workaround
            Context context = getTraceParent(cr);
            try (Scope scope = context.makeCurrent()) {
              exampleEventConsumer.accept(cr.value());
            }
          }
          kafkaConsumer.commitAsync();
        } catch (Exception e) {
          log.error("got some error: ", e);
        }
      }
    } catch (CommitFailedException e) {
      log.error("CommitFailedException", e);
    } finally {
      kafkaConsumer.close();
    }
  }

  /**
   * Get trace context from consumer record
   */
  private Context getTraceParent(ConsumerRecord<Long, String> consumerRecord) {
    var textMapGetter = new TextMapGetter<ConsumerRecord<Long, String>>() {
      @Override
      public Iterable<String> keys(ConsumerRecord<Long, String> carrier) {
        return Collections.singleton("traceparent");
      }

      @Override
      public String get(ConsumerRecord<Long, String> carrier, String key) {
        Header header = carrier.headers().lastHeader(key);
        return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
      }
    };

    return W3CTraceContextPropagator
        .getInstance()
        .extract(Context.root(), consumerRecord, textMapGetter);
  }
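For context on what the workaround's `getTraceParent` hands to the propagator: the "traceparent" header follows the W3C Trace Context format, `version-traceid-parentid-flags` (e.g. `00-<32 hex chars>-<16 hex chars>-01`). A minimal, dependency-free sketch of splitting that header into its fields is below; the `TraceParentParser` class is hypothetical and for illustration only, since the actual workaround delegates parsing to `W3CTraceContextPropagator`:

```java
import java.util.HashMap;
import java.util.Map;

public class TraceParentParser {

  // Parses a W3C "traceparent" header value of the form
  // "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
  // into its four components; returns null if the shape is wrong.
  static Map<String, String> parse(String traceparent) {
    if (traceparent == null) {
      return null;
    }
    String[] parts = traceparent.split("-");
    if (parts.length != 4) {
      return null;
    }
    Map<String, String> fields = new HashMap<>();
    fields.put("version", parts[0]);  // 2 hex chars
    fields.put("traceId", parts[1]);  // 32 hex chars
    fields.put("parentId", parts[2]); // 16 hex chars (the parent span id)
    fields.put("flags", parts[3]);    // 2 hex chars, e.g. 01 = sampled
    return fields;
  }

  public static void main(String[] args) {
    Map<String, String> f =
        parse("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01");
    System.out.println(f.get("traceId"));
  }
}
```

This is why extracting the header restores correlation: the `traceId` component ties the consumer-side span back to the producer's trace.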

Questlog avatar May 16 '22 08:05 Questlog

hi @Questlog, thanks for the detailed findings!

I've pulled the fix from https://github.com/open-telemetry/opentelemetry-java-instrumentation/pull/6021 into the latest SNAPSHOT build. Can you test it out and let us know if it resolves the issue?

https://github.com/microsoft/ApplicationInsights-Java/suites/6531873089/artifacts/243546886

trask avatar May 17 '22 03:05 trask

Hi @trask,

thanks for your help again. I've tested it in my sample application, the traces after the exception look fine now:

(screenshot)

The trace containing the exception still looks odd though:

(screenshot)

Questlog avatar May 17 '22 08:05 Questlog

The trace containing the exception still looks odd though

ya, I believe this is unavoidable with the pure auto-instrumentation approach, see https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947#issuecomment-1100276148

trask avatar May 18 '22 02:05 trask

Hey, I'll close this issue. Versions above 3.4.3 are working fine now. I don't even see the wrong indentation any more.

Questlog avatar Jan 11 '23 15:01 Questlog