KafkaMessageDrivenChannelAdapter fails when tracing propagation is enabled
Describe the bug
When Kafka and KafkaMessageDrivenChannelAdapter are used with tracing propagation, message handling fails. During trace extraction (via Micrometer tracing) the b3 header is read as String.class, but the header value is actually a byte[].
stacktrace
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2946)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2887)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2854)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$57(KafkaMessageListenerContainer.java:2772)
at io.micrometer.observation.Observation.observe(Observation.java:562)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2770)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2622)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2508)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2150)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1505)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1469)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1344)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1804)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalArgumentException: Incorrect type specified for header 'b3'. Expected [class java.lang.String] but actual type is [class [B]
at org.springframework.messaging.MessageHeaders.get(MessageHeaders.java:216)
at org.springframework.integration.support.management.observation.MessageReceiverContext.lambda$new$0(MessageReceiverContext.java:38)
at io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler.lambda$onStart$0(PropagatingReceiverTracingObservationHandler.java:59)
at io.micrometer.tracing.otel.bridge.OtelPropagator$1.get(OtelPropagator.java:73)
at io.opentelemetry.extension.trace.propagation.B3PropagatorExtractorSingleHeader.extractSpanContextFromSingleHeader(B3PropagatorExtractorSingleHeader.java:36)
at io.opentelemetry.extension.trace.propagation.B3PropagatorExtractorSingleHeader.extract(B3PropagatorExtractorSingleHeader.java:31)
at io.opentelemetry.extension.trace.propagation.B3Propagator.lambda$extract$0(B3Propagator.java:117)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.Spliterators$ArraySpliterator.tryAdvance(Spliterators.java:1002)
at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:150)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:647)
at io.opentelemetry.extension.trace.propagation.B3Propagator.extract(B3Propagator.java:123)
at org.springframework.boot.actuate.autoconfigure.tracing.CompositeTextMapPropagator.lambda$extract$2(CompositeTextMapPropagator.java:110)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.AbstractList$RandomAccessSpliterator.tryAdvance(AbstractList.java:706)
at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:150)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:647)
at org.springframework.boot.actuate.autoconfigure.tracing.CompositeTextMapPropagator.extract(CompositeTextMapPropagator.java:112)
at io.micrometer.tracing.otel.bridge.OtelPropagator.extract(OtelPropagator.java:65)
at io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler.onStart(PropagatingReceiverTracingObservationHandler.java:58)
at io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler.onStart(PropagatingReceiverTracingObservationHandler.java:35)
at io.micrometer.observation.ObservationHandler$FirstMatchingCompositeObservationHandler.onStart(ObservationHandler.java:149)
at io.micrometer.observation.SimpleObservation.notifyOnObservationStarted(SimpleObservation.java:222)
at io.micrometer.observation.SimpleObservation.start(SimpleObservation.java:169)
at io.micrometer.observation.Observation.observe(Observation.java:493)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:394)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:469)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:425)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2873)
... 13 more
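The failing step in the trace above is a typed header lookup that asks for a String while the stored value is a byte[]. The following is a minimal, plain-Java sketch of that kind of check, not the actual Spring `MessageHeaders` code. The class `StrictHeaderLookupSketch` and its methods `get` and `getAsText` are hypothetical names introduced only for illustration, and the tolerant variant is just one possible way to read such a header.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class StrictHeaderLookupSketch {

    // Hypothetical stand-in for a strict typed lookup: the value is returned
    // only if it already has the requested type, otherwise the call fails.
    public static <T> T get(Map<String, Object> headers, String key, Class<T> type) {
        Object value = headers.get(key);
        if (value == null) {
            return null;
        }
        if (!type.isAssignableFrom(value.getClass())) {
            throw new IllegalArgumentException("Incorrect type specified for header '" + key
                    + "'. Expected [" + type + "] but actual type is [" + value.getClass() + "]");
        }
        return type.cast(value);
    }

    // Hypothetical tolerant variant: decodes byte[] values as UTF-8 text
    // instead of rejecting them.
    public static String getAsText(Map<String, Object> headers, String key) {
        Object value = headers.get(key);
        if (value instanceof byte[]) {
            return new String((byte[]) value, StandardCharsets.UTF_8);
        }
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        // The b3 value arrives as raw bytes, as in the reported failure.
        headers.put("b3", "d8496f9a548268a7e1c7a0fc40f6789c-2d3821b6c980345a-1"
                .getBytes(StandardCharsets.UTF_8));

        try {
            get(headers, "b3", String.class);
        } catch (IllegalArgumentException ex) {
            System.out.println("strict lookup failed: " + ex.getMessage());
        }
        System.out.println("tolerant lookup: " + getAsText(headers, "b3"));
    }
}
```

The strict variant mirrors the exception message in the report; the tolerant variant only shows that the raw bytes still carry a readable b3 value.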
kafka headers
{
"b3": "d8496f9a548268a7e1c7a0fc40f6789c-2d3821b6c980345a-1",
"spring_json_header_types": "{\"SENDER\":\"java.lang.String\",\"PAYLOAD_TYPE\":\"java.lang.String\"}",
"SENDER": "payment",
"PAYLOAD_TYPE": "PartnerSpaceBillingUserEmailUpdatedEvent"
}
If spring_json_header_types contains an entry for the b3 header, the error does not occur. (However, judging from https://github.com/spring-cloud/spring-cloud-stream/issues/2905, it seems expected that spring_json_header_types does not contain an entry for the b3 header.)
{
"b3": "d8496f9a548268a7e1c7a0fc40f6789c-2d3821b6c980345a-1",
"spring_json_header_types": "{\"SENDER\":\"java.lang.String\",\"PAYLOAD_TYPE\":\"java.lang.String\", \"b3\":\"java.lang.String\"}",
"SENDER": "payment",
"PAYLOAD_TYPE": "PartnerSpaceBillingUserEmailUpdatedEvent"
}
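The difference between the two header sets can be modelled roughly as follows. This is a simplified sketch of the behaviour observed in this report, not the actual header mapper in Spring for Apache Kafka: it assumes that only headers listed with `java.lang.String` in the type hints are converted to text, and that everything else stays a byte[]. The class `HeaderTypeHintSketch` and the method `mapHeaders` are hypothetical, and the type hints are passed as an already parsed map to avoid JSON handling.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderTypeHintSketch {

    // Assumption: a header is converted to String only when the type hints
    // name it as java.lang.String; otherwise the raw bytes are kept.
    public static Map<String, Object> mapHeaders(Map<String, byte[]> raw,
                                                 Map<String, String> typeHints) {
        Map<String, Object> mapped = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> entry : raw.entrySet()) {
            if ("java.lang.String".equals(typeHints.get(entry.getKey()))) {
                mapped.put(entry.getKey(), new String(entry.getValue(), StandardCharsets.UTF_8));
            } else {
                mapped.put(entry.getKey(), entry.getValue());
            }
        }
        return mapped;
    }

    public static void main(String[] args) {
        Map<String, byte[]> raw = new LinkedHashMap<>();
        raw.put("b3", "d8496f9a548268a7e1c7a0fc40f6789c-2d3821b6c980345a-1"
                .getBytes(StandardCharsets.UTF_8));
        raw.put("SENDER", "payment".getBytes(StandardCharsets.UTF_8));

        // First header set from the report: no hint for b3.
        Map<String, String> hints = new HashMap<>();
        hints.put("SENDER", "java.lang.String");
        Object b3 = mapHeaders(raw, hints).get("b3");
        System.out.println("without hint: " + b3.getClass().getSimpleName());

        // Second header set from the report: b3 listed as java.lang.String.
        hints.put("b3", "java.lang.String");
        b3 = mapHeaders(raw, hints).get("b3");
        System.out.println("with hint: " + b3.getClass().getSimpleName());
    }
}
```

Under this model, the first header set leaves b3 as a byte[], which a String-typed lookup then rejects, while the second set yields a String and the lookup succeeds.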
Environment
- io.micrometer:micrometer-registry-prometheus: 1.10.13
- io.micrometer:micrometer-tracing-bridge-otel: 1.0.12
- io.micrometer:context-propagation: 1.0.6
- Java version: 17
- Spring Boot: 3.0.13
- Spring Cloud version: 2022.0.4
To Reproduce
Use two Spring Boot components with observation enabled:
- Component A produces Kafka messages.
- Component B consumes Kafka messages.
- Inside component B, messages are routed to specific channels based on a header, using KafkaMessageDrivenChannelAdapter.
- The error occurs.
Properties related to Kafka and Micrometer tracing
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: 1
      retries: 1
    consumer:
      enable-auto-commit: true
      auto-commit-interval: 1000
      max-poll-records: 15
      auto-offset-reset: latest
      group-id: ***
  integration:
    management:
      observation-patterns: "*"
      default-logging-enabled: true
management:
  tracing:
    sampling:
      probability: 1.0
    propagation:
      type: b3
      consume: b3
      produce: b3
I have the same problem.
This has nothing to do with Micrometer itself.
@jonatan-ivanov,
please consider closing this in favor of https://github.com/spring-projects/spring-integration/issues/9191.
Thanks
Closing per https://github.com/micrometer-metrics/micrometer/issues/5108#issuecomment-2135913066