spring-cloud-sleuth
Spring Cloud Sleuth not propagating b3 traces from kafka records via Kafka Stream Binder
Issue Description
Logging in the spring-cloud-stream kafka-streams-binder does not include trace IDs. It seems the b3 trace headers are not picked up from the records in the topic.
Steps to replicate
- Bring up the environment
docker-compose up -d
./gradlew clean bootRun
In a separate console, tail the topic. If this step fails due to a non-existent topic, perform step 2 once first to trigger topic auto-creation.
docker-compose exec broker kafka-console-consumer --bootstrap-server localhost:9092 --topic resources --property print.key=true --property print.headers=true --property print.timestamp=true
- Emit the following data into the `resources` topic (via HTTP -> Producer -> topic)
curl -X PUT 'localhost:25400/namespaces/a/resources/3' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "C"
}'
- Confirm in application logs that b3 traces are generated/propagated
2021-05-20 10:14:45.103 INFO [,bb4e9443b6bcd085,bb4e9443b6bcd085] 7211 --- [ctor-http-nio-3] c.e.p.kafka.sleuth.ResourcesController : Received request to map resource. namespace=a, resourceId=3, request=MapResourceRequest(name=C)
- Confirm the b3 trace headers are propagated to the record in the topic
CreateTime:1621476885103 b3:bb4e9443b6bcd085-c06a30a6b67ae651-0,__TypeId__:com.example.poc.kafka.sleuth.Resource a:3 {"namespace":"a","id":"3","name":"C"}
- The Spring Cloud Stream Kafka Streams binder is able to pick up the record, but not the b3 traces
2021-05-20 10:14:45.132 INFO [,,] 7211 --- [-StreamThread-1] c.e.poc.kafka.sleuth.ProcessingConfig : Received to materialize resource. a:3=Resource(namespace=a, id=3, name=C)
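For reference, the `b3` record header shown above uses the B3 single-header format, `{traceId}-{spanId}-{sampled}`. A minimal sketch of splitting such a value into its parts (plain Java, independent of Sleuth):

```java
public class B3Single {
    // Splits a B3 single-header value such as
    // "bb4e9443b6bcd085-c06a30a6b67ae651-0" into its dash-separated fields:
    // [traceId, spanId, sampled, (optional parentSpanId)].
    public static String[] parse(String header) {
        String[] parts = header.split("-");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Not a B3 single header: " + header);
        }
        return parts;
    }

    public static void main(String[] args) {
        String[] parts = parse("bb4e9443b6bcd085-c06a30a6b67ae651-0");
        System.out.println("traceId=" + parts[0] + " spanId=" + parts[1] + " sampled=" + parts[2]);
    }
}
```

The trace ID from this header is what should end up in the `[appname,traceId,spanId]` section of the log lines; in the failing case above it stays empty (`[,,]`).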
extra["springCloudVersion"] = "2020.0.2"
...
dependencies {
...
implementation("org.springframework.cloud:spring-cloud-stream")
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
implementation("org.springframework.cloud:spring-cloud-starter-sleuth")
...
}
spring.cloud.stream.function.definition=materializeResources
spring.cloud.stream.kafka.streams.binder.functions.materializeResources.application-id=resourceMaterializerProcessor
spring.cloud.stream.bindings.materializeResources-in-0.destination=resources
...
spring.sleuth.messaging.kafka.enabled=true
spring.sleuth.messaging.kafka.streams.enabled=true
@Bean
fun materializeResources(resourceSerde: JsonSerde<Resource>): Consumer<KStream<String, Resource>> {
    return Consumer {
        it.peek { key, value -> logger.info { "Received to materialize resource. $key=$value" } }
            .toTable(
                Materialized.`as`<String, Resource, KeyValueStore<Bytes, ByteArray>>("resources-store")
                    .withKeySerde(Serdes.StringSerde())
                    .withValueSerde(resourceSerde)
            )
    }
}
Sample
- Please clone: https://github.com/richardkabiling/kafka-sleuth (for the Java/Maven sample, clone https://github.com/richardkabiling/kafka-sleuth-maven-java instead)
- Follow instructions above
Hi! Would it be a big ask for you to rewrite this project in Maven and Java? I have constant issues with Gradle and Kotlin in IntelliJ :cry:
I can probably try regenerating from initializer. Please give me some time to rewrite.
@marcingrzejszczak - rewrote it in Java and Maven. Please try cloning this instead: https://github.com/richardkabiling/kafka-sleuth-maven-java
Also updated the issue. For the most part the behavior is still the same:
kafka-console-consumer:
CreateTime:1621530585201 b3:bf864af90968acfb-e9a8f91c1303ecb9-0,__TypeId__:com.example.poc.kafka.sleuth.Resource a:e {"namespace":"a","id":"e","name":"5"}
application logs:
2021-05-21 01:09:45.188 INFO [,bf864af90968acfb,bf864af90968acfb] 25280 --- [or-http-epoll-2] c.e.poc.kafka.sleuth.ResourceController : Received request to map resource. namespace=a, resourceId=e. request=MapIssuerRequest(name=5)
...
2021-05-21 01:09:45.213 INFO [,,] 25280 --- [-StreamThread-1] c.e.poc.kafka.sleuth.ProcessingConfig : Received request to materialize resource. key=a:e, value=Resource(namespace=a, id=e, name=5)
The trace ID (bf864af90968acfb) appears in both the controller logs and the kafka-console-consumer output, but not in the logs from ProcessingConfig.
I think this may be related to https://github.com/spring-cloud/spring-cloud-sleuth/issues/1965
We would need help with documenting the workaround for #1965. Is anybody interested in helping us out?
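For anyone landing here before the docs are updated: one workaround discussed around #1965 is to hand Brave's Kafka Streams client supplier to the binder's `StreamsBuilderFactoryBean`, so that consumed records have their b3 headers extracted into spans. A sketch, assuming Sleuth's auto-configured `KafkaStreamsTracing` bean (from `brave-instrumentation-kafka-streams`) is on the classpath; verify the exact customizer type against your Spring Cloud Stream version:

```java
// Hypothetical wiring; confirm bean and customizer names against your versions.
@Configuration
public class KafkaStreamsTracingConfig {

    // StreamsBuilderFactoryBeanCustomizer comes from spring-cloud-stream-binder-kafka-streams;
    // KafkaStreamsTracing is Brave's Kafka Streams instrumentation, auto-configured by Sleuth.
    @Bean
    public StreamsBuilderFactoryBeanCustomizer tracingCustomizer(KafkaStreamsTracing kafkaStreamsTracing) {
        // Brave's client supplier wraps the underlying consumers/producers so that
        // b3 headers on incoming records are restored into the trace context.
        return factoryBean -> factoryBean.setClientSupplier(kafkaStreamsTracing.kafkaClientSupplier());
    }
}
```

This only covers the binder path; treat it as a starting point rather than the official fix.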
Hi guys, I recently faced the same problem, and using the `ReactorSleuth.tracedMono` wrapper works fine for me. All the nested `.flatMap` calls log the propagated trace. Here is an example; maybe it will help you:
class KafkaListener(
    private val consumerTemplate: ReactiveKafkaConsumerTemplate<String, String>,
    private val kafkaProcessor: KafkaMessageHandler,
    private val tracer: Tracer,
    private val propagator: Propagator
) : DisposableBean {

    private lateinit var disposable: Disposable

    @PostConstruct
    fun startConsuming() {
        disposable = consumerTemplate
            .receiveAutoAck()
            .doOnNext { log.debug { "Received $it" } }
            .flatMap { record ->
                // Extract the b3 context from the record headers and start a span from it
                val spanBuilder = propagator.extract(record, TracingKafkaPropagatorGetter())
                val span = spanBuilder.start()
                // Wrap processing so the span stays in scope for all downstream operators
                ReactorSleuth.tracedMono(tracer, span) {
                    kafkaProcessor.process(record)
                        .onErrorResume { Mono.just(false) }
                }
            }
            .subscribe { log.debug { "Handled, success: $it" } }
    }

    override fun destroy() = disposable.dispose()
}
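Stripped of the Reactor and Sleuth APIs, the pattern above boils down to: pull the trace ID out of the record's headers before handling it, scope it around the processing, and clear it afterwards, so every log line in between carries the trace. A framework-free sketch (all names here are hypothetical, with a `ThreadLocal` standing in for the MDC):

```java
import java.util.Map;

public class TracedHandler {
    // Stands in for an MDC-style holder of the current trace ID.
    private static final ThreadLocal<String> CURRENT_TRACE = new ThreadLocal<>();

    public static String currentTraceId() {
        String id = CURRENT_TRACE.get();
        return id == null ? "" : id;
    }

    // Extracts the trace ID from a B3 single header ("traceId-spanId-sampled"),
    // scopes it around the handler, and always clears the scope afterwards.
    public static void handleWithTrace(Map<String, String> headers, Runnable handler) {
        String b3 = headers.getOrDefault("b3", "");
        String traceId = b3.isEmpty() ? "" : b3.split("-")[0];
        CURRENT_TRACE.set(traceId);
        try {
            handler.run();
        } finally {
            CURRENT_TRACE.remove();
        }
    }
}
```

The key point, and the reason the binder logs show `[,,]`, is that this extract-and-scope step has to happen per record; if nothing performs it before the processor runs, the logging context stays empty.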
Please upgrade to Micrometer Tracing. Spring Cloud Sleuth is feature complete and out of OSS support.