
When using KafkaMessageDrivenChannelAdapter with tracing it fails

Open hyunbeeds opened this issue 1 year ago • 2 comments

Describe the bug

When using Kafka and KafkaMessageDrivenChannelAdapter with tracing propagation, message handling fails because the tracing code (via Micrometer) tries to read the b3 header as String.class, while the header value actually arrives as a byte[].

Stack trace

 org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2946)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2887)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2854)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$57(KafkaMessageListenerContainer.java:2772)
	at io.micrometer.observation.Observation.observe(Observation.java:562)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2770)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2622)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2508)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2150)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1505)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1469)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1344)
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1804)
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalArgumentException: Incorrect type specified for header 'b3'. Expected [class java.lang.String] but actual type is [class [B]
	at org.springframework.messaging.MessageHeaders.get(MessageHeaders.java:216)
	at org.springframework.integration.support.management.observation.MessageReceiverContext.lambda$new$0(MessageReceiverContext.java:38)
	at io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler.lambda$onStart$0(PropagatingReceiverTracingObservationHandler.java:59)
	at io.micrometer.tracing.otel.bridge.OtelPropagator$1.get(OtelPropagator.java:73)
	at io.opentelemetry.extension.trace.propagation.B3PropagatorExtractorSingleHeader.extractSpanContextFromSingleHeader(B3PropagatorExtractorSingleHeader.java:36)
	at io.opentelemetry.extension.trace.propagation.B3PropagatorExtractorSingleHeader.extract(B3PropagatorExtractorSingleHeader.java:31)
	at io.opentelemetry.extension.trace.propagation.B3Propagator.lambda$extract$0(B3Propagator.java:117)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
	at java.base/java.util.Spliterators$ArraySpliterator.tryAdvance(Spliterators.java:1002)
	at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
	at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:150)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:647)
	at io.opentelemetry.extension.trace.propagation.B3Propagator.extract(B3Propagator.java:123)
	at org.springframework.boot.actuate.autoconfigure.tracing.CompositeTextMapPropagator.lambda$extract$2(CompositeTextMapPropagator.java:110)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
	at java.base/java.util.AbstractList$RandomAccessSpliterator.tryAdvance(AbstractList.java:706)
	at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
	at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:150)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:647)
	at org.springframework.boot.actuate.autoconfigure.tracing.CompositeTextMapPropagator.extract(CompositeTextMapPropagator.java:112)
	at io.micrometer.tracing.otel.bridge.OtelPropagator.extract(OtelPropagator.java:65)
	at io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler.onStart(PropagatingReceiverTracingObservationHandler.java:58)
	at io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler.onStart(PropagatingReceiverTracingObservationHandler.java:35)
	at io.micrometer.observation.ObservationHandler$FirstMatchingCompositeObservationHandler.onStart(ObservationHandler.java:149)
	at io.micrometer.observation.SimpleObservation.notifyOnObservationStarted(SimpleObservation.java:222)
	at io.micrometer.observation.SimpleObservation.start(SimpleObservation.java:169)
	at io.micrometer.observation.Observation.observe(Observation.java:493)
	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:394)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:469)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:425)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2873)
	... 13 more

Kafka headers

{
	"b3": "d8496f9a548268a7e1c7a0fc40f6789c-2d3821b6c980345a-1",
	"spring_json_header_types": "{\"SENDER\":\"java.lang.String\",\"PAYLOAD_TYPE\":\"java.lang.String\"}",
	"SENDER": "payment",
	"PAYLOAD_TYPE": "PartnerSpaceBillingUserEmailUpdatedEvent"
}

If spring_json_header_types contains an entry for the b3 header, no error occurs. (However, judging by https://github.com/spring-cloud/spring-cloud-stream/issues/2905, it seems expected that spring_json_header_types does not contain an entry for the b3 header.)

{
	"b3": "d8496f9a548268a7e1c7a0fc40f6789c-2d3821b6c980345a-1",
	"spring_json_header_types": "{\"SENDER\":\"java.lang.String\",\"PAYLOAD_TYPE\":\"java.lang.String\", \"b3\":\"java.lang.String\"}",
	"SENDER": "payment",
	"PAYLOAD_TYPE": "PartnerSpaceBillingUserEmailUpdatedEvent"
}
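
To illustrate the type mismatch described above, here is a minimal, standard-library-only sketch. It is not Spring's or Micrometer's code: `HeaderTypeSketch`, `typedGet`, and `decodeIfBytes` are hypothetical names, and `typedGet` is only a simplified stand-in for a typed header lookup such as `MessageHeaders.get(key, type)`. It assumes the header bytes are UTF-8 text.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical illustration only; not the Spring or Micrometer implementation.
final class HeaderTypeSketch {

    // Simplified stand-in for a typed header lookup: rejects a value
    // whose runtime type does not match the requested type.
    static <T> T typedGet(Map<String, Object> headers, String key, Class<T> type) {
        Object value = headers.get(key);
        if (value == null) {
            return null;
        }
        if (!type.isInstance(value)) {
            throw new IllegalArgumentException("Incorrect type specified for header '" + key
                    + "'. Expected [" + type + "] but actual type is [" + value.getClass() + "]");
        }
        return type.cast(value);
    }

    // Decodes a raw byte[] header value into a String (assumes UTF-8);
    // any other value is returned unchanged.
    static Object decodeIfBytes(Object value) {
        return value instanceof byte[]
                ? new String((byte[]) value, StandardCharsets.UTF_8)
                : value;
    }

    public static void main(String[] args) {
        String b3 = "d8496f9a548268a7e1c7a0fc40f6789c-2d3821b6c980345a-1";

        // Without a type entry for b3, the header stays a raw byte[].
        Map<String, Object> raw = Map.of("b3", b3.getBytes(StandardCharsets.UTF_8));
        try {
            typedGet(raw, "b3", String.class);
        } catch (IllegalArgumentException e) {
            System.out.println("lookup failed: " + e.getMessage());
        }

        // Once the value is a String, the same typed lookup succeeds.
        Map<String, Object> decoded = Map.of("b3", decodeIfBytes(raw.get("b3")));
        System.out.println("lookup ok: " + typedGet(decoded, "b3", String.class));
    }
}
```

This mirrors the two header sets above: when nothing declares b3 as a String, the value stays a byte[] and the String lookup is rejected. Once the value is a String, the same lookup goes through.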

Environment

  • io.micrometer:micrometer-registry-prometheus: 1.10.13
  • io.micrometer:micrometer-tracing-bridge-otel: 1.0.12
  • io.micrometer:context-propagation: 1.0.6
  • Java version: 17
  • Spring Boot: 3.0.13
  • Spring Cloud version: 2022.0.4

To Reproduce

Use two Spring Boot components with observation enabled:

  1. Component A produces Kafka messages.
  2. Component B consumes the Kafka messages.
  3. Inside component B, KafkaMessageDrivenChannelAdapter is used to route messages to specific channels based on a header.
  4. The error occurs.

Properties related to Kafka and Micrometer tracing

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: 1
      retries: 1
    consumer:
      enable-auto-commit: true
      auto-commit-interval: 1000
      max-poll-records: 15
      auto-offset-reset: latest
      group-id: ***
  integration:
    management:
      observation-patterns: "*"
      default-logging-enabled: true

management:
  tracing:
    sampling:
      probability: 1.0
    propagation:
      type: b3
      consume: b3
      produce: b3

hyunbeeds avatar May 17 '24 03:05 hyunbeeds

I have the same problem.

maslailleron avatar May 28 '24 17:05 maslailleron

This has nothing to do with Micrometer itself.

@jonatan-ivanov ,

please consider closing this in favor of: https://github.com/spring-projects/spring-integration/issues/9191.

Thanks

artembilan avatar May 28 '24 18:05 artembilan

Closing per https://github.com/micrometer-metrics/micrometer/issues/5108#issuecomment-2135913066

shakuzen avatar Sep 03 '24 06:09 shakuzen