Spring Cloud 2020.0.4 breaks Spring Cloud Stream Kafka AVRO Message Conversion when Sleuth is on the classpath

Open davidmelia opened this issue 2 years ago • 6 comments

Describe the bug
When upgrading from Spring Cloud 2020.0.3 to 2020.0.4, AVRO message conversion is ignored in Spring Cloud Stream Kafka (Kafka 2.7), giving the following error:

2021-10-28 15:43:47.868 ERROR [spring-cloud-webflux-avor-source-only,14b2b7aa4a15f2e6,14b2b7aa4a15f2e6] 28075 --- [ctor-http-nio-2] a.w.r.e.AbstractErrorWebExceptionHandler : [25d7b7f7-1]  500 Server Error for HTTP GET "/dave"

org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not a map: {"type":"record","name":"AvroFxRateEvent","namespace":"uk.co.dave.consumer.fxrate.consumer.avro","fields":[{"name":"from","type":{"type":"string","avro.java.string":"String"}},{"name":"to","type":{"type":"string","avro.java.string":"String"}},{"name":"rate","type":{"type":"bytes","logicalType":"decimal","precision":7,"scale":6}}]} (through reference chain: uk.co.dave.consumer.fxrate.consumer.avro.AvroFxRateEvent["schema"]->org.apache.avro.Schema$RecordSchema["valueType"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException: Not a map: {"type":"record","name":"AvroFxRateEvent","namespace":"uk.co.dave.consumer.fxrate.consumer.avro","fields":[{"name":"from","type":{"type":"string","avro.java.string":"String"}},{"name":"to","type":{"type":"string","avro.java.string":"String"}},{"name":"rate","type":{"type":"bytes","logicalType":"decimal","precision":7,"scale":6}}]} (through reference chain: uk.co.dave.consumer.fxrate.consumer.avro.AvroFxRateEvent["schema"]->org.apache.avro.Schema$RecordSchema["valueType"])
	at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertToInternal(MappingJackson2MessageConverter.java:273) ~[spring-messaging-5.3.9.jar:5.3.9]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	|_ checkpoint ⇢ Handler example.DaveController#dave() [DispatcherHandler]
	|_ checkpoint ⇢ org.springframework.cloud.sleuth.instrument.web.TraceWebFilter [DefaultWebFilterChain]
	|_ checkpoint ⇢ HTTP GET "/dave" [ExceptionHandlingWebHandler]
Stack trace:
		at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertToInternal(MappingJackson2MessageConverter.java:273) ~[spring-messaging-5.3.9.jar:5.3.9]
		at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertToInternal(ApplicationJsonMessageMarshallingConverter.java:69) ~[spring-cloud-stream-3.1.4.jar:3.1.4]
		at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:201) ~[spring-messaging-5.3.9.jar:5.3.9]
		at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:191) ~[spring-messaging-5.3.9.jar:5.3.9]
		at org.springframework.cloud.function.context.config.SmartCompositeMessageConverter.toMessage(SmartCompositeMessageConverter.java:96) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
		at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertOutputMessageIfNecessary(SimpleFunctionRegistry.java:1245) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
		at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertOutputIfNecessary(SimpleFunctionRegistry.java:1056) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
		at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:509) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
		at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.doApply(TraceFunctionAroundWrapper.java:125) ~[spring-cloud-sleuth-instrumentation-3.0.4.jar:3.0.4]
		at org.springframework.cloud.function.context.catalog.FunctionAroundWrapper.apply(FunctionAroundWrapper.java:41) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
		at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$1.doApply(SimpleFunctionRegistry.java:257) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
		at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:506) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
		at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:214) ~[spring-cloud-stream-3.1.4.jar:3.1.4]
		at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:156) ~[spring-cloud-stream-3.1.4.jar:3.1.4]
		at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:136) ~[spring-cloud-stream-3.1.4.jar:3.1.4]

It looks like the Avro message converters are being ignored in favour of the Jackson ones.

Any of the following works around the error:

  1. downgrade Spring Cloud Stream to 3.1.3
  2. remove Spring Cloud Sleuth as a dependency
  3. downgrade to Spring Cloud 2020.0.3

Sample https://github.com/davidmelia/spring-boot-webflux-avro-source-only

If you compile and run this project (it assumes a local Confluent Schema Registry on http://localhost:8081 and Kafka on localhost:9092) and hit http://localhost:8080/dave, a message is sent to Kafka and you will see the above error.

Downgrading to Spring Cloud Stream 3.1.3 (uncomment spring-cloud-stream-dependencies in the pom.xml), removing Spring Cloud Sleuth from the pom.xml, or downgrading to Spring Cloud 2020.0.3 fixes the problem.
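For illustration, pinning the Stream BOM back to 3.1.3 looks roughly like this (a sketch, not the exact entry in the sample's pom.xml; the import has to appear before the spring-cloud-dependencies import to take precedence):

```xml
<!-- Sketch: pin Spring Cloud Stream back to 3.1.3 while staying on Spring Cloud 2020.0.4 -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-dependencies</artifactId>
      <version>3.1.3</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
    <!-- existing spring-cloud-dependencies import stays below this entry -->
  </dependencies>
</dependencyManagement>
```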

I previously raised this on the Spring Cloud Sleuth tracker, but after realising that downgrading Spring Cloud Stream also fixes the problem I am not sure where the issue lies (https://github.com/spring-cloud/spring-cloud-sleuth/issues/2048).

Thanks

davidmelia avatar Nov 01 '21 10:11 davidmelia

If you set the property spring.sleuth.function.enabled to false, it disconnects Sleuth from the app. Could you try that and see if it fixes the issue? If it works, then we can delegate the issue to spring-cloud-sleuth.
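For reference, a minimal sketch of that setting (assuming the default application.properties is used):

```properties
# Disables Sleuth's Spring Cloud Function instrumentation (the TraceFunctionAroundWrapper)
# without turning off the rest of Sleuth.
spring.sleuth.function.enabled=false
```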

sobychacko avatar Nov 02 '21 14:11 sobychacko

@sobychacko - I didn't think of doing that - I can confirm spring.sleuth.function.enabled=false fixes the issue.

davidmelia avatar Nov 02 '21 15:11 davidmelia

@davidmelia I'll talk to @marcingrzejszczak tomorrow. I know there was a lot of work on TraceFunctionAroundWrapper in Sleuth that will be available with the RC tomorrow, but given that all of that is cutting edge, that flag exists specifically for this purpose. It doesn't mean there is no issue; it just means we still need to dig in and see what's going on. For now you have simply disabled that interceptor and helped identify that there is indeed an issue in Sleuth, so thank you!

olegz avatar Nov 02 '21 15:11 olegz

Just ran into the same issue here. The problem is that, because the function is wrapped in the Brave/Sleuth tracing advice, the following code sets the FunctionInvocationWrapper.skipInputConversion property on the Sleuth wrapper:

https://github.com/spring-cloud/spring-cloud-stream/blob/c22651a3b0dfdae4313a4772a23f5a8cde653c60/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java#L522

The decision in FunctionInvocationWrapper.convertInputMessageIfNecessary is made based on the original function, which has skipInputConversion=false by default. That's why it tries to convert an already perfectly deserialized object, which fails.

https://github.com/spring-cloud/spring-cloud-function/blob/7921842943e782bf82cdcdf21627110548980537/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java#L1049

IMHO the properties of the wrapper/decorator should be delegated to the original instance and not copied.
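A minimal sketch of the delegation idea (hypothetical types for illustration only, not the actual SimpleFunctionRegistry or Sleuth classes):

```java
// Hypothetical illustration of "delegate, don't copy": the decorator forwards
// the conversion flag to the wrapped target instead of keeping its own copy.
interface InvocableFunction {
    Object apply(Object input);
    void setSkipInputConversion(boolean skip);
    boolean isSkipInputConversion();
}

final class TracingFunctionDecorator implements InvocableFunction {

    private final InvocableFunction target;

    TracingFunctionDecorator(InvocableFunction target) {
        this.target = target;
    }

    @Override
    public Object apply(Object input) {
        // tracing concerns (start span, propagate headers) would go here
        return this.target.apply(input);
    }

    @Override
    public void setSkipInputConversion(boolean skip) {
        // Delegating keeps wrapper and original function in sync, so a check
        // made against either instance sees the same value.
        this.target.setSkipInputConversion(skip);
    }

    @Override
    public boolean isSkipInputConversion() {
        return this.target.isSkipInputConversion();
    }
}
```

With delegation, the check inside convertInputMessageIfNecessary would see the same flag regardless of whether it looks at the wrapper or the original function.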

When can we expect this to get fixed?

MarcoR83 avatar May 12 '22 15:05 MarcoR83

You can disable Sleuth with the spring.sleuth.function.enabled property. If your intention is to use Sleuth, then I need a small reproducible sample. Those flags are there for a reason, since the intention of this particular wrapper is only to read/add additional headers to the message and not to process anything.
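To illustrate that intent, a purely conceptual sketch (not Sleuth's actual code; the b3 header handling is just an example of header-only manipulation):

```java
import java.util.function.Function;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

// Conceptual only: a wrapper that leaves the payload untouched and just
// reads/propagates tracing headers around the target function call.
final class HeaderOnlyTracingWrapper<I, O> implements Function<Message<I>, Message<O>> {

    private final Function<Message<I>, Message<O>> delegate;

    HeaderOnlyTracingWrapper(Function<Message<I>, Message<O>> delegate) {
        this.delegate = delegate;
    }

    @Override
    public Message<O> apply(Message<I> input) {
        // read incoming trace context from headers, then call the target as-is
        Message<O> output = this.delegate.apply(input);
        // add propagation headers to the result without converting the payload
        return MessageBuilder.fromMessage(output)
                .setHeader("b3", input.getHeaders().get("b3"))
                .build();
    }
}
```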

olegz avatar May 12 '22 16:05 olegz

Hi @olegz

I am experiencing exactly the same issue.

My case is a little different. We have a function that takes in an object deserialized from JSON, and its output object needs to be serialized to XML.

We also want to use Spring Cloud Sleuth with our functions, especially because we use the traceId from the propagated B3 headers in our functions.

But because the FunctionAroundWrapper sets targetFunction.setSkipOutputConversion(true), it never uses our XMLConverter and the function always returns JSON.
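For context, our function has roughly this shape (a simplified sketch; the OrderRequest/OrderConfirmation types and the contentType value are placeholders, not our real code):

```java
import java.util.function.Function;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@Configuration
class OrderFunctions {

    // Input arrives as JSON and is deserialized into OrderRequest;
    // the output message is expected to be serialized to XML by our converter.
    @Bean
    Function<OrderRequest, Message<OrderConfirmation>> confirmOrder() {
        return request -> MessageBuilder
                .withPayload(new OrderConfirmation(request.id()))
                .setHeader("contentType", "application/xml")
                .build();
    }

    record OrderRequest(String id) { }
    record OrderConfirmation(String id) { }
}
```

With setSkipOutputConversion(true) applied to the target function, the contentType header is never honoured and the payload goes out as JSON.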

Richardmbs12 avatar Jul 12 '22 09:07 Richardmbs12

Hi, we are experiencing the same issue with Spring Boot 2.5.12 and Spring Cloud 2020.0.5; we use Avro messages.

The problem is that it happens randomly, not for all messages. I don't see in this thread whether it was fixed in later versions, and I don't want to disable Sleuth since we are heavily dependent on it.

erohana avatar Oct 24 '22 07:10 erohana

@marcingrzejszczak please suggest a version of Spring Boot and Spring Cloud where this works, or a workaround.

erohana avatar Oct 27 '22 11:10 erohana

I am going to be closing this issue since

  1. Sleuth is on its way out in favor of observability
  2. I just pushed a minor fix to functions to ensure that Sleuth is not applied to reactive functions, as it really doesn't make much sense
  3. Observability is already in place for 4.x branch
  4. https://github.com/spring-cloud/spring-cloud-function/commit/62ffbc12b614032fea650c995aaad2bc0c2fc3c9

olegz avatar Oct 27 '22 13:10 olegz

@olegz the problem is not with reactive, it is with Spring Cloud Function

erohana avatar Oct 27 '22 13:10 erohana

Can you provide an example that reproduces it so I can have a look? The original example with the stack trace included at the beginning of this issue is reactive.

olegz avatar Oct 27 '22 13:10 olegz

It keeps happening in production randomly and we weren't able to reproduce it in the local environment, but we do have the message and the stack trace from the DLQ.

The strange thing is that when we retried the same message from the DLQ it worked with no error.

Part of the stack trace:


	at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:397)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:83)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:454)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:428)
	at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:123)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
	at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:117)
	at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:41)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2334)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2315)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2237)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2150)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2032)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1705)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1276)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1268)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1163)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not an array:

and sometimes this stack trace:


 error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@7943bd60]; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Infinite recursion (StackOverflowError) (through reference chain: java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]-

erohana avatar Oct 27 '22 14:10 erohana

I would not call it random, or a similar problem to the original post. It's clear you have some recursion in your JSON, and that explains the randomness, since it happens per message. Consider extracting one of those messages from the DLQ, evaluating its JSON payload, and seeing what the issue is.

olegz avatar Oct 27 '22 14:10 olegz

Again, once we retry the same message it works; if the message itself were the problem, I would expect it not to work. Once we turned off Spring Sleuth (spring.sleuth.enabled=false), everything works fine.

erohana avatar Oct 27 '22 17:10 erohana

There is no such thing as a bug that cannot be reproduced. It simply means there is no bug.

olegz avatar Oct 28 '22 14:10 olegz

I have no sample message, no meaningful stack trace, and no approximate instructions on how to even attempt to reproduce it. What would you like us to do?

olegz avatar Oct 28 '22 14:10 olegz