[QUESTION] How to configure the ServiceBusReceiveMode for @ServiceBusListener in Spring Messaging

Open kensinzl opened this issue 1 year ago • 11 comments

Query/Question

  • I am using the Spring Messaging @ServiceBusListener to receive messages from an Azure Service Bus queue.
  • I am following this official page: https://learn.microsoft.com/en-us/azure/developer/java/spring-framework/using-service-bus-in-spring-applications
  • I know the difference between the PEEK_LOCK and RECEIVE_AND_DELETE values of ServiceBusReceiveMode, but
    1. I did not find any attribute of @ServiceBusListener for configuring the receive mode.
    2. I have also tried setting spring.cloud.azure.servicebus.consumer.receive-mode=RECEIVE_AND_DELETE or spring.cloud.azure.servicebus.processor.receive-mode=RECEIVE_AND_DELETE in application.properties, but the mode still seems to be PEEK_LOCK, i.e. the message is not deleted upon receipt.

My question is how to configure the ServiceBusReceiveMode in a Java Spring Boot application when using the annotation, e.g. @ServiceBusListener.

Setup (please complete the following information if applicable):

  • Java Spring Boot
        <dependency>
            <groupId>com.azure.spring</groupId>
            <artifactId>spring-messaging-azure-servicebus</artifactId>
            <version>5.9.1</version>
        </dependency>
kensinzl avatar Feb 11 '24 02:02 kensinzl

@yiliuTo could you take a look?

/cc @Azure/azsdk-sb-java

joshfree avatar Feb 12 '24 18:02 joshfree

Hi @kensinzl Thank you for reporting this issue. We have received your submission and will take a look. We appreciate your input and will review this matter as soon as possible. Please feel free to provide any additional information or context that you think may be helpful. We'll keep you updated on the progress of our review.

Netyyyy avatar Feb 19 '24 07:02 Netyyyy

Hey, is there any progress? It seems no one has picked up this issue yet.

kensinzl avatar May 01 '24 21:05 kensinzl

@yiliuTo @Netyyyy @joshfree any progress on this ticket? Thanks.

kensinzl avatar May 01 '24 23:05 kensinzl

I'm stuck with a similar issue where I want to configure the @ServiceBusListener to use PEEK_LOCK and to enable sessions on the consumer, as the subscription has sessions enabled. Properties like spring.cloud.azure.servicebus.processor.session-enabled=true and spring.cloud.azure.servicebus.processor.receive-mode=peek_lock do not seem to work with the annotated listeners.

nikoladjuran avatar May 21 '24 15:05 nikoladjuran

@kensinzl FYI: Managed to figure it out... Those properties don't configure the @ServiceBusListener but seem to be for other approaches like ServiceBusClient. I've managed to configure the listeners with the following bean declaration.

@Bean
public PropertiesSupplier<ConsumerIdentifier, ProcessorProperties> propertiesSupplier() {
    return key -> {
        var processorProperties = new ProcessorProperties();
        processorProperties.setEntityType(ServiceBusEntityType.TOPIC);
        processorProperties.setReceiveMode(ServiceBusReceiveMode.PEEK_LOCK);
        processorProperties.setSessionEnabled(Boolean.TRUE);

        return processorProperties;
    };
}

nikoladjuran avatar May 22 '24 07:05 nikoladjuran

@nikoladjuran Hey man, thanks for the information.

I still use @ServiceBusListener (Spring Messaging) to listen to the Service Bus queue. If my understanding is correct, the @ServiceBusListener source code uses the default ServiceBusReceiveMode.PEEK_LOCK and also completes (deletes) the message automatically if the handler executes successfully.

Again, the Azure documentation is quite a mess, particularly for the Java Azure SDK. Hopefully I am on the right track.

kensinzl avatar May 22 '24 09:05 kensinzl

Let me jump into this old discussion with some more code that may be useful for other developers. Normally I do not recommend doing something like this and prefer to keep everything as simple as possible, but sometimes you have very specific requirements. Besides configuring the connection with PropertiesSupplier, you can also use the connection indirectly through the ServiceBusMessageHeaders.RECEIVED_MESSAGE_CONTEXT header to settle (acknowledge) message delivery manually. For example:

@Component
public class ServiceBusPropertiesSupplier implements PropertiesSupplier<ConsumerIdentifier, ProcessorProperties> {

    @Value("${app.messaging.not-auto-complete-queue-name}")
    private String notAutoCompleteQueueName;

    @Override
    public ProcessorProperties getProperties(final ConsumerIdentifier key) {
        final ProcessorProperties processorProperties = new ProcessorProperties();
        if (Objects.equals(key.getDestination(), notAutoCompleteQueueName)) {
            processorProperties.setAutoComplete(Boolean.FALSE);
        }
        return processorProperties;
    }

}

and your message consumer can do something like this:

@ServiceBusListener(destination = QUEUE_NAME_SPEL)
public void handleMessage(
        ServiceBusReceivedMessage message,
        @Header(ServiceBusMessageHeaders.RECEIVED_MESSAGE_CONTEXT) ServiceBusReceivedMessageContext context
) {
    if (messageWrongFormat(message)) {
        context.deadLetter();
    } else if (messageHandleSuccessfully(message)) {
        context.complete();
    } else {
        // No settlement call here, so Service Bus waits until the 'Message lock duration' expires
        // and then redelivers the message to a consumer.
        // Alternatively, call context.abandon() so that Service Bus makes the message
        // available for redelivery immediately.
    }
}
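
To make the three branches above easier to reason about, here is a minimal in-memory sketch of PEEK_LOCK settlement. It does not use the Azure SDK at all: PeekLockQueueSketch, Outcome, Handler and the maxDeliveryCount value of 3 are hypothetical names and values chosen for illustration. ABANDON stands in for both an explicit abandon and a lock that expires without settlement, and the model assumes the broker dead-letters a message once its delivery count reaches the configured maximum.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of PEEK_LOCK settlement. Hypothetical names; not the Azure SDK. */
class PeekLockQueueSketch {
    enum Outcome { COMPLETE, ABANDON, DEAD_LETTER }

    interface Handler {
        Outcome handle(String body, int deliveryCount);
    }

    static final class Message {
        final String body;
        int deliveryCount;
        Message(String body) { this.body = body; }
    }

    final Deque<Message> active = new ArrayDeque<>();
    final List<String> completed = new ArrayList<>();
    final List<String> deadLetter = new ArrayList<>();
    final int maxDeliveryCount;

    PeekLockQueueSketch(int maxDeliveryCount) {
        this.maxDeliveryCount = maxDeliveryCount;
    }

    void send(String body) {
        active.addLast(new Message(body));
    }

    /** Delivers messages until none are left on the active queue. */
    void drain(Handler handler) {
        while (!active.isEmpty()) {
            Message m = active.pollFirst();
            m.deliveryCount++; // every delivery attempt is counted
            switch (handler.handle(m.body, m.deliveryCount)) {
                case COMPLETE:
                    completed.add(m.body); // settled, removed for good
                    break;
                case DEAD_LETTER:
                    deadLetter.add(m.body); // explicitly moved aside
                    break;
                case ABANDON:
                    // Redeliver unless the delivery limit has been reached.
                    if (m.deliveryCount >= maxDeliveryCount) {
                        deadLetter.add(m.body);
                    } else {
                        active.addLast(m);
                    }
                    break;
            }
        }
    }

    public static void main(String[] args) {
        PeekLockQueueSketch queue = new PeekLockQueueSketch(3);
        queue.send("ok");
        queue.send("malformed");
        queue.send("flaky");
        queue.drain((body, count) -> {
            if (body.equals("malformed")) return Outcome.DEAD_LETTER;
            if (body.equals("flaky")) return Outcome.ABANDON;
            return Outcome.COMPLETE;
        });
        // prints: completed=[ok] deadLetter=[malformed, flaky]
        System.out.println("completed=" + queue.completed + " deadLetter=" + queue.deadLetter);
    }
}
```

The point of the sketch is that a message which is never completed does not disappear: it keeps coming back until it is either completed or ends up in the dead-letter list, so a handler that skips settlement should be prepared to see the same message again.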

vitalii-fedoryshyn avatar Aug 16 '24 15:08 vitalii-fedoryshyn

Just double-checking: any progress on this ticket? @yiliuTo @Netyyyy @joshfree, thanks so much.

kensinzl avatar Aug 19 '24 22:08 kensinzl

@kensinzl, have you tried this approach? You can register a PropertiesSupplier bean in your application:

@Bean
public PropertiesSupplier<ConsumerIdentifier, ProcessorProperties> propertiesSupplier() {
    return key -> {
        var processorProperties = new ProcessorProperties();
        processorProperties.setReceiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE);

        return processorProperties;
    };
}

I understand that the ideal solution would be to let you configure it in the annotation, but could you check whether this approach meets your requirement? Or do you specifically expect to be able to configure it via the annotation?

saragluna avatar Aug 21 '24 12:08 saragluna

@saragluna

Thank you for the reply.

I have it working now. Let me write down the following so that other people who face the same issue can refer to it.

https://learn.microsoft.com/en-us/azure/developer/java/spring-framework/using-service-bus-in-spring-applications If you use the Spring Messaging Azure Service Bus annotation approach from the official link above, the default mode is PEEK_LOCK, which is hardcoded in ServiceBusReceiverClientBuilder.

I tried the properties configuration, e.g. spring.cloud.azure.servicebus.consumer.receive-mode=RECEIVE_AND_DELETE or spring.cloud.azure.servicebus.processor.receive-mode=RECEIVE_AND_DELETE, but according to the logs neither of them changes the mode to RECEIVE_AND_DELETE.

If you want to use RECEIVE_AND_DELETE mode with the Spring Messaging Azure Service Bus annotation approach, you can register a bean like the following. Note: you need to set auto-complete to false.

    @Bean
    public PropertiesSupplier<ConsumerIdentifier, ProcessorProperties> propertiesSupplier() {
        return key -> {
            ProcessorProperties processorProperties = new ProcessorProperties();
            processorProperties.setReceiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE);
            processorProperties.setAutoComplete(false);
            return processorProperties;
        };
    }

This bean will be injected into AzureServiceBusMessagingAutoConfiguration.

Hopefully I am on the right track.
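
For anyone wondering why auto-complete is switched off together with RECEIVE_AND_DELETE: settlement calls such as complete only apply to a message that is still locked on the broker, and in RECEIVE_AND_DELETE mode the message is already gone by the time the handler runs. The following is a rough plain-Java sketch of that difference. It is not the Azure SDK; ReceiveModeSketch, Mode and deliverOnce are hypothetical names used only for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/** Toy comparison of the two receive modes. Hypothetical names; not the Azure SDK. */
class ReceiveModeSketch {
    enum Mode { PEEK_LOCK, RECEIVE_AND_DELETE }

    /** Delivers the head message once and returns how many messages remain queued. */
    static int deliverOnce(Deque<String> queue, Mode mode, Consumer<String> handler) {
        String message = queue.peekFirst();
        if (message == null) {
            return 0;
        }
        if (mode == Mode.RECEIVE_AND_DELETE) {
            queue.pollFirst(); // removed before the handler runs, nothing left to settle
        }
        try {
            handler.accept(message);
            if (mode == Mode.PEEK_LOCK) {
                queue.pollFirst(); // "complete": remove only after successful handling
            }
        } catch (RuntimeException e) {
            // PEEK_LOCK: the message is still queued and can be redelivered.
            // RECEIVE_AND_DELETE: the message was already removed and is lost.
        }
        return queue.size();
    }

    public static void main(String[] args) {
        Consumer<String> failing = m -> { throw new IllegalStateException("handler failed"); };

        Deque<String> peekLock = new ArrayDeque<>();
        peekLock.add("order-1");
        // prints: PEEK_LOCK remaining=1
        System.out.println("PEEK_LOCK remaining=" + deliverOnce(peekLock, Mode.PEEK_LOCK, failing));

        Deque<String> receiveAndDelete = new ArrayDeque<>();
        receiveAndDelete.add("order-1");
        // prints: RECEIVE_AND_DELETE remaining=0
        System.out.println("RECEIVE_AND_DELETE remaining="
                + deliverOnce(receiveAndDelete, Mode.RECEIVE_AND_DELETE, failing));
    }
}
```

So RECEIVE_AND_DELETE trades redelivery for simplicity: a handler failure means the message is not seen again, which is worth keeping in mind before choosing it over PEEK_LOCK.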

kensinzl avatar Aug 22 '24 02:08 kensinzl