Tracing does not support Kafka Consumer interceptors
The poll method of the KafkaConsumer calls the interceptors before returning the result of the poll. https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1250
Because of this, tracing is not available when the interceptors run. See https://github.com/openzipkin/brave/blob/master/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java#L89
My project defines some interceptors to perform automatic logging. But without this support, the interceptors don't log the complete tracing information (traceId, spanId, etc.).
If you confirm the issue, I may create a Kafka issue to ask for additional methods on the consumer, something like pollWithoutInterceptors and invokeInterceptors. This would allow the tracing implementation to be reworked to support interceptors: basically, the interceptors could be invoked later, once the tracing has been initialized in the poll method of the TracingConsumer.
Any ideas, other proposals, or workarounds to fix this issue would be more than welcome.
Thanks for your help.
Hi @frosiere, thanks for your feedback.
Not sure I understand what the issue is exactly, could you share some examples of logs and traces to get a clearer view of what is missing?
I also wonder if you have given https://github.com/openzipkin-contrib/brave-kafka-interceptor a try; it already covers tracing within Kafka interceptors.
I'm currently developing micro-services using Spring Boot, Spring Kafka and Spring Cloud Sleuth for distributed tracing.
At the Kafka level, I defined a LoggingProducerInterceptor and a LoggingConsumerInterceptor to track both produced and consumed records. These interceptors basically do a logger.info(<RECORD>).
From a logging point of view, Sleuth automatically updates the log pattern to include the serviceName (plus the traceId and spanId when available).
In case of a REST call, every log entry is decorated with the traceId and the spanId, except for those from the LoggingConsumerInterceptor.
Here is a basic sample of log entries:

```
2021-04-22 09:13:08.750 INFO [my-service,,] 48904 --- [ main] c.s.t.r.e.s.a.MyServiceApplication : Started MyServiceApplication in 12.839 seconds (JVM running for 14.909)
2021-04-22 09:13:44.522 INFO [my-service,39a65572d3018be5,39a65572d3018be5] 48904 --- [or-http-epoll-2] t.r.s.k.c.i.p.LoggingProducerInterceptor : Sent ProducerRecord(type=MyPojo, topic=my_topic, partition=null, headers={kafka_correlationId=9fc0e4c6-4b4b-3bd0-bb08-382d4ac344b7}, key=null, timestamp=null)
2021-04-22 09:13:44.583 INFO [my-service,,] 48904 --- [balStreamThread] t.r.s.k.c.i.c.LoggingConsumerInterceptor : Received ConsumerRecord(type=MyPojo, topic=my_topic, partition=0, leaderEpoch=0, offset=311, timestampType=CreateTime, serialized key size=102, serialized value size=124, headers={})
```
This is due to the issue I previously described: as the interceptor is called inside the poll method, the tracing is not yet ready when the interceptor runs. That's why I would suggest splitting the poll method in two. As you can see, there is no such issue with the producer interceptor.
I can try using the interceptor you mentioned, but I'm not sure it will fix this issue.
I tested with the interceptor you mentioned. It didn't fix the issue, it required patching my code, it added more headers, etc. This issue makes the consumer interceptors unusable in terms of tracing.
My feeling is that a review of both the KafkaConsumer and the TracingConsumer would be required to have an out-of-the-box solution.
Any other ideas or suggestions? Thanks for your help.
Thanks for the details.
More than a limitation, this is by design (though admittedly not straightforward to instrument) at the consumer polling mechanism: each poll returns a batch of records, and for each record the tracing context has to be extracted and made available during processing. This is why interceptors can only "mark" that a record has been received, but can't do more to trace commits, etc., as these happen on a thread that is not traced. If you require more tracing detail during processing, use the TracingConsumer directly.
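To illustrate the TracingConsumer route, here is a minimal sketch of wrapping a KafkaConsumer via Brave's KafkaTracing from brave-kafka-clients. The factory class name, remote service name, and consumer properties are placeholders, not part of Brave:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import brave.Tracing;
import brave.kafka.clients.KafkaTracing;

// Illustrative factory: wraps a plain KafkaConsumer in Brave's TracingConsumer
public class TracedConsumerFactory {
  public static Consumer<String, String> create(Tracing tracing, Properties props) {
    KafkaTracing kafkaTracing = KafkaTracing.newBuilder(tracing)
        .remoteServiceName("my-kafka") // placeholder service name
        .build();
    // Records returned by poll() then carry the extracted trace context in
    // their headers, which downstream processing can continue from
    return kafkaTracing.consumer(new KafkaConsumer<>(props));
  }
}
```

The wrapped consumer is a drop-in replacement for the original, so only the construction site changes.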
The only solution I can envision at the moment for your use case is to merge your logging and tracing interceptors into one instrumentation: in onConsume, iterate over the records, extract the tracing context first, and then add your logging statements. This should make all the tracing metadata available for logging.