ApplicationInsights-Java
Kafka consumer keeps using the same operationId when an exception is thrown
Expected behavior
When consuming events with Kafka, Application Insights should show Java exceptions at the same level as the rest of the process. It should look like this screenshot, but with an exception between the Google calls:
Actual behavior
When the exception happens the trace tree is missing some parts and has incorrect indentation:
Event consumptions that belong to other traces don't show up in their own traces:
Instead, they show up in the trace where the exception happened:
To Reproduce
Create a Kafka consumer like the one in azure/azure-event-hubs-for-kafka.
Let that consumer throw an exception.
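One way to make the consumer fail on demand is sketched below. This is only an illustration, not the code from the sample repo: the class name `FailingEventConsumer` and the `"fail"` marker are hypothetical, and it assumes the event handler is a plain `java.util.function.Consumer<String>` that receives the record value.

```java
import java.util.function.Consumer;

/** Hypothetical event handler that throws for selected events. */
final class FailingEventConsumer implements Consumer<String> {
    // Assumption for this sketch: events containing this marker should fail.
    static final String FAIL_MARKER = "fail";

    private int handled = 0;

    @Override
    public void accept(String event) {
        if (event != null && event.contains(FAIL_MARKER)) {
            // Simulates the processing error that triggers the broken trace.
            throw new IllegalStateException("simulated failure for event: " + event);
        }
        handled++;
    }

    /** Number of events processed without an exception. */
    int handledCount() {
        return handled;
    }
}
```

Sending one event that contains the marker between two normal events should be enough to compare the traces before, during and after the exception.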
Sample Application
I set up a sample application in the following repo. It contains the issue details, a workaround and instructions to reproduce: appinsights-otel-kafka-eventhub
System information
- SDK Version: 3.2.11 and 3.3.0-BETA (commit 4e7fe08696)
- OS type and version: Docker - amd64 - eclipse-temurin:17-jre-alpine
- Using spring-boot: Yes
- Additional relevant libraries (from the pom.xml of the sample application):
  <applicationinsights-agent.version>3.2.11</applicationinsights-agent.version>
  <opentelemetry-api.version>1.13.0</opentelemetry-api.version>
  <kafka-clients.version>3.1.0</kafka-clients.version>
  <applicationinsights-spring-boot-starter.version>2.6.4</applicationinsights-spring-boot-starter.version>
Workaround
As a workaround, you can extract the "traceparent" header from the event inside the consumer and make it the current trace context.
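For reference, this is roughly what the "traceparent" header value carries. The sketch below is a simplified, standard-library-only parser for the four-field layout from the W3C Trace Context specification (version, trace-id, parent-id, flags). The class `TraceParentHeader` is hypothetical and is not part of the workaround, which uses the OpenTelemetry propagator instead. As far as I understand, the trace-id field is what Application Insights shows as the operation ID, but treat that mapping as an assumption.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that splits a W3C "traceparent" value into its fields. */
final class TraceParentHeader {
    // version (2 hex) - trace-id (32 hex) - parent-id (16 hex) - flags (2 hex)
    private static final Pattern FORMAT = Pattern.compile(
            "([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})");

    final String traceId;
    final String parentSpanId;
    final boolean sampled;

    private TraceParentHeader(String traceId, String parentSpanId, boolean sampled) {
        this.traceId = traceId;
        this.parentSpanId = parentSpanId;
        this.sampled = sampled;
    }

    /** Returns the parsed header, or empty if the value is missing or malformed. */
    static Optional<TraceParentHeader> parse(String value) {
        if (value == null) {
            return Optional.empty();
        }
        Matcher m = FORMAT.matcher(value.trim());
        if (!m.matches() || m.group(1).equals("ff")) {
            return Optional.empty(); // wrong layout, or the forbidden version "ff"
        }
        String traceId = m.group(2);
        String spanId = m.group(3);
        // All-zero ids are invalid according to the specification.
        if (traceId.equals("0".repeat(32)) || spanId.equals("0".repeat(16))) {
            return Optional.empty();
        }
        int flags = Integer.parseInt(m.group(4), 16);
        return Optional.of(new TraceParentHeader(traceId, spanId, (flags & 1) == 1));
    }
}
```

Logging the parsed trace-id for each record is a quick way to check whether consecutive events really carry different trace contexts, even when the portal groups them under one operation.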
After a restart the "good" traces now look like this. Mind the wrong indentation:
When the exception gets thrown, the trace looks a bit better. The exception is out of place because the parent is missing:
The subsequent traces also look better, but they are missing the queue time information:
Polling without the workaround:
private void pollMessages(Consumer<Long, String> kafkaConsumer) {
    try {
        while (true) {
            try {
                final ConsumerRecords<Long, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<Long, String> cr : consumerRecords) {
                    log.error("Consumer Record:({}, {}, {}, {})", cr.key(), cr.value(), cr.partition(), cr.offset());
                    log.info("Headers:");
                    for (Header header : cr.headers()) {
                        log.info("Key: {}, Value: {}", header.key(), new String(header.value()));
                    }
                    exampleEventConsumer.accept(cr.value());
                }
                kafkaConsumer.commitAsync();
            } catch (Exception e) {
                log.error("got some error: ", e);
            }
        }
    } catch (CommitFailedException e) {
        log.error("CommitFailedException", e);
    } finally {
        kafkaConsumer.close();
    }
}
With the workaround:
private void pollMessages(Consumer<Long, String> kafkaConsumer) {
    try {
        while (true) {
            try {
                final ConsumerRecords<Long, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<Long, String> cr : consumerRecords) {
                    log.error("Consumer Record:({}, {}, {}, {})", cr.key(), cr.value(), cr.partition(), cr.offset());
                    log.info("Headers:");
                    for (Header header : cr.headers()) {
                        log.info("Key: {}, Value: {}", header.key(), new String(header.value()));
                    }
                    // with workaround
                    Context context = getTraceParent(cr);
                    try (Scope scope = context.makeCurrent()) {
                        exampleEventConsumer.accept(cr.value());
                    }
                }
                kafkaConsumer.commitAsync();
            } catch (Exception e) {
                log.error("got some error: ", e);
            }
        }
    } catch (CommitFailedException e) {
        log.error("CommitFailedException", e);
    } finally {
        kafkaConsumer.close();
    }
}
/**
 * Get trace context from consumer record
 */
private Context getTraceParent(ConsumerRecord<Long, String> consumerRecord) {
    var textMapGetter = new TextMapGetter<ConsumerRecord<Long, String>>() {
        @Override
        public Iterable<String> keys(ConsumerRecord<Long, String> carrier) {
            return Collections.singleton("traceparent");
        }

        @Override
        public String get(ConsumerRecord<Long, String> carrier, String key) {
            // lastHeader returns null when the record carries no such header
            Header header = carrier.headers().lastHeader(key);
            return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
        }
    };
    return W3CTraceContextPropagator
            .getInstance()
            .extract(Context.root(), consumerRecord, textMapGetter);
}
hi @Questlog, thanks for the detailed findings!
I've pulled the fix from https://github.com/open-telemetry/opentelemetry-java-instrumentation/pull/6021 into the latest SNAPSHOT build. Can you test it and let us know if it resolves the issue?
https://github.com/microsoft/ApplicationInsights-Java/suites/6531873089/artifacts/243546886
Hi @trask,
thanks for your help again. I've tested it in my sample application, the traces after the exception look fine now:
![image](https://user-images.githubusercontent.com/7374558/168767639-fdbf5f7d-3e78-496d-8de7-581d477ae051.png)
The trace containing the exception still looks odd though:
![image](https://user-images.githubusercontent.com/7374558/168767849-a611fc25-44d0-40fb-adf6-6f45cfc4ddc4.png)
> The trace containing the exception still looks odd though
ya, I believe this is unavoidable with the pure auto-instrumentation approach, see https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947#issuecomment-1100276148
Hey, I'll close this issue. Versions above 3.4.3 are working fine now. I don't even see the wrong indentation any more.