spring-cloud-sleuth

Use Kafka hooks to implement tracing that does not lose the traceId during error handling

MaksimMyshkin opened this issue 3 years ago • 4 comments

Is your feature request related to a problem? Please describe. The problem was described in https://github.com/spring-cloud/spring-cloud-sleuth/issues/1659 (but that issue was closed).

Describe the solution you'd like Issue spring-projects/spring-kafka#1704 added an additional hook to the batch and record interceptors that is triggered after error handling. I think Spring Cloud Sleuth can use these interceptors to avoid clearing the tracing context before error handling runs.

Describe alternatives you've considered It looks like the current AOP-based solution offers no way to extend the tracing scope, since it wraps only the main listener logic (not the error handling).
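To make the requested ordering concrete, here is a minimal, self-contained sketch of the lifecycle the new hooks enable (plain Java; no Spring or Brave types — the `intercept`/`afterRecord` names mirror the spring-kafka interceptor callbacks, everything else is hypothetical). The point is that the tracing scope opens before the listener and is closed only after error handling, so the error handler still sees the traceId:

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of the interceptor lifecycle spring-kafka 2.8+ enables. */
public class InterceptorLifecycleSketch {
    // Stand-in for a tracer: holds the "current traceId" per thread,
    // like Sleuth's ThreadLocal-backed current trace context.
    static final ThreadLocal<String> CURRENT_TRACE = new ThreadLocal<>();
    static final AtomicLong IDS = new AtomicLong();

    static String seenByErrorHandler;

    // intercept(): open the tracing scope before the listener runs.
    static void intercept() {
        CURRENT_TRACE.set("trace-" + IDS.incrementAndGet());
    }

    // afterRecord(): close the scope only after error handling is done.
    static void afterRecord() {
        CURRENT_TRACE.remove();
    }

    static void processOneRecord(Runnable listener) {
        intercept();
        try {
            listener.run();
        } catch (Exception ex) {
            // Error handling runs while the scope is still open, so the
            // traceId is visible here (the sketch swallows the exception).
            seenByErrorHandler = CURRENT_TRACE.get();
        } finally {
            afterRecord();
        }
    }

    public static void main(String[] args) {
        processOneRecord(() -> { throw new RuntimeException("listener failed"); });
        System.out.println("error handler saw: " + seenByErrorHandler);
        System.out.println("after record: " + CURRENT_TRACE.get()); // null: scope closed
    }
}
```

With Sleuth's current aspect, the equivalent of `afterRecord()` effectively runs before the error handler, which is why the traceId is gone by then.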

MaksimMyshkin avatar Jan 04 '22 08:01 MaksimMyshkin

I am facing the same problem. Will this enhancement be considered?

afzalex avatar Jun 26 '22 09:06 afzalex

@jonatan-ivanov can you look into this since you were working on the other issues too?

marcingrzejszczak avatar Jun 28 '22 11:06 marcingrzejszczak

I'm waiting for this bug to be fixed; in the meantime, for anyone looking for a workaround, this might be useful.

1- In the consumer:

    private final Tracing tracing;
    //....

    @KafkaListener//....
    public void listen(ConsumerRecord<String, Event> record) {
        // Re-inject the current trace context into the record headers so the
        // error handler can restore it after Sleuth has cleared the scope
        Injector<Headers> injector = tracing.propagation()
                .injector((headers, key, value) -> headers.add(new RecordHeader(key, value.getBytes())));
        injector.inject(tracing.currentTraceContext().get(), record.headers());
        //....
    }

2- In the error handler:

    private final Tracing tracing;
    //....

    @Override
    public void handle(Exception ex, ConsumerRecord<?, ?> record) {
        // Rebuild the trace context from the headers written in the consumer,
        // then reopen a scope around the actual error handling
        Extractor<Headers> extractor = tracing.propagation()
                .extractor((request, key) -> new String(request.lastHeader(key).value()));
        Span span = tracing.tracer().joinSpan(extractor.extract(record.headers()).context());
        try (SpanInScope ignored = tracing.tracer().withSpanInScope(span.start())) {
            handleError(record, ex);
        } finally {
            span.finish();
        }
    }

    private void handleError(ConsumerRecord<?, ?> record, Exception ex) {
        ....
    }
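The heart of this workaround is a simple round trip: the consumer copies the live trace context into the record's headers, and the error handler reads it back after Sleuth has already cleared its scope. A self-contained sketch of that round trip (plain Java with a `Map` standing in for Kafka headers; the `X-B3-TraceId` key is the B3 header Brave propagates, everything else is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java sketch of the workaround's inject/extract round trip (no Brave types). */
public class HeaderPropagationSketch {
    static final String TRACE_KEY = "X-B3-TraceId"; // B3 header name used by Brave

    // Inject: write the current traceId into the record's headers (the listen() step).
    static void inject(String traceId, Map<String, byte[]> headers) {
        headers.put(TRACE_KEY, traceId.getBytes());
    }

    // Extract: rebuild the traceId from the headers (the handle() step, which runs
    // after Sleuth's own scope has already been closed).
    static String extract(Map<String, byte[]> headers) {
        byte[] value = headers.get(TRACE_KEY);
        return value == null ? null : new String(value);
    }

    public static void main(String[] args) {
        Map<String, byte[]> recordHeaders = new HashMap<>();
        inject("4bf92f3577b34da6", recordHeaders); // consumer side, scope still open
        String restored = extract(recordHeaders);  // error-handler side
        System.out.println(restored);
    }
}
```

The real code does the same thing through Brave's `Injector`/`Extractor`, which also carry the spanId and sampling flags, not just the traceId.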

ahmedsaleh747 avatar Aug 30 '22 05:08 ahmedsaleh747

With spring-kafka 2.8+ it can be implemented simply as:

class TracingRecordInterceptor(
    private val kafkaTracing: KafkaTracing,
    private val tracer: Tracer,
) : ConsumerAwareRecordInterceptor<String, Any> {

    // One span/scope per consumer thread; cleared in afterRecord()
    private val span: ThreadLocal<Span?> = ThreadLocal()
    private val spanInScope: ThreadLocal<SpanInScope?> = ThreadLocal()

    override fun intercept(
        record: ConsumerRecord<String, Any>,
        consumer: Consumer<String, Any>
    ): ConsumerRecord<String, Any> {
        // Open a span before the listener (and any error handling) runs
        span.set(kafkaTracing.nextSpan(record).name("on-message").start())
        spanInScope.set(tracer.withSpanInScope(span.get()))
        return record
    }

    override fun failure(record: ConsumerRecord<String, Any>, exception: Exception, consumer: Consumer<String, Any>) {
        // The scope is still open here, so the traceId is available for logging
        span.get()?.tag("error", exception.message ?: exception.javaClass.simpleName)
    }

    override fun afterRecord(record: ConsumerRecord<String, Any>, consumer: Consumer<String, Any>) {
        // Runs after error handling: close the scope, finish the span, clear state
        spanInScope.get()?.close()
        span.get()?.finish()

        spanInScope.remove()
        span.remove()
    }
}

The key part is the recently added afterRecord() method. It is triggered after the error-handler logic, so the trace should not be lost. Sadly, we have not tried it on our project yet, as we have not upgraded our dependencies. Also, I don't know yet how to specify the key and value types more generically.

On our project we use a similar implementation (but without the new afterRecord method). Rewriting the tracing logic this way allowed us to configure tracing differently for different consumers, unlike the default aspect-based solution, which enables tracing for all consumers. I see this as another reason why this approach is better.
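The invariant the interceptor above relies on can be shown in a small self-contained sketch (plain Java, illustrative names; not the actual KafkaTracing API): each record in a poll gets its own span in the intercept step, and the ThreadLocal state is cleared in afterRecord, so per-record state never leaks from one record of a poll into the next.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch: one span per record, state cleared after each record. */
public class PerRecordSpanSketch {
    static final ThreadLocal<String> CURRENT_SPAN = new ThreadLocal<>();
    static int counter = 0;

    // Mirrors intercept(): start a fresh span for this record (like nextSpan(record))
    static void intercept() { CURRENT_SPAN.set("span-" + (++counter)); }
    // Mirrors afterRecord(): close the scope and clear the thread-local state
    static void afterRecord() { CURRENT_SPAN.remove(); }

    /** Simulates one poll of several records flowing through the callbacks. */
    static List<String> processPoll(int records) {
        List<String> spansSeen = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            intercept();
            spansSeen.add(CURRENT_SPAN.get()); // what the listener would observe
            afterRecord();
        }
        return spansSeen;
    }

    public static void main(String[] args) {
        System.out.println(processPoll(3));             // three distinct span ids
        System.out.println(CURRENT_SPAN.get() == null); // true: nothing leaks
    }
}
```

Note this only guarantees distinct spans; whether records in the same poll share a traceId depends on the context (if any) carried in each record's headers.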

MaksimMyshkin avatar Sep 02 '22 15:09 MaksimMyshkin

@MaksimMyshkin I tried the approach above, but the trace IDs are the same for all records in the same poll. Were you able to implement this approach after the Kafka version bump?

mohit-kaul-kr avatar Nov 09 '23 12:11 mohit-kaul-kr

No, we haven't tried it yet.

For old spring-kafka versions we additionally set errorHandler in the @KafkaListener annotations. That kind of error handler executes within the tracing scope. However, this solution requires supporting an additional kind of error handler.

MaksimMyshkin avatar Nov 13 '23 07:11 MaksimMyshkin

FYI: the instrumentation was moved to Spring Kafka a year ago. Could you please try to upgrade? This should not be an issue with it. Also FYI: Spring Boot 2.x and Spring Framework 5.x, which you are probably using, will go out of OSS support in about a week: https://spring.io/projects/spring-boot#support

jonatan-ivanov avatar Nov 15 '23 17:11 jonatan-ivanov

Please upgrade to Micrometer Tracing. Spring Cloud Sleuth is feature complete and out of OSS support.

marcingrzejszczak avatar Feb 09 '24 13:02 marcingrzejszczak