java-kafka-client icon indicating copy to clipboard operation
java-kafka-client copied to clipboard

Custom span names for consumers doesn't seem to work

Open raphaeldore opened this issue 4 years ago • 2 comments
trafficstars

Hello,

I'm using opentracing-kafka-spring version 0.1.15. I'm trying to set custom span names for producers and consumers. Making it work for producers was very easy and worked immediately, and I assumed it would be the same for consumers, but alas it was not.

After a little bit of investigating, I found that buildAndFinishChildSpan is never called (https://github.com/opentracing-contrib/java-kafka-client/blob/0.1.15/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaUtils.java#L115-L139). Spans are still being created though, I can clearly see them in the logs, but they appear as KafkaListener_xxx, which is not what I want.

Also, it's not impossible that there is a concept that I don't understand properly :).

Here is how I configure the library:

package com.mycompany;

import com.mycompany.common.eventhandler.eventmessage.EventMessage;
import io.opentracing.Tracer;
import io.opentracing.contrib.kafka.spring.TracingConsumerFactory;
import io.opentracing.contrib.kafka.spring.TracingKafkaAspect;
import io.opentracing.contrib.kafka.spring.TracingProducerFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.*;

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

@Configuration
public class KafkaTracingConfig {

    private final KafkaProperties kafkaProperties;
    private final Tracer tracer;

    // Sets the span name to the event name
    private final BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider =
           (operationName, consumerRecord) -> "CONSUME_" + ((EventMessage<?>)consumerRecord.value()).getName();

    private final BiFunction<String, ProducerRecord, String> producerSpanNameProvider =
            (operationName, producerRecord) -> "PRODUCE_" + ((EventMessage<?>)producerRecord.value()).getName();

    public KafkaTracingConfig(KafkaProperties kafkaProperties, Tracer tracer) {
        this.kafkaProperties = kafkaProperties;
        this.tracer = tracer;
    }

    @Bean
    public ConsumerFactory<String, EventMessage<?>> consumerFactory() {
        return new TracingConsumerFactory<>(new DefaultKafkaConsumerFactory<>(consumerProps()), tracer, consumerSpanNameProvider);
    }

    @Bean
    public ProducerFactory<String, EventMessage<?>> producerFactory() {
        return new TracingProducerFactory<>(new DefaultKafkaProducerFactory<>(producerProps()), tracer, producerSpanNameProvider);
    }

    @Bean
    public KafkaTemplate<String, EventMessage<?>> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public TracingKafkaAspect tracingKafkaAspect() {
        return new TracingKafkaAspect(tracer);
    }

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>(kafkaProperties.buildConsumerProperties());
        return props;
    }

    private Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>(kafkaProperties.buildProducerProperties());
        return props;
    }
}

And here is a simplified version of one of our consumers:

package com.mycompany.eventhandler.consumer;

import com.mycompany.common.eventhandler.eventmessage.EventMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class StationReservationConsumer {

    @KafkaListener(topics = {"myTopic"}, id = "anId")
    public void consumeMyTopic(@Payload EventMessage<?> eventMessage, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.OFFSET) int offset, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        // blablabla
    }

    @KafkaHandler(isDefault = true)
    public void unknown(Object object) {
        log.info("Kafka reading unknown: " + object);
    }
}

Thanks and have a nice day !

raphaeldore avatar Jan 20 '21 16:01 raphaeldore

I will also say that SpanDecorators are also not applied for consumers (which is probably a consequence of not calling buildAndFinishChildSpan).

raphaeldore avatar Jan 20 '21 21:01 raphaeldore

@raphaeldore Did you finally solve the problem? I also encountered the same problem

wushuangxiaoyu avatar Sep 09 '22 09:09 wushuangxiaoyu