
[BUG] Service Bus Processor internally prefetches 2 more messages when maxConcurrentCalls > 1

Open liukun-msft opened this issue 3 years ago • 2 comments

Describe the bug

Below is the log when maxConcurrentCalls = 2. After we received 2 messages, one for each processing thread, there are two extra Transfer frames, which means we prefetched 2 more messages from Service Bus. The current behavior is therefore similar to setting prefetchCount(1).

image

Since the messages are prefetched, this increases the possibility that we complete an expired message. Also, when the processor tries to abandon a message, the order of the messages will be incorrect.

image

Code Snippet

        Consumer<ServiceBusReceivedMessageContext> processMessage = messageContext -> {
            ServiceBusReceivedMessage message = messageContext.getMessage();
            try {
                System.out.println("Start process message. Sequence#: " + message.getSequenceNumber());

                TimeUnit.SECONDS.sleep(10);
                messageContext.complete();

                System.out.println("Finished process message. Sequence#: " + message.getSequenceNumber());
                
            } catch (Exception e) {
                messageContext.abandon();
            }
        };

        Consumer<ServiceBusErrorContext> processError = errorContext -> {
            System.err.println("Error occurred while receiving message: " + errorContext.getException());
        };

        ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
                .connectionString(Credentials.serviceBusConnectionString)
                .processor()
                .queueName(Credentials.serviceBusQueue)
                .processMessage(processMessage)
                .processError(processError)
                .maxConcurrentCalls(2)
                .disableAutoComplete()
                .buildProcessorClient();

        processorClient.start();
    }

Enable AMQP frame tracing by setting the environment variable: PN_TRACE_FRM=1

Expected behavior

We should request the next message only after the previous message has been completely processed.

Reason

This issue is similar to the session processor ordering issue, and the fix for that is https://github.com/Azure/azure-sdk-for-java/pull/29696

It may be because Reactor prefetches internally when we use several publishOn or runOn operators; we need to look into the details.

liukun-msft avatar Aug 18 '22 07:08 liukun-msft

Any idea when this bug will be fixed? Until then, what is the recommended config for prefetch when concurrency is > 1?

meenarc avatar Sep 07 '22 16:09 meenarc

Reason

  1. The .parallel(processorOptions.getMaxConcurrentCalls(), 1) call will always prefetch 1 more message.
  2. When the subscriber of ServiceBusReceiveLinkProcessor calls onSubscribe(), it requests 1 more message, and ServiceBusReceiveLinkProcessor#onNext() adds the link credit.
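
The two points above can be illustrated with a small toy model. This is only a sketch under stated assumptions: it uses plain JDK classes, not Reactor or the Service Bus SDK, and the names `PrefetchSketch`, `Source`, `BufferingStage` and `transfersWhileWorkersBusy` are hypothetical. It assumes each of the two causes behaves like a pipeline stage that eagerly keeps one message buffered regardless of whether a worker is free, and it counts how many messages the source has handed out (the Transfer frames in the log) while every worker is still busy with its first message.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class PrefetchSketch {
    /** Anything a stage or worker can pull the next message from. */
    interface Upstream {
        int next();
    }

    /** Stands in for the broker: counts every message handed out (one Transfer frame each). */
    static final class Source implements Upstream {
        int transfers = 0;

        @Override
        public int next() {
            return ++transfers;
        }
    }

    /** Hypothetical stage that eagerly keeps `prefetch` messages buffered, ignoring worker availability. */
    static final class BufferingStage implements Upstream {
        private final Upstream upstream;
        private final int prefetch;
        private final Queue<Integer> buffer = new ArrayDeque<>();

        BufferingStage(Upstream upstream, int prefetch) {
            this.upstream = upstream;
            this.prefetch = prefetch;
            refill(); // the buffer is filled as soon as the stage is wired up
        }

        private void refill() {
            while (buffer.size() < prefetch) {
                buffer.add(upstream.next());
            }
        }

        @Override
        public int next() {
            Integer item = buffer.poll();
            if (item == null) {
                item = upstream.next(); // nothing buffered: pull on demand
            }
            refill(); // immediately replace the message that was just handed out
            return item;
        }
    }

    /** Messages transferred once each worker holds one message and none has finished yet. */
    static int transfersWhileWorkersBusy(int workers, int stagesWithPrefetchOne) {
        Source source = new Source();
        Upstream head = source;
        for (int i = 0; i < stagesWithPrefetchOne; i++) {
            head = new BufferingStage(head, 1);
        }
        for (int w = 0; w < workers; w++) {
            head.next(); // the worker takes a message and starts a long-running handler
        }
        return source.transfers;
    }

    public static void main(String[] args) {
        // No buffering stages: only the 2 messages being processed are transferred.
        System.out.println(transfersWhileWorkersBusy(2, 0)); // prints 2
        // Two stages that each buffer 1 message: 2 extra transfers.
        System.out.println(transfersWhileWorkersBusy(2, 2)); // prints 4
    }
}
```

In this model, two workers plus two one-message buffers give four transfers, which is consistent with the two extra Transfer frames seen with maxConcurrentCalls = 2. The real operators may account for demand differently, so treat this as an illustration of the mechanism rather than a description of the SDK internals.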

liukun-msft avatar Sep 19 '22 08:09 liukun-msft

This issue needs to be looked into after the credit calculation work is merged - #30930

ki1729 avatar Feb 28 '23 18:02 ki1729

This undesired prefetch is addressed in the PR https://github.com/Azure/azure-sdk-for-java/pull/34854

anuchandy avatar Jun 07 '23 19:06 anuchandy