azure-sdk-for-java

[BUG] "This method can be called if AMQP Message body type is 'VALUE'. The actual type is [DATA]" error is logged.

Open Vladcorjuc opened this issue 1 year ago • 3 comments

Describe the bug An error is logged when sending a message to a topic if the payload is a ServiceBusMessage built from a String. The error logged is "This method can be called if AMQP Message body type is 'VALUE'. The actual type is [DATA]." The message appears to be sent even though the error is logged.

Exception or Stack Trace Stack-Trace available in debug

java.lang.IllegalArgumentException: This method can be called if AMQP Message body type is 'VALUE'. The actual type is [DATA].
	at com.azure.core.amqp.models.AmqpMessageBody.getValue(AmqpMessageBody.java:346)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:688)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:183)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:732)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:183)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:732)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:183)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:502)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:341)
	at com.fasterxml.jackson.databind.ObjectMapper._writeValueAndClose(ObjectMapper.java:4799)
	at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:4065)
	at org.springframework.cloud.function.json.JacksonMapper.toJson(JacksonMapper.java:75)
	at org.springframework.cloud.function.context.config.JsonMessageConverter.convertToInternal(JsonMessageConverter.java:131)
	at org.springframework.cloud.stream.converter.CompositeMessageConverterFactory$2.convertToInternal(CompositeMessageConverterFactory.java:117)
	at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:201)
	at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:189)
	at org.springframework.cloud.function.context.config.SmartCompositeMessageConverter.toMessage(SmartCompositeMessageConverter.java:158)
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertOutputMessageIfNecessary(SimpleFunctionRegistry.java:1451)
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertOutputIfNecessary(SimpleFunctionRegistry.java:1263)
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.lambda$convertOutputPublisherIfNecessary$27(SimpleFunctionRegistry.java:1525)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	at reactor.core.publisher.SinkManyUnicast.drainRegular(SinkManyUnicast.java:284)
	at reactor.core.publisher.SinkManyUnicast.drain(SinkManyUnicast.java:365)
	at reactor.core.publisher.SinkManyUnicast.tryEmitNext(SinkManyUnicast.java:239)
	at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
	at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
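
Reading the trace from the bottom up, the exception does not appear to come from the send itself: JsonMessageConverter hands the payload to Jackson, whose bean serializer invokes getters reflectively and reaches AmqpMessageBody.getValue on a DATA-typed body. The sketch below reproduces that mechanism with plain JDK reflection. `GetterProbe` and `FakeBody` are hypothetical stand-ins written for illustration; they are not SDK or Jackson classes.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class GetterProbe {

    /** Hypothetical stand-in for a message body that holds DATA (bytes) only. */
    public static class FakeBody {
        private final byte[] data;

        public FakeBody(String text) {
            this.data = text.getBytes(StandardCharsets.UTF_8);
        }

        public String getBodyType() {
            return "DATA";
        }

        public byte[] getFirstData() {
            return data;
        }

        // Mirrors the guard seen in the trace: only valid for VALUE bodies.
        public Object getValue() {
            throw new IllegalArgumentException(
                "This method can be called if AMQP Message body type is 'VALUE'. "
                    + "The actual type is [DATA].");
        }
    }

    /** Calls every public no-arg getter, as a bean serializer would, and records failures. */
    public static List<String> failingGetters(Object bean) {
        List<String> failures = new ArrayList<>();
        for (Method m : bean.getClass().getMethods()) {
            boolean isGetter = m.getName().startsWith("get")
                && m.getParameterCount() == 0
                && !m.getName().equals("getClass");
            if (!isGetter) {
                continue;
            }
            try {
                m.invoke(bean);
            } catch (InvocationTargetException e) {
                // The getter itself threw; unwrap to see the real cause.
                failures.add(m.getName() + ": " + e.getCause().getClass().getSimpleName());
            } catch (IllegalAccessException e) {
                failures.add(m.getName() + ": inaccessible");
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        System.out.println(failingGetters(new FakeBody("hello")));
    }
}
```

In this model only `getValue` fails, while `getBodyType` and `getFirstData` succeed. That is consistent with the message still being sent while a serialization error is logged along the way.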

To Reproduce Use the code snippet below to send a message to a topic.

Code Snippet

  @Bean
  public Supplier<Flux<Message<ServiceBusMessage>>> publishTopicMessage() {
    return () -> publisher.asFlux()
        .doOnError(t -> log.error("An error occurred while sending a message. Cause: %s.".formatted(t.getLocalizedMessage())));
  }

  public synchronized void publishMessage(final String message, final String messageType) {
    var tenantId = "id";
    var principalId = "id";

    var azureMessage = new ServiceBusMessage(message);
    azureMessage.setCorrelationId(MDC.get(MDC_CORRELATION_ID_KEY));
    azureMessage.getApplicationProperties().put(EVENT_HEADER, messageType);
    azureMessage.getApplicationProperties().put(TENANT_ID_HEADER, tenantId);
    azureMessage.getApplicationProperties().put(PRINCIPAL_ID_HEADER, principalId);
    
    publisher.emitNext(
        MessageBuilder.withPayload(azureMessage).build(),
        Sinks.EmitFailureHandler.FAIL_FAST
    );
  }

Expected behavior No error should be logged if the message was sent with success.


Setup (please complete the following information):

  • OS: Windows/Linux
  • IDE: IntelliJ
  • Library/Libraries: spring-cloud-azure-stream-binder-servicebus:5.15.0
  • Java version: 17
  • Frameworks: Spring Boot 3.3.3

Additional context It seems the error is logged two times.

Information Checklist Kindly make sure that you have added all of the following information above and checked off the required fields; otherwise we will treat the issue as an incomplete report.

  • [x] Bug Description Added
  • [x] Repro Steps Added
  • [x] Setup information Added

Vladcorjuc, Aug 26 '24 14:08

@anuchandy @conniey @lmolkova

github-actions[bot], Aug 26 '24 14:08

Thank you for your feedback. Tagging and routing to the team member best able to assist.

github-actions[bot], Aug 26 '24 14:08

Hi @Netyyy, could you or the Spring team please take a look? It seems the Spring layer sends the message with the default encoding (DATA) but later attempts to access the message content as VALUE. The getValue API should be invoked only if the message was encoded as VALUE.

anuchandy, Aug 26 '24 17:08
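
To illustrate the point about encodings: code that reads a body should inspect the body type first and only then pick the matching accessor. In the SDK this would correspond to checking `AmqpMessageBody.getBodyType()` before calling `getValue()` or `getFirstData()`. The sketch below models that with hypothetical `Body` and `BodyType` types so that it runs without the SDK on the classpath; it is an assumption-laden illustration, not the library's implementation.

```java
import java.nio.charset.StandardCharsets;

public class BodyTypeGuard {

    /** Hypothetical mirror of the two AMQP body kinds discussed in this thread. */
    public enum BodyType { DATA, VALUE }

    /** Hypothetical body model: exactly one of data/value is populated. */
    public static final class Body {
        private final BodyType type;
        private final byte[] data;
        private final Object value;

        private Body(BodyType type, byte[] data, Object value) {
            this.type = type;
            this.data = data;
            this.value = value;
        }

        public static Body fromData(byte[] data) {
            return new Body(BodyType.DATA, data, null);
        }

        public static Body fromValue(Object value) {
            return new Body(BodyType.VALUE, null, value);
        }

        public BodyType getBodyType() {
            return type;
        }

        public byte[] getFirstData() {
            if (type != BodyType.DATA) {
                throw new IllegalArgumentException("Body type is " + type + ", not DATA.");
            }
            return data;
        }

        public Object getValue() {
            if (type != BodyType.VALUE) {
                throw new IllegalArgumentException("Body type is " + type + ", not VALUE.");
            }
            return value;
        }
    }

    /** Reads the payload as text, choosing the accessor that matches the body type. */
    public static String readAsText(Body body) {
        switch (body.getBodyType()) {
            case DATA:
                return new String(body.getFirstData(), StandardCharsets.UTF_8);
            case VALUE:
                return String.valueOf(body.getValue());
            default:
                throw new IllegalStateException("Unsupported body type: " + body.getBodyType());
        }
    }

    public static void main(String[] args) {
        System.out.println(readAsText(Body.fromData("hello".getBytes(StandardCharsets.UTF_8))));
        System.out.println(readAsText(Body.fromValue(42)));
    }
}
```

A generic serializer that walks every getter cannot apply this kind of guard, which is why passing the SDK message object through a JSON converter is a plausible trigger for the logged error.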

Hey @anuchandy and @Netyyyy. Do you have any update on this?

We have a similar issue in our project.

Somehow the retryer sends the message as DATA in binary format (the default retry implementation), while our application is able to read VALUE in JSON format. So there is a mismatch, and we receive the same error message.

Is this related to Spring Cloud Stream or the extension here?

To add another data point: even when the retry options are set to 0 retries, it somehow ends up in this situation.

bugra-derre, Jun 23 '25 08:06