spring-cloud-sleuth
Spring Cloud 2020.0.4 breaks Spring Cloud Stream Kafka AVRO Message Conversion when Sleuth is on the classpath
Describe the bug When upgrading from Spring Cloud 2020.0.3 to 2020.0.4, AVRO message conversion is ignored in Spring Cloud Stream Kafka (Kafka 2.7), giving the following error:
2021-10-28 15:43:47.868 ERROR [spring-cloud-webflux-avor-source-only,14b2b7aa4a15f2e6,14b2b7aa4a15f2e6] 28075 --- [ctor-http-nio-2] a.w.r.e.AbstractErrorWebExceptionHandler : [25d7b7f7-1] 500 Server Error for HTTP GET "/dave"
org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not a map: {"type":"record","name":"AvroFxRateEvent","namespace":"uk.co.dave.consumer.fxrate.consumer.avro","fields":[{"name":"from","type":{"type":"string","avro.java.string":"String"}},{"name":"to","type":{"type":"string","avro.java.string":"String"}},{"name":"rate","type":{"type":"bytes","logicalType":"decimal","precision":7,"scale":6}}]} (through reference chain: uk.co.dave.consumer.fxrate.consumer.avro.AvroFxRateEvent["schema"]->org.apache.avro.Schema$RecordSchema["valueType"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException: Not a map: {"type":"record","name":"AvroFxRateEvent","namespace":"uk.co.dave.consumer.fxrate.consumer.avro","fields":[{"name":"from","type":{"type":"string","avro.java.string":"String"}},{"name":"to","type":{"type":"string","avro.java.string":"String"}},{"name":"rate","type":{"type":"bytes","logicalType":"decimal","precision":7,"scale":6}}]} (through reference chain: uk.co.dave.consumer.fxrate.consumer.avro.AvroFxRateEvent["schema"]->org.apache.avro.Schema$RecordSchema["valueType"])
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertToInternal(MappingJackson2MessageConverter.java:273) ~[spring-messaging-5.3.9.jar:5.3.9]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ Handler example.DaveController#dave() [DispatcherHandler]
|_ checkpoint ⇢ org.springframework.cloud.sleuth.instrument.web.TraceWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ HTTP GET "/dave" [ExceptionHandlingWebHandler]
Stack trace:
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertToInternal(MappingJackson2MessageConverter.java:273) ~[spring-messaging-5.3.9.jar:5.3.9]
at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertToInternal(ApplicationJsonMessageMarshallingConverter.java:69) ~[spring-cloud-stream-3.1.4.jar:3.1.4]
at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:201) ~[spring-messaging-5.3.9.jar:5.3.9]
at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:191) ~[spring-messaging-5.3.9.jar:5.3.9]
at org.springframework.cloud.function.context.config.SmartCompositeMessageConverter.toMessage(SmartCompositeMessageConverter.java:96) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertOutputMessageIfNecessary(SimpleFunctionRegistry.java:1245) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertOutputIfNecessary(SimpleFunctionRegistry.java:1056) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:509) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.doApply(TraceFunctionAroundWrapper.java:125) ~[spring-cloud-sleuth-instrumentation-3.0.4.jar:3.0.4]
at org.springframework.cloud.function.context.catalog.FunctionAroundWrapper.apply(FunctionAroundWrapper.java:41) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$1.doApply(SimpleFunctionRegistry.java:257) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:506) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:214) ~[spring-cloud-stream-3.1.4.jar:3.1.4]
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:156) ~[spring-cloud-stream-3.1.4.jar:3.1.4]
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:136) ~[spring-cloud-stream-3.1.4.jar:3.1.4]
It looks like the Avro message converters are being ignored in favour of the Jackson ones.
Any of the following works around the error:
- downgrade Spring Cloud Stream to 3.1.3
- remove Spring Cloud Sleuth as a dependency
- downgrade to Spring Cloud 2020.0.3
Sample https://github.com/davidmelia/spring-boot-webflux-avro-source-only
If you compile and run this project (it assumes a local Confluent Schema Registry on http://localhost:8081 and Kafka on localhost:9092) and hit http://localhost:8080/dave, a message is sent to Kafka and you will see the above error.
Downgrading to Spring Cloud Stream 3.1.3 (uncomment spring-cloud-stream-dependencies in the pom.xml), removing Spring Cloud Sleuth from the pom.xml, or downgrading to Spring Cloud 2020.0.3 fixes this problem.
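For reference, the first workaround above (pinning Spring Cloud Stream back to 3.1.3 while staying on the 2020.0.4 release train) can be sketched as a Maven dependencyManagement fragment like the following. This is an illustrative snippet, not the sample project's actual pom.xml; the BOM coordinates are the standard published ones.

```xml
<dependencyManagement>
  <dependencies>
    <!-- Declared BEFORE the spring-cloud-dependencies BOM so this
         version wins and pins Spring Cloud Stream back to 3.1.3 -->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-dependencies</artifactId>
      <version>3.1.3</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>2020.0.4</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
```

Maven resolves imported BOMs in declaration order, so the earlier spring-cloud-stream-dependencies import takes precedence over the version managed by the release train.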
I previously raised this on Spring Cloud Sleuth, but after realising that downgrading Spring Cloud Stream fixes the problem, I am not sure where the problem lies (https://github.com/spring-cloud/spring-cloud-sleuth/issues/2048).
Thanks
If you set the property spring.sleuth.function.enabled to false, then it disconnects Sleuth from the app. Could you try that and see if that fixes the issue? If that works, then we can delegate the issue to spring-cloud-sleuth.
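For anyone landing here, the workaround being suggested is a one-line config change (shown here as an application.properties fragment; the equivalent YAML works too):

```properties
# Disables Sleuth's function instrumentation (TraceFunctionAroundWrapper),
# which is the interceptor implicated in this issue.
spring.sleuth.function.enabled=false
```

Note this only disables the function-wrapping part of Sleuth; the rest of the tracing instrumentation stays active, unlike spring.sleuth.enabled=false which turns Sleuth off entirely.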
@sobychacko - I didn't think of doing that - I can confirm spring.sleuth.function.enabled=false fixes the issue.
@davidmelia I'll talk to @marcingrzejszczak tomorrow. I know there was a lot of work on TraceFunctionAroundWrapper in Sleuth that will be available with the RC tomorrow, but given that all of that is cutting edge, that flag is there specifically for this purpose. It doesn't mean that there is no issue; it just means we still need to dig in and see what's going on. For now you have simply disabled that interceptor and helped identify that there is indeed an issue in Sleuth, so thank you!
Just ran into the same issue here. The problem is that, because the function is wrapped for the Brave/Sleuth tracing advice, the following sets the FunctionInvocationWrapper.skipInputConversion property on the Sleuth wrapper:
https://github.com/spring-cloud/spring-cloud-stream/blob/c22651a3b0dfdae4313a4772a23f5a8cde653c60/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java#L522
The decision in FunctionInvocationWrapper.convertInputMessageIfNecessary is made based on the original function, which has skipInputConversion=false by default. That's why it tries to convert an already perfectly deserialized object, which fails.
https://github.com/spring-cloud/spring-cloud-function/blob/7921842943e782bf82cdcdf21627110548980537/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java#L1049
IMHO the properties of the wrapper/decorator should be delegated to the original instance rather than copied.
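The mismatch described above can be illustrated with a minimal, self-contained sketch. The class names here are hypothetical stand-ins, not the real Spring Cloud Function API: a wrapper that keeps its own copy of a flag never propagates later writes to the wrapped instance, while a delegating wrapper does.

```java
public class DelegationSketch {

    // Stand-in for the original function with a conversion flag.
    static class TargetFunction {
        private boolean skipInputConversion; // defaults to false
        boolean isSkipInputConversion() { return skipInputConversion; }
        void setSkipInputConversion(boolean skip) { skipInputConversion = skip; }
    }

    // Copying-style wrapper: it inherits its own flag field, so setting the
    // flag on the wrapper leaves the wrapped target untouched. This mirrors
    // the report: the flag is set on the Sleuth wrapper, but the conversion
    // decision is made against the original function.
    static class CopyingWrapper extends TargetFunction {
        final TargetFunction target;
        CopyingWrapper(TargetFunction target) { this.target = target; }
    }

    // Delegating-style wrapper: reads and writes pass through to the target,
    // so both objects always agree on the flag.
    static class DelegatingWrapper extends TargetFunction {
        final TargetFunction target;
        DelegatingWrapper(TargetFunction target) { this.target = target; }
        @Override boolean isSkipInputConversion() { return target.isSkipInputConversion(); }
        @Override void setSkipInputConversion(boolean skip) { target.setSkipInputConversion(skip); }
    }

    public static void main(String[] args) {
        TargetFunction original = new TargetFunction();
        CopyingWrapper copying = new CopyingWrapper(original);
        copying.setSkipInputConversion(true);
        // The original still says "convert", so input conversion runs again
        // on an already-deserialized object and fails.
        System.out.println("copying: original skips = " + original.isSkipInputConversion());

        TargetFunction original2 = new TargetFunction();
        DelegatingWrapper delegating = new DelegatingWrapper(original2);
        delegating.setSkipInputConversion(true);
        System.out.println("delegating: original skips = " + original2.isSkipInputConversion());
    }
}
```

With copying, the original reports false even after the wrapper was told to skip; with delegation, both sides stay consistent, which is the behaviour the comment above argues for.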
When can we expect this to get fixed?
You can disable Sleuth with the spring.sleuth.function.enabled property.
If your intention is to use Sleuth, then I need a small reproducible sample. Those flags are there for a reason: the intention of this particular wrapper is only to read/add additional headers to the message, not to process anything.
Hi @olegz
I am experiencing exactly the same issue.
My case is a little different. We have a function that takes in an object deserialized from JSON, and its output object needs to be serialized to XML.
We also want to use Spring Cloud Sleuth with our functions, especially because we use the traceId from the propagated B3 headers in our functions.
But because the FunctionAroundWrapper sets targetFunction.setSkipOutputConversion(true), it never uses our XML converter and the function always returns JSON.
Hi, we are experiencing the same issue with Spring Boot 2.5.12 and Spring Cloud 2020.0.5; we use Avro messages.
The problem is that it happens randomly, not for all messages. I don't see in this thread whether it was fixed in later versions, and I don't want to disable Sleuth since we are heavily dependent on it.
@marcingrzejszczak please suggest a version of Spring Boot and Spring Cloud where it works, or a workaround.
I am going to close this issue since
- Sleuth is on its way out in favor of observability
- I just pushed a minor fix to functions to ensure that Sleuth is not applied to reactive functions, as it really doesn't make much sense
- Observability is already in place for 4.x branch
- https://github.com/spring-cloud/spring-cloud-function/commit/62ffbc12b614032fea650c995aaad2bc0c2fc3c9
@olegz the problem is not with reactive; it is with Spring Cloud Function.
Can you provide an example that reproduces it so I can have a look? The original example with the stack trace included at the beginning of this issue is reactive.
It keeps happening in production randomly and we weren't able to reproduce it in the local environment, but we do have the message and the stack trace from the DLQ.
The strange thing is that when we retried the same message from the DLQ, it worked with no error.
Part of the stack trace:
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:397)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:83)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:454)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:428)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:123)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:117)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:41)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2334)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2315)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2237)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2150)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2032)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1705)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1276)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1268)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1163)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not an array:
And sometimes this stack trace:
error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@7943bd60]; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Infinite recursion (StackOverflowError) (through reference chain: java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]-
I would not call it random or a similar problem to the original post. It's clear you have some recursion in your JSON, and that explains the randomness, since it happens per message. Consider extracting one of those messages from the DLQ and evaluating its JSON payload to see what the issue is.
Again, once we retry the same message it works; I would expect it shouldn't. Once we turned off Spring Sleuth everything works fine: spring.sleuth.enabled=false
There is no such thing as a bug that cannot be reproduced; it simply means there is no bug.
I have no sample message, no meaningful stack trace, and no approximate instructions on how to even attempt to reproduce it. What would you like us to do?