[BUG] Service Bus Processor internally prefetches 2 more messages when maxConcurrentCalls > 1
Describe the bug
Below is the log when maxConcurrentCalls = 2. After we receive 2 messages, one for each processing thread, there are two extra Transfer frames, which means we prefetch 2 more messages from Service Bus. So the current behavior is as if prefetchCount(1) had been added.

Since these messages are prefetched, it increases the possibility that we complete a message whose lock has already expired. And when the processor tries to abandon a message, the order of the messages will be incorrect.
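To make the timing concern concrete, here is a minimal sketch of the arithmetic. It assumes the lock clock starts when the message is transferred and that nothing renews the lock while the message waits in the local buffer. The class name, the method name, and the 15-second lock duration are hypothetical and chosen only for illustration; the 10-second processing time matches the sleep in the snippet below.

```java
import java.time.Duration;

public class PrefetchLockSketch {

    /**
     * Returns true if the lock would already have expired when the handler
     * calls complete(). Assumption: the lock starts at transfer time and is
     * not renewed while the message waits in the buffer or is processed.
     */
    static boolean lockExpiredAtComplete(Duration waitInBuffer,
                                         Duration processing,
                                         Duration lockDuration) {
        return waitInBuffer.plus(processing).compareTo(lockDuration) > 0;
    }

    public static void main(String[] args) {
        Duration processing = Duration.ofSeconds(10); // same as the sleep in the handler
        Duration lock = Duration.ofSeconds(15);       // hypothetical lock duration

        // Message handed to a handler immediately: 10s <= 15s.
        System.out.println(lockExpiredAtComplete(Duration.ZERO, processing, lock)); // false

        // Prefetched message that waited for one earlier message: 20s > 15s.
        System.out.println(lockExpiredAtComplete(processing, processing, lock));    // true
    }
}
```

Under these assumptions, a message that is only requested once a handler is free has the full lock duration available, while a prefetched one has already spent part of it waiting.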

Code Snippet
// Handler: simulate 10 seconds of work, then settle the message explicitly.
Consumer<ServiceBusReceivedMessageContext> processMessage = messageContext -> {
    ServiceBusReceivedMessage message = messageContext.getMessage();
    try {
        System.out.println("Start process message. Sequence#: " + message.getSequenceNumber());
        TimeUnit.SECONDS.sleep(10);
        messageContext.complete();
        System.out.println("Finished process message. Sequence#: " + message.getSequenceNumber());
    } catch (Exception e) {
        messageContext.abandon();
    }
};
Consumer<ServiceBusErrorContext> processError = errorContext -> {
    System.err.println("Error occurred while receiving message: " + errorContext.getException());
};
// Two concurrent handlers, auto-complete disabled, no prefetchCount configured.
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
    .connectionString(Credentials.serviceBusConnectionString)
    .processor()
    .queueName(Credentials.serviceBusQueue)
    .processMessage(processMessage)
    .processError(processError)
    .maxConcurrentCalls(2)
    .disableAutoComplete()
    .buildProcessorClient();
processorClient.start();
Enable AMQP frame tracing by setting the environment variable: PN_TRACE_FRM=1
Expected behavior
We should request the next message only after the previous message has been completely processed.
Reason
This issue is similar to the session processor ordering issue, which was fixed in https://github.com/Azure/azure-sdk-for-java/pull/29696
It may be caused by Reactor prefetching internally when we use several publishOn or runOn operators; we need to look into the details.
Any idea when this bug will be fixed? Until then, what is the recommended prefetch configuration when concurrency is > 1?
Reason
- The .parallel(processorOptions.getMaxConcurrentCalls(), 1) will always prefetch 1 more message.
- When the subscriber of ServiceBusReceiveLinkProcessor calls onSubscribe(), it will request 1 more message. And ServiceBusReceiveLinkProcessor#onNext() adds the link credit.
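The two points above can be pictured with a toy credit model. This is only a sketch, not the SDK's actual credit calculation: it assumes every credit placed on the link results in exactly one message being transferred into the local buffer right away, and the class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class LinkCreditSketch {

    /**
     * Toy model (assumption): one transfer per credit, delivered immediately.
     * Returns how many messages sit in the local buffer, transferred but not
     * yet picked up by a handler, right after startup.
     */
    static int idleBufferedAfterStart(int maxConcurrentCalls, int extraCredits) {
        Deque<Integer> buffer = new ArrayDeque<>();
        int credits = maxConcurrentCalls + extraCredits;
        for (int seq = 1; seq <= credits; seq++) {
            buffer.add(seq); // one Transfer frame per credit
        }
        for (int i = 0; i < maxConcurrentCalls && !buffer.isEmpty(); i++) {
            buffer.poll();   // a free handler picks up a message
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        // Expected behavior: credit only for messages a handler can take now.
        System.out.println(idleBufferedAfterStart(2, 0)); // 0

        // Reported behavior: 1 extra from parallel(..., 1) plus 1 extra
        // from the request made in onSubscribe().
        System.out.println(idleBufferedAfterStart(2, 1 + 1)); // 2
    }
}
```

In this model the two idle messages correspond to the two extra Transfer frames seen in the trace with maxConcurrentCalls = 2.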
This issue needs to be looked into after the credit calculation work is merged - #30930
This undesired prefetch is addressed in the PR https://github.com/Azure/azure-sdk-for-java/pull/34854