Spring Cloud Sleuth not propagating b3 traces from kafka records via Kafka Stream Binder

Open richardkabiling opened this issue 3 years ago • 6 comments

Issue Description

Log statements inside the spring-cloud-stream kafka-streams-binder processor do not include trace IDs. It seems the b3 trace context is not picked up from the headers of the record in the topic.

Steps to replicate

  1. Bring up environment
docker-compose up -d
./gradlew clean bootRun

In a separate console, tail the topic. If this step fails because the topic does not exist yet, perform step 2 once first to trigger topic auto-creation.

docker-compose exec broker kafka-console-consumer --bootstrap-server localhost:9092 --topic resources --property print.key=true --property print.headers=true --property print.timestamp=true
  2. Emit the following data into the resources topic (via HTTP -> Producer -> topic)
curl -X PUT 'localhost:25400/namespaces/a/resources/3' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "C"
}'
  3. Confirm in the application logs that b3 traces are generated/propagated
2021-05-20 10:14:45.103  INFO [,bb4e9443b6bcd085,bb4e9443b6bcd085] 7211 --- [ctor-http-nio-3] c.e.p.kafka.sleuth.ResourcesController   : Received request to map resource. namespace=a, resourceId=3, request=MapResourceRequest(name=C)
  4. Confirm that the record in the topic carries the b3 trace header
CreateTime:1621476885103        b3:bb4e9443b6bcd085-c06a30a6b67ae651-0,__TypeId__:com.example.poc.kafka.sleuth.Resource a:3     {"namespace":"a","id":"3","name":"C"}
  5. The Spring Cloud Stream Kafka Streams binder picks up the record, but not the b3 trace (note the empty [,,] in the log line)
2021-05-20 10:14:45.132  INFO [,,] 7211 --- [-StreamThread-1] c.e.poc.kafka.sleuth.ProcessingConfig    : Received to materialize resource. a:3=Resource(namespace=a, id=3, name=C)

Gradle build configuration:

extra["springCloudVersion"] = "2020.0.2"
...
dependencies {
    ...
    implementation("org.springframework.cloud:spring-cloud-stream")
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
    implementation("org.springframework.cloud:spring-cloud-starter-sleuth")
    ...
}

Application properties:

spring.cloud.stream.function.definition=materializeResources
spring.cloud.stream.kafka.streams.binder.functions.materializeResources.application-id=resourceMaterializerProcessor
spring.cloud.stream.bindings.materializeResources-in-0.destination=resources
...
spring.sleuth.messaging.kafka.enabled=true
spring.sleuth.messaging.kafka.streams.enabled=true

Stream function:

    @Bean
    fun materializeResources(resourceSerde: JsonSerde<Resource>): Consumer<KStream<String, Resource>> {
        return Consumer {
            // This log line is the one that comes out without trace IDs.
            it.peek { key, value -> logger.info { "Received to materialize resource. $key=$value" } }
                .toTable(Materialized.`as`<String, Resource, KeyValueStore<Bytes, ByteArray>>("resources-store")
                    .withKeySerde(Serdes.StringSerde())
                    .withValueSerde(resourceSerde))
        }
    }

Sample

  1. Please clone https://github.com/richardkabiling/kafka-sleuth (for the Java/Maven sample, clone https://github.com/richardkabiling/kafka-sleuth-maven-java instead)
  2. Follow the instructions above

richardkabiling, May 20 '21 02:05

Hi! Would it be a big ask for you to rewrite this project in Maven and Java? I have constant issues with Gradle and Kotlin in IntelliJ :cry:

marcingrzejszczak, May 20 '21 07:05

I can probably try regenerating it from the initializer. Please give me some time for the rewrite.

richardkabiling, May 20 '21 07:05

@marcingrzejszczak - I rewrote the sample in Java and Maven. Please try cloning this instead: https://github.com/richardkabiling/kafka-sleuth-maven-java

I also updated the issue. The behavior is essentially the same:

kafka-console-consumer:

CreateTime:1621530585201        b3:bf864af90968acfb-e9a8f91c1303ecb9-0,__TypeId__:com.example.poc.kafka.sleuth.Resource a:e     {"namespace":"a","id":"e","name":"5"}

application logs:

2021-05-21 01:09:45.188  INFO [,bf864af90968acfb,bf864af90968acfb] 25280 --- [or-http-epoll-2] c.e.poc.kafka.sleuth.ResourceController  : Received request to map resource. namespace=a, resourceId=e. request=MapIssuerRequest(name=5)
...
2021-05-21 01:09:45.213  INFO [,,] 25280 --- [-StreamThread-1] c.e.poc.kafka.sleuth.ProcessingConfig    : Received request to materialize resource. key=a:e, value=Resource(namespace=a, id=e, name=5)

The trace ID (bf864af90968acfb) appears in both the controller logs and the kafka-console-consumer output, but not in the logs written by ProcessingConfig.
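
For anyone comparing these values by hand, here is a minimal sketch of how the b3 header value in the record breaks down. It assumes the b3 single-header layout traceId-spanId-samplingState, with an optional trailing parent span ID. The class name B3SingleHeader and its fields are made up for illustration and are not part of Sleuth or Brave. The sampling-only form (a bare "0" or "1") is deliberately not handled.

```java
import java.util.Optional;

/** Hypothetical helper: splits a b3 single-header value into its parts. */
final class B3SingleHeader {

    final String traceId;
    final String spanId;
    // Empty when the header carries no sampling field.
    final Optional<Boolean> sampled;

    private B3SingleHeader(String traceId, String spanId, Optional<Boolean> sampled) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.sampled = sampled;
    }

    static B3SingleHeader parse(String value) {
        String[] parts = value.trim().split("-");
        // Expect traceId-spanId, optionally followed by sampling state and parent span ID.
        if (parts.length < 2 || parts.length > 4) {
            throw new IllegalArgumentException("Not a b3 single header: " + value);
        }
        Optional<Boolean> sampled = Optional.empty();
        if (parts.length >= 3) {
            // "1" and "d" (debug) are treated as sampled; anything else as not sampled.
            sampled = Optional.of(parts[2].equals("1") || parts[2].equals("d"));
        }
        return new B3SingleHeader(parts[0], parts[1], sampled);
    }

    public static void main(String[] args) {
        // Header value copied from the kafka-console-consumer output above.
        B3SingleHeader h = parse("bf864af90968acfb-e9a8f91c1303ecb9-0");
        System.out.println("traceId=" + h.traceId
                + " spanId=" + h.spanId
                + " sampled=" + h.sampled.orElse(null));
        // prints: traceId=bf864af90968acfb spanId=e9a8f91c1303ecb9 sampled=false
    }
}
```

Read this way, the first field of the header matches the trace ID in the controller log line, while the span ID differs. So the context does reach the topic; it is lost only on the Kafka Streams consumer side.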

richardkabiling, May 20 '21 17:05

I think this may be related to https://github.com/spring-cloud/spring-cloud-sleuth/issues/1965

marcingrzejszczak, Jul 26 '21 13:07

We would need help documenting the workaround for #1965. Is anybody interested in helping us out?

marcingrzejszczak, Oct 04 '21 09:10

Hi guys, I recently faced the same problem, and wrapping the processing in ReactorSleuth.tracedMono works fine for me. All the nested .flatMap calls log the provided trace. Here is an example; maybe it will help you:

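// Consumes records reactively and restores the Sleuth trace context carried in each record's headers.
class KafkaListener(
    private val consumerTemplate: ReactiveKafkaConsumerTemplate<String, String>,
    private val kafkaProcessor: KafkaMessageHandler,
    private val tracer: Tracer,
    private val propagator: Propagator
) : DisposableBean {

    private lateinit var disposable: Disposable

    @PostConstruct
    fun startConsuming() {
        disposable = consumerTemplate
            .receiveAutoAck()
            .doOnNext { log.debug { "Received $it" } }
            .flatMap { record ->

                // Extract the propagated trace context from the record headers
                // and start a span that continues that trace.
                val spanBuilder = propagator.extract(record, TracingKafkaPropagatorGetter())
                val span = spanBuilder.start()

                // Run the processing with that span in scope so nested operators log its trace ID.
                ReactorSleuth.tracedMono(tracer, span) {
                    kafkaProcessor.process(record)
                        // Map failures to false so the stream keeps consuming.
                        .onErrorResume { Mono.just(false) }
                }
            }
            .subscribe { log.debug { "Handled, success: $it" } }
    }

    // Stop consuming when the bean is destroyed.
    override fun destroy() = disposable.dispose()

}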

MisterRnobe, Aug 16 '22 11:08

Please upgrade to Micrometer Tracing. Spring Cloud Sleuth is feature complete and out of OSS support.

marcingrzejszczak, Feb 09 '24 13:02