opentelemetry-java-instrumentation

Trace/span lost during stateful processing in Kafka streams

Open • paulklos opened this issue 3 years ago • 3 comments

Discussed in https://github.com/open-telemetry/opentelemetry-java-instrumentation/discussions/6407

Originally posted by paulklos on August 2, 2022

Hi,

We have an application that consumes messages from a Kafka topic. It's configured with Kafka bindings using Spring Cloud Stream.

One of our processors performs an aggregation on a KStream, which results in a KTable. The KTable is then converted back into a KStream and picked up by the next processor.

The whole process consists of 4 processors.

The application is instrumented with opentelemetry-javaagent.jar, the latest version.

Up until the aggregation we get the same trace id in our log messages, and a different span id in each processor, which is very nice, and exactly what we would expect.

But after the aggregation the span and trace ids disappear. Inspection in the debugger of the current span shows only zeros.
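For context: an all-zero trace id or span id is how OpenTelemetry represents an invalid (absent) span context, so "only zeros" means no span is current at that point. With the OpenTelemetry API on the classpath the usual check would be `Span.current().getSpanContext().isValid()`. The sketch below reproduces the same rule in plain Java so that ids copied from log lines can be checked. The class and method names are hypothetical and not part of any library.

```java
final class SpanIdCheck {
    // Per W3C Trace Context, a trace id is 32 lowercase hex characters and a
    // span id is 16; the all-zero value of either one is invalid.
    static boolean isValidId(String id, int expectedLength) {
        if (id == null || id.length() != expectedLength) {
            return false;
        }
        boolean nonZero = false;
        for (int i = 0; i < id.length(); i++) {
            char c = id.charAt(i);
            boolean hex = (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f');
            if (!hex) {
                return false;
            }
            if (c != '0') {
                nonZero = true;
            }
        }
        return nonZero;
    }

    // True only when both ids are well-formed and not all zeros.
    static boolean hasActiveTrace(String traceId, String spanId) {
        return isValidId(traceId, 32) && isValidId(spanId, 16);
    }

    public static void main(String[] args) {
        // Ids as they might appear before the aggregation (example values).
        System.out.println(hasActiveTrace("4bf92f3577b34da6a3ce929d0e0e4736", "00f067aa0ba902b7")); // true
        // Ids as seen in the debugger after the aggregation.
        System.out.println(hasActiveTrace("00000000000000000000000000000000", "0000000000000000")); // false
    }
}
```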

Is this a known limitation, or is there something we can configure to have proper propagation?

I created a demo project to illustrate the issue. It can be found here.

The README describes the steps to take.

paulklos avatar Sep 09 '22 15:09 paulklos

To the best of my knowledge, unfortunately, Kafka Streams doesn't store the record headers in the RocksDB store — it doesn't even allow it through the API.

There are use cases where the tracing info could be preserved through a stateful operation, but only if the stateful operation follows a non-stateful operation that carries tracing info. For example: I log, then I store the record, and after that I forward the record to another topic/processor.

I don't know if it even makes sense to retrieve the tracing info from a record that was stored :thinking: If the behavior you're looking for is to preserve the record headers throughout the whole processing pipeline, even if this means dropping the tracing info when storing the record but preserving it when sending the record to the changelog, then it makes perfect sense to me and can be done. I've done it before using Brave instrumentation.

LeoFuso avatar Nov 13 '22 19:11 LeoFuso

I observed the same with stateful table-table joins. However, I can also see that other headers are preserved: when I send an input message with some headers, these headers are also present on the output message produced after my table-table join. So it seems to me that Kafka Streams is already passing headers through the stateful join, but the instrumentation does not capture them correctly.
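One way to test this observation is to compare the trace id carried in the headers of an input record with the one on the corresponding output record. The sketch below assumes the agent is using its default W3C Trace Context propagator, which writes a `traceparent` header of the form `version-traceid-spanid-flags`; the thread does not confirm which propagator was in use. Headers are modelled as a plain `Map<String, byte[]>` to keep the example free of Kafka dependencies (real Kafka headers allow duplicate keys), and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;

final class TraceparentHeader {
    // Returns the trace id from a W3C traceparent header, or empty if the
    // header is missing or does not have the expected shape.
    static Optional<String> traceId(Map<String, byte[]> headers) {
        byte[] raw = headers.get("traceparent");
        if (raw == null) {
            return Optional.empty();
        }
        // Expected layout: version-traceid-spanid-flags
        String[] parts = new String(raw, StandardCharsets.UTF_8).split("-");
        if (parts.length < 4 || parts[1].length() != 32 || parts[2].length() != 16) {
            return Optional.empty();
        }
        return Optional.of(parts[1]);
    }

    public static void main(String[] args) {
        // Example header value taken from the W3C Trace Context specification.
        byte[] value = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
                .getBytes(StandardCharsets.UTF_8);
        System.out.println(traceId(Map.of("traceparent", value))); // Optional[4bf92f3577b34da6a3ce929d0e0e4736]
        System.out.println(traceId(Map.of("custom-header", value))); // Optional.empty
    }
}
```

If the output record carries a `traceparent` whose trace id matches the input while the current span inside the downstream processor is still invalid, that would be consistent with the headers surviving the join and the instrumentation not picking them up.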

m-kay avatar Mar 25 '24 06:03 m-kay

I just figured out that disabling the KTable record cache (statestore.cache.max.bytes=0) makes the instrumentation work correctly: my trace context is still present even after a stateful operation like a foreign-key join.
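A minimal sketch of applying this setting when building the Kafka Streams configuration by hand is shown below. It uses only `java.util.Properties` with the literal key from the comment above; the helper name, application id, and broker address are hypothetical. Older Kafka Streams versions use the key `cache.max.bytes.buffering` for the same purpose. With Spring Cloud Stream, as in the original report, the setting would presumably be passed through the binder's `spring.cloud.stream.kafka.streams.binder.configuration.*` properties, which is an assumption rather than something confirmed in this thread.

```java
import java.util.Properties;

final class RecordCacheConfig {
    // Returns a copy of the given Kafka Streams properties with the record
    // cache disabled, as described in the comment above.
    static Properties withRecordCacheDisabled(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("statestore.cache.max.bytes", "0");
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "demo-app");           // hypothetical value
        base.setProperty("bootstrap.servers", "localhost:9092");  // hypothetical value
        Properties props = withRecordCacheDisabled(base);
        // These properties would then be passed to the KafkaStreams constructor.
        System.out.println(props.getProperty("statestore.cache.max.bytes"));
    }
}
```

Note the trade-off: with the record cache disabled, every update to an aggregation or join result is forwarded downstream and written to the state store instead of being compacted in memory first, so throughput and downstream record volume may change noticeably.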

@mateuszrzeszutek, @laurit Could you, as the original creators of that instrumentation, have a look at this?

m-kay avatar May 23 '24 05:05 m-kay