
Infinite Receiver/Subscriber Issue | Java 11 | Spring Boot 2.5.*

Open arorashivam opened this issue 3 years ago • 2 comments

Attempt 1. Actual: the receiver/subscriber is disposed/closed after some time, for example after about 24 hours. We started it once and expected it never to be disposed. Please refer to the following code:

ServiceBusProcessorClient serviceBusProcessorClient =
    sbConfiguration.getReceiverProcessorClient(topicName, subscriptionName, onMessage, onError);
serviceBusProcessorClient.start();

Expectation: the receiver/subscriber should remain alive for the application's entire lifecycle, since we want to keep listening to the queue for incoming messages.

Attempt 2. Since the processor was being disposed, we added a while loop that restarts it once it is closed, checking every 5 to 10 minutes. The code is below for reference:

    public void startSessionReceiving(
            String queueName,
            Consumer<ServiceBusReceivedMessageContext> onMessage,
            Consumer<ServiceBusErrorContext> onError) throws InterruptedException {

        ServiceBusProcessorClient sessionProcessor =
                sbConfiguration.getSessionProcessorClient(queueName, onMessage, onError);

        // Run the check loop on its own thread so the caller is not blocked.
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(() -> {
            log.info("Executor thread calling to start listener");
            sessionProcessor.start();
            // Loop until the thread is interrupted (for example on executor shutdown).
            while (!Thread.currentThread().isInterrupted()) {
                log.info("Checking the status of service bus connection");
                if (!sessionProcessor.isRunning()) {
                    log.info("Another processor client started since previous connection is closed");
                    sessionProcessor.start();
                }
                try {
                    // RandomDataGenerator comes from Apache Commons Math; wait 5 to 10 minutes.
                    TimeUnit.MINUTES.sleep(new RandomDataGenerator().nextLong(5, 10));
                } catch (InterruptedException ex) {
                    log.error("Exception occurred in sleep thread for service bus.", ex);
                    // Restore the interrupt flag so the loop exits instead of swallowing it.
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

The issue with this code is that there is still a delay of several minutes before the connection status is checked. That delay could be reduced, but the real question is whether we need to do any of this at all, or whether we are missing something important. Ideally, consumers should listen indefinitely for new messages.
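For reference, the same periodic check can be sketched with a `ScheduledExecutorService` instead of a `while` loop with `sleep`, which makes the check interval explicit and easy to shorten. This is only a minimal sketch under assumptions: `RestartableProcessor` and `ProcessorWatchdog` are hypothetical names introduced here, not part of the Azure SDK. Only `isRunning()` and `start()` are taken from the code above. It does not explain why the processor stops in the first place.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the two processor methods used in the loop above. */
interface RestartableProcessor {
    boolean isRunning();
    void start();
}

/** Hypothetical helper that periodically restarts a processor that has stopped. */
final class ProcessorWatchdog implements AutoCloseable {
    private final RestartableProcessor processor;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "processor-watchdog");
                t.setDaemon(true); // do not keep the JVM alive on shutdown
                return t;
            });

    ProcessorWatchdog(RestartableProcessor processor) {
        this.processor = processor;
    }

    /** One health check. Returns true if a start was issued. */
    boolean checkOnce() {
        if (processor.isRunning()) {
            return false;
        }
        processor.start();
        return true;
    }

    /** Runs the first check immediately, then one check after every period. */
    void start(long period, TimeUnit unit) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                checkOnce();
            } catch (RuntimeException ex) {
                // An uncaught exception would cancel all future runs,
                // so it is swallowed here; a real application would log it.
            }
        }, 0, period, unit);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

To use it with the processor above, one would wrap `sessionProcessor` in a small `RestartableProcessor` adapter that delegates to `sessionProcessor.isRunning()` and `sessionProcessor.start()`, then call something like `new ProcessorWatchdog(adapter).start(30, TimeUnit.SECONDS)`.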

It would be much appreciated if someone could recommend a better approach for an application-scoped listener that runs indefinitely, without us having to manage its lifecycle ourselves.

arorashivam, May 04 '22 06:05

    public ServiceBusProcessorClient getSessionProcessorClient(
            String topicName,
            String subscriptionName,
            Consumer<ServiceBusReceivedMessageContext> onMessage,
            Consumer<ServiceBusErrorContext> onError) {
        String connectionString = sbProperties.getConnectionString();
        return new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .sessionProcessor()
                .disableAutoComplete()
                .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
                .topicName(topicName)
                .subscriptionName(subscriptionName)
                .maxConcurrentSessions(5)
                .processMessage(onMessage)
                .processError(onError)
                .buildProcessorClient();
    }

srikarasr, Jun 20 '22 05:06

SDK Version

    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-core</artifactId>
        <version>1.16.0</version>
    </dependency>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-servicebus</artifactId>
        <version>7.2.1</version>
    </dependency>

srikarasr, Jun 20 '22 05:06