azure-sdk-for-java

[BUG] ServiceBusReceiverClient client never returning after receiveMessages method call (maxWaitTime is set to 1 sec)

Open dakshme opened this issue 2 years ago • 25 comments

Describe the bug

I am using the latest beta version of the Service Bus SDK, 7.15.0-beta.4, with a ServiceBusReceiverClient created like this:

ServiceBusReceiverClient receiverClient = new ServiceBusClientBuilder()
                    .connectionString(sbTopicConnUri)
                    .receiver()
                    .topicName(azureConfig.getSbTopicName())
                    .subscriptionName(azureConfig.getSbTopicSubscription())
                    .disableAutoComplete()
                    .prefetchCount(500)
                    .buildClient();

Using this client, I pull messages from the subscription periodically. To do so, I have a thread pool of 5 threads that all use the same receiver client object to receive messages from the Service Bus subscription.

While doing so, I noticed that as long as messages are available in the subscription, the client keeps returning them. Once no messages are available, these threads get stuck in receiveMessages(batch, Duration.ofSeconds(1)). Although the wait time is 1 second, the threads never return from the call.

Exception or Stack Trace

I don't see any stack trace or exception when this happens.

To Reproduce

Create multiple threads that share the same receiver client, with all threads actively reading messages through that client at the same time. Once the messages on the subscription are exhausted, the threads should return without any messages.

Code Snippet

The receiver client is created as shown above. The jobs are created like this:

AzureSBMsgProcessor[] jobs = { new AzureSBMsgProcessor(barrier, this.messageReceiver),
                new AzureSBMsgProcessor(barrier, this.messageReceiver),
                new AzureSBMsgProcessor(barrier, this.messageReceiver),
                new AzureSBMsgProcessor(barrier, this.messageReceiver),
                new AzureSBMsgProcessor(barrier, this.messageReceiver) };

AzureSBMsgProcessor is a class that pulls messages from Service Bus using the receiver client. The jobs are driven by this loop:

while(recordChangeList.size() < batchSize && (maxPullIter-- != 0) && isMsgPresentForDp) {

            List<Future<List<RecordChange>>> futureTaskList = new ArrayList<>();
            for (AzureSBMsgProcessor job: jobs) {
                futureTaskList.add(service.submit(job));
            }
            List<RecordChange> tempRecordChangeList = new ArrayList<>();
            try {
                barrier.await();
                futureTaskList.forEach(msgList -> {
                    try {
                        tempRecordChangeList.addAll(msgList.get());
                    } catch (InterruptedException | ExecutionException ex) {
                        log.error("Error occurred while fetching values from future tasks", ex);
                        if (ex instanceof InterruptedException)
                            Thread.currentThread().interrupt();
                    }
                });
            } catch (InterruptedException | BrokenBarrierException e) {
                log.error("Error occurred while waiting & processing future tasks", e);
                if (e instanceof InterruptedException)
                    Thread.currentThread().interrupt();
            }

            if (tempRecordChangeList.isEmpty())
                isMsgPresentForDp = false;
            else
                recordChangeList.addAll(tempRecordChangeList);

            barrier.reset();
        }

Expected behavior

When no messages are available in the Service Bus subscription, all threads should return as soon as maxWaitTime elapses, which is 1 second in my case.
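For reference, the contract being described is a bounded wait: return at most `batch` messages, and return (possibly with an empty list) once maxWaitTime has elapsed. The plain-JDK sketch below illustrates that contract with a local `BlockingQueue` standing in for the subscription. It is a hypothetical analogy (the class `BoundedReceive` is made up for illustration), not how `ServiceBusReceiverClient.receiveMessages` is implemented.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Analogy only: a local BlockingQueue stands in for the subscription (hypothetical, not the SDK).
final class BoundedReceive {
    private BoundedReceive() {}

    // Collects up to maxMessages items, never waiting longer than maxWaitTime in total.
    // An empty list is a normal outcome when nothing arrives before the deadline.
    static <T> List<T> receive(BlockingQueue<T> source, int maxMessages, Duration maxWaitTime) {
        final List<T> batch = new ArrayList<>();
        final long deadline = System.nanoTime() + maxWaitTime.toNanos();
        try {
            while (batch.size() < maxMessages) {
                final long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // maxWaitTime elapsed: return whatever we have
                }
                final T item = source.poll(remaining, TimeUnit.NANOSECONDS);
                if (item == null) {
                    break; // nothing arrived before the deadline
                }
                batch.add(item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt, return the partial batch
        }
        return batch;
    }

    public static void main(String[] args) {
        final BlockingQueue<String> emptySubscription = new LinkedBlockingQueue<>();
        final long start = System.nanoTime();
        final List<String> batch = receive(emptySubscription, 100, Duration.ofSeconds(1));
        final long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("received " + batch.size() + " messages after ~" + elapsedMs + " ms");
    }
}
```

An empty result after roughly maxWaitTime is the behavior the reporter expected from the SDK call as well.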


Setup:

  • OS: Ubuntu
  • IDE: IntelliJ
  • Library/Libraries:
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-servicebus</artifactId>
    <version>7.15.0-beta.4</version>
</dependency>
  • Java version: 17
  • Frameworks: Spring Boot 2.7


dakshme, Nov 09 '23 13:11

Hello Anu Thomas,

Thank you for your response. Yes, the steps you listed are correct. The batch size I pass is 100, so each of the 5 threads tries to pull at most 100 messages with a maximum wait time of 1 second (I also tried both lower and higher wait times). In theory, for the case you described, the 5 threads should read about 500 messages in the first cycle, about 500 more in the next, then whatever remains in the subscription, and finally an empty list.

Thanks, Mahesh Daksha

On Fri, Nov 10, 2023 at 12:57 AM Anu Thomas Chandy wrote:

Hello @dakshme, thanks for reaching out. I want to clarify the repro steps; do the steps below look right?

  1. Create a topic and prefill it with 1000 messages.
  2. Create one ServiceBusReceiverClient instance.
  3. Create a Callable instance that uses the ServiceBusReceiverClient instance (from step 2) to receive a batch of messages with a 1-second timeout (the Callable returns the received batch of messages as a list).
  4. Create a ThreadPoolExecutor with 5 threads.
  5. Submit the Callable instance (from step 3) to the ThreadPoolExecutor 5 times.
  6. Wait for all submissions to return messages (i.e., wait via Future.get() for each list).
  7. Repeat Step 5 and 6 in a loop, eventually all the 1000 messages prefilled in step1 should be pulled.
  8. Once the topic is empty, we expect Future.get() from Callable to return empty list.
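As a rough illustration of steps 3 to 8, here is a JDK-only harness in which a hypothetical `InMemorySubscription` stands in for both the prefilled topic subscription and the shared receiver client. It only shows the shape of the submit/join loop and its exit condition; because the stand-in never blocks, it cannot reproduce the hang itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ReproHarness {
    private ReproHarness() {}

    // Hypothetical in-memory stand-in for the prefilled subscription (step 1) and the
    // shared receiver (step 2). Unlike the real client, it never blocks.
    static final class InMemorySubscription {
        private final Queue<Integer> messages = new ConcurrentLinkedQueue<>();

        InMemorySubscription(int prefill) {
            for (int i = 0; i < prefill; i++) {
                messages.add(i);
            }
        }

        // Returns up to maxMessages, or an empty list once the subscription is drained.
        List<Integer> receive(int maxMessages) {
            final List<Integer> batch = new ArrayList<>();
            Integer next;
            while (batch.size() < maxMessages && (next = messages.poll()) != null) {
                batch.add(next);
            }
            return batch;
        }
    }

    // Steps 3-8: submit the same Callable once per thread each round; stop when a round is empty.
    static int drain(InMemorySubscription subscription, int threads, int batchSize) {
        final ExecutorService pool = Executors.newFixedThreadPool(threads);         // step 4
        final Callable<List<Integer>> job = () -> subscription.receive(batchSize);  // step 3
        int total = 0;
        try {
            while (true) {
                final List<Future<List<Integer>>> futures = new ArrayList<>();
                for (int i = 0; i < threads; i++) {
                    futures.add(pool.submit(job));                                  // step 5
                }
                int roundCount = 0;
                for (Future<List<Integer>> future : futures) {
                    roundCount += future.get().size();                              // step 6
                }
                if (roundCount == 0) {
                    return total;                                                   // step 8
                }
                total += roundCount;                                                // step 7
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("receive job failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("drained " + drain(new InMemorySubscription(1000), 5, 100) + " messages");
    }
}
```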

In step 8, what you're observing is that Future.get() does not return because receiveMessages() is stuck once the topic is empty?

Could you let me know the value for batch in receiveMessages(batch, Duration.ofSeconds(1))?

View it on GitHub: https://github.com/Azure/azure-sdk-for-java/issues/37594#issuecomment-1804482192

dakshme, Nov 10 '23 04:11

Hello @dakshme, I investigated this.

The synchronous receiveMessages(,) API is designed to be invoked serially. Under the hood, each synchronous receiveMessages(,) invocation enqueues a unit of work to pull messages. These work items are dequeued and processed one at a time: work-1 pulls messages from the broker into a list (or times out), and only after work-1 completes does work-2 start pulling messages into its list. So at the library level, work runs serially even when it originates from multiple threads. This design choice guarantees message ordering.
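A simplified model of that serial design, assuming a single-threaded executor as the internal work queue (the class `SerialReceiveModel` and its 10 ms sleep are illustrative, not SDK internals): five callers submit receive work concurrently, yet the peak number of pulls in progress stays at one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of the serial design described above; not the SDK's actual implementation.
final class SerialReceiveModel {
    private final ExecutorService serialWorker = Executors.newSingleThreadExecutor();
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();

    // Each call enqueues one unit of "pull" work; the caller blocks until that unit completes.
    int receive(int maxMessages) throws InterruptedException, ExecutionException {
        return serialWorker.submit(() -> {
            maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
            try {
                Thread.sleep(10); // pretend to pull up to maxMessages from the broker
                return maxMessages;
            } finally {
                inFlight.decrementAndGet();
            }
        }).get();
    }

    // Fires `callers` concurrent receive calls and reports the peak number of pulls in progress.
    static int peakConcurrentPulls(int callers) {
        final SerialReceiveModel model = new SerialReceiveModel();
        final ExecutorService callerPool = Executors.newFixedThreadPool(callers);
        try {
            final List<Future<Integer>> results = new ArrayList<>();
            for (int i = 0; i < callers; i++) {
                results.add(callerPool.submit(() -> model.receive(100)));
            }
            for (Future<Integer> result : results) {
                result.get();
            }
            return model.maxInFlight.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            callerPool.shutdown();
            model.serialWorker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak concurrent pulls with 5 callers: " + peakConcurrentPulls(5));
    }
}
```

Adding caller threads in this model only lengthens the queue; it never adds parallel pulls.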

It looks like calling it from multiple threads breaks this serial invocation logic, which results in the problem. I think we can make some improvements so that the receive call terminates gracefully instead of never returning, and we should also document this design in the Javadoc.

As a workaround in your case, pull 500 messages in one call instead of having 5 threads each pull 100 (which gets translated into serial pulls anyway). Then split the 500 messages into 5 sub-batches of 100, hand each sub-batch to a separate thread for the actual business processing (which produces the RecordChange objects), and join the results. Repeat this in a loop, so that the synchronous receiveMessages(,) is guaranteed to be invoked serially while the business processing runs concurrently.
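The suggested workaround can be sketched with plain JDK types. Here `receiveBatch` is a hypothetical stand-in for a single `receiveMessages(500, timeout)` call made from one thread, and `process` is a hypothetical stand-in for the business logic that produces `RecordChange` objects; only the processing fans out to the pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.function.IntFunction;

final class SplitAndProcess {
    private SplitAndProcess() {}

    // receiveBatch: hypothetical stand-in for ONE receiveMessages(n, timeout) call on one thread.
    // process: hypothetical business step (e.g. whatever produces RecordChange objects).
    static <M, R> List<R> pullOnceProcessInParallel(IntFunction<List<M>> receiveBatch,
                                                    Function<List<M>, List<R>> process,
                                                    int totalBatch, int workers,
                                                    ExecutorService pool) {
        final List<M> messages = receiveBatch.apply(totalBatch);                 // serial receive
        final int chunk = Math.max(1, (messages.size() + workers - 1) / workers); // ceil(size / workers)
        final List<Future<List<R>>> futures = new ArrayList<>();
        for (int from = 0; from < messages.size(); from += chunk) {
            final List<M> subBatch =
                new ArrayList<>(messages.subList(from, Math.min(from + chunk, messages.size())));
            futures.add(pool.submit(() -> process.apply(subBatch)));            // concurrent processing
        }
        final List<R> results = new ArrayList<>();
        try {
            for (Future<List<R>> future : futures) {
                results.addAll(future.get());                                    // join in sub-batch order
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        final ExecutorService pool = Executors.newFixedThreadPool(5);
        final IntFunction<List<Integer>> fakeReceive = n -> {
            final List<Integer> batch = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                batch.add(i);
            }
            return batch;
        };
        final Function<List<Integer>, List<String>> fakeProcess = sub -> {
            final List<String> out = new ArrayList<>();
            for (Integer m : sub) {
                out.add("change-" + m);
            }
            return out;
        };
        final List<String> changes = pullOnceProcessInParallel(fakeReceive, fakeProcess, 500, 5, pool);
        pool.shutdown();
        System.out.println("processed " + changes.size() + " messages in 5 sub-batches");
    }
}
```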

anuchandy, Nov 10 '23 15:11

Hello @anuchandy, thank you for your response. I have gone through the details. Please check the points below and let me know your thoughts.

  1. Fixing the hang in concurrent receiveMessages calls: I have gone through this PR. With this fix, I should no longer see application threads hang while requesting messages with a maximum wait time. Could you also confirm which SDK version will include this change? I believe it will solve at least one part of the reported issue, i.e. application threads freezing while waiting for messages and never timing out.
  2. Regarding how we pull messages with the receiver client: our initial implementation used a single (main) thread to pull and acknowledge messages, but we were not happy with the Service Bus performance. It took almost 4-5 minutes to pull and acknowledge 5000 messages from the subscription using this synchronous receiver client, so we tried to optimize it with the implementation described above, which led to the current situation. The approach you suggested may not help much, because the application's message processing time (after receiving from Service Bus) is only a small fraction of the overall time spent pulling and completing messages. That is why we parallelized the pulling and acknowledging of messages. Could you share performance numbers for this receiver client with a single thread, i.e. how many messages can be pulled and acknowledged per minute or so?
  3. Also, what if I create a separate receiver client for each thread (instead of reusing the same receiver client as in the current implementation)? If each concurrent thread uses its own receiver client to receive messages, will this change the SDK or Service Bus server behavior in any way? Will it help us achieve any optimization, or will it have no impact?
  4. Can you suggest any other way, using this or any other available Service Bus receiver client, to optimize our use case? We are constrained to pulling messages and then processing them; due to other application limitations, we do not want to use push-based receiver clients.

Kindly suggest on the above points.

dakshme, Nov 14 '23 18:11

Hello @dakshme,

I need to prepare and run more tests. Once they pass and prove the change can be shipped, I'll share the plan for the ship date and SDK version.

Thanks for sharing the additional context around the reason to try the split approach.

The rate at which messages can be pulled and completed also depends on the tier, the network conditions, and how close the consumer and broker are. After completing the tests mentioned above, I'll try to set up an environment to get a general idea.

Expect this to take 1-2 weeks.

I agree with your thoughts about using a separate Receiver Client instance per thread. With this approach you should be able to pull concurrently, since message arrival will no longer be serialized through a single channel (each Receiver Client uses a separate channel), assuming the service does not throttle egress.

I wish we could use the ProcessorClient, which has out-of-the-box support for threading, but given the application's no-push-model constraint, we cannot. The synchronous Receiver is the only client type for the pull model.

anuchandy, Nov 14 '23 22:11

@dakshme, btw, could you let me know the service tier?

anuchandy, Nov 14 '23 23:11

Our pricing tier is Standard (at least in the environment I am working in).

dakshme, Nov 15 '23 04:11

Hello @dakshme, I was looking for official documentation about the Standard tier and came across this.

My understanding from the above documentation about Standard tier is –

  1. For a Standard tier Service Bus namespace, there is a limit on the number of operations that can be performed within a specific period. This limit follows a credit-based algorithm.
  2. A namespace has 1000 credits available every second. Each operation performed against namespace has a credit cost associated with it.
  3. For example, each time a message is received, one credit is subtracted from the available credits. Once the credits exhaust, subsequent receive attempts will be throttled until the start of the next second.

My understanding is this: say the application receives 1000 messages within the first 50 ms of the current second i, consuming all credits. Then any receive attempt made in the remaining 950 ms of second i will be throttled. Once second i elapses, the next second i+1 starts with 1000 credits.

I don't see the credit cost of complete|abandon in the doc, or whether it is part of the receive cost. Any other operation (e.g. Send) consumes its own credits, and different operations have different credit costs.
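To make the credit arithmetic concrete, here is a toy model of the scheme described above, assuming one credit per received message and a budget that resets at each whole-second boundary. `CreditWindow` is hypothetical, and granting part of a request when credits run low is a simplification of how throttled receives actually behave.

```java
// Simplified model of the Standard-tier throttling described above (an assumption-laden sketch,
// not the service's implementation): a fixed credit budget per one-second window,
// one credit per received message.
final class CreditWindow {
    private final int creditsPerSecond;
    private long currentSecond = -1;
    private int remaining;

    CreditWindow(int creditsPerSecond) {
        this.creditsPerSecond = creditsPerSecond;
    }

    // Returns how many of `requested` messages can be received at nowMillis; the rest would be
    // throttled until the next one-second window starts with a fresh budget.
    synchronized int tryReceive(long nowMillis, int requested) {
        final long second = nowMillis / 1000;
        if (second != currentSecond) {
            currentSecond = second;
            remaining = creditsPerSecond; // new second, full budget
        }
        final int granted = Math.min(requested, remaining);
        remaining -= granted; // each message costs one credit, even within a single receive call
        return granted;
    }

    public static void main(String[] args) {
        final CreditWindow namespace = new CreditWindow(1000);
        System.out.println("t=50ms,   want 1000 -> " + namespace.tryReceive(50, 1000));
        System.out.println("t=900ms,  want 100  -> " + namespace.tryReceive(900, 100));
        System.out.println("t=1000ms, want 100  -> " + namespace.tryReceive(1000, 100));
    }
}
```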

anuchandy, Nov 15 '23 17:11

@anuchandy

Thank you for pointing out the documentation. For now, I am not seeing any issues indicating that the credit limit is exhausted. I understand that in the Standard tier there is a limit on the operations one can perform across a namespace. We could hit this limit if we constantly receive large volumes of messages (which is not the case in my test setup). As you pointed out, this confirms that even if the application receives 100 messages in a single receiveMessages call, the credit is still reduced by 100 and not by 1 (even though the application made a single read operation to Service Bus). Please confirm this understanding.

Also, as discussed in the last thread, I tried reading messages using a separate synchronous receiver client for each thread. My initial testing suggests this won't work either, as there may be some internal limitations in the SDK implementation. Below are the errors the application observed while receiving/completing/abandoning messages (each thread using its own receiver client). These errors occurred many times, i.e. for each complete()/abandon() call; I am listing a single instance of each for reference.

  • Cannot process the disposition request to set the state as 'Modified{deliveryFailed=null, undeliverableHere=null, messageAnnotations=null}' for the delivery with delivery tag (id) '698c7658-81b8-45cc-a887-ed625554004b'. Reason: The delivery with the delivery tag does not exist in the link's DeliveryMap.
  • Cannot process the disposition request to set the state as 'Rejected{error=Error{condition=com.microsoft:dead-letter, description='null', info={}}}' for the delivery with delivery tag (id) 'd8cad3ff-6fc1-47f3-93c7-251e655e000b'. Reason: The delivery with the delivery tag does not exist in the link's DeliveryMap.
  • The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. For more information please see https://aka.ms/ServiceBusExceptions . Reference:385a80a4-3132-41fc-afcd-e5d9c943bd00, TrackingId:3576e1a3-c040-48ba-8080-55f1f10dac06_B38, SystemTracker:osdu-r3mvp-dp1de-077y-bus:topic:recordstopic~79|wke-listener, Timestamp:2023-11-15T15:54:38, errorContext[NAMESPACE: osdu-r3mvp-dp1de-077y-bus.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: recordstopic/subscriptions/wke-listener/$management, REFERENCE_ID: recordstopic/subscriptions/wke-listener-mgmt:receiver, LINK_CREDIT: 44]
  • Cannot process the disposition request to set the state as 'Accepted{}' for the delivery with delivery tag (id) '698c7658-81b8-45cc-a887-ed625554004b'. Reason: The delivery with the delivery tag does not exist in the link's DeliveryMap., message=Cannot process the disposition request to set the state as 'Accepted{}' for the delivery with delivery tag (id) '698c7658-81b8-45cc-a887-ed625554004b'. Reason: The delivery with the delivery tag does not exist in the link's DeliveryMap. cause=com.azure.core.amqp.implementation.handler.DeliveryNotOnLinkException: Cannot process the disposition request to set the state as 'Accepted{}' for the delivery with delivery tag (id) '698c7658-81b8-45cc-a887-ed625554004b'. Reason: The delivery with the delivery tag does not exist in the link's DeliveryMap., reason=GENERAL_ERROR
  • The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. For more information please see https://aka.ms/ServiceBusExceptions . Reference:6e926b7f-b7e7-4152-9c14-85a6c0394138, TrackingId:168a0c30-1c78-44e8-a98f-9a30326a35bf_B45, SystemTracker:osdu-r3mvp-dp1de-077y-bus:topic:recordstopic~15|wke-listener, Timestamp:2023-11-15T15:54:38

These lock expiry messages seem to be false positives, as the message lock duration is set to 5 minutes in my environment and the application tries to complete() each message immediately after receiving it from the subscription. Could this be a side effect of using multiple receiver clients? Please explain these errors based on the SDK's internal implementation.

I can do more testing with this implementation, but I am afraid it is not going to help. Also, if a single synchronous receiver client cannot be used concurrently across threads, it will become challenging for us to meet our performance numbers with the pull-based receiver client.

One more question on the side: is there an API to bulk-acknowledge a list of messages using the synchronous receiver client? Currently, the application needs to complete() each message individually, which is very time-consuming; I see the application spend almost 70-80 percent of its time acknowledging received messages one by one. The older Service Bus SDK (now deprecated) had an API to acknowledge a list of messages, which seems to be missing in this new SDK. Can you comment on this as well?

Sorry for the long update. This behavior is hurting us and impacting our production environments, so we want to fix or optimize it as early as possible.

Thanks

dakshme, Nov 15 '23 18:11

Hello @dakshme, receiving 100 messages reduces the namespace throttling credit by 100, not by 1. Even though it's a single receiveMessages call in the SDK, 100 messages flow over the wire in response to that call, and that number (100) is what gets decremented from the namespace throttling credit.

Bulk ack is not supported at the service level, so there is no such API in any of the SDKs (across languages). From what I heard, this decision was made when designing the new SDKs because of the lack of service support.

Regarding the errors you're seeing with the "One Receiver Instance per One Thread" model, let me take a look. For this model, could you confirm the following:

  1. What is the Prefetch per Receiver Client instance?
  2. What is the batch-size and timeout?
  3. In the code, can you double check if you ever try to complete|abandon a message using a Receiver Client other than the one that returned the message?
  4. What does the Runnable that each thread runs look like? Is there a while loop that calls receiveMessages? If so, what is the exit criterion for that loop?
  5. Can you confirm that the next submission using the same Receiver Client happens only after the thread has finished executing the current Runnable?
  6. How many threads (and therefore Receiver Client instances) are there in total?

anuchandy, Nov 15 '23 20:11

Hello @dakshme, to answer your question on the lock expiry messages:

One reason for seeing "the lock expired" is the following. Consider an application that calls receiveMessages(100, 5-sec). The client sends 100 AMQP credits to the service. Say the service returns the 40 available messages and the client-side 5-second timeout elapses; the application processes those 40 messages. But the server has no idea about the client-side timeout: from the service's perspective, another 60 AMQP credits are still pending. As soon as some messages become available (e.g. 20), the service sends them (pending AMQP credit is now 40), and they get buffered in the client. Now assume the application calls receiveMessages(100, 5-sec) again after 5 minutes. Part of this request will be served from the client's buffer, which holds those 20 messages, and their locks may already have expired while they sat in the buffer.
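The sequence above can be traced with a small bookkeeping model. `ImplicitPrefetchModel` is hypothetical, not SDK code; it assumes a lock starts when a message is delivered into the client buffer and lasts a fixed duration (5 minutes here, matching the lock duration mentioned earlier in the thread).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of the "implicit prefetch" scenario; not SDK code. Link credit issued for a
// receive outlives the client-side timeout, so late messages land in a local buffer and their
// locks keep ticking while the application is not reading.
final class ImplicitPrefetchModel {
    private final long lockDurationMillis;
    private final Deque<Long> lockExpiryTimes = new ArrayDeque<>(); // one entry per buffered message
    private int pendingCredit; // credit the service still considers outstanding

    ImplicitPrefetchModel(long lockDurationMillis) {
        this.lockDurationMillis = lockDurationMillis;
    }

    // receiveMessages(n, timeout) grants n credits to the service.
    void issueCredit(int credits) {
        pendingCredit += credits;
    }

    // The service pushes up to `available` messages against outstanding credit; each one is
    // locked from the moment it is delivered into the client buffer.
    int serviceDelivers(int available, long nowMillis) {
        final int delivered = Math.min(available, pendingCredit);
        pendingCredit -= delivered;
        for (int i = 0; i < delivered; i++) {
            lockExpiryTimes.addLast(nowMillis + lockDurationMillis);
        }
        return delivered;
    }

    // The application reads up to max buffered messages; returns how many already had expired locks.
    int readAndCountExpired(int max, long nowMillis) {
        int expired = 0;
        for (int i = 0; i < max && !lockExpiryTimes.isEmpty(); i++) {
            if (lockExpiryTimes.pollFirst() <= nowMillis) {
                expired++;
            }
        }
        return expired;
    }

    int pendingCredit() { return pendingCredit; }

    int buffered() { return lockExpiryTimes.size(); }

    public static void main(String[] args) {
        final long fiveMinutes = 5 * 60_000L;
        final ImplicitPrefetchModel link = new ImplicitPrefetchModel(fiveMinutes);
        link.issueCredit(100);             // receiveMessages(100, 5-sec)
        link.serviceDelivers(40, 0);       // only 40 are available
        link.readAndCountExpired(40, 0);   // the app gets and processes those 40
        // The 5 s client timeout fires here, but 60 credits remain outstanding on the service side.
        link.serviceDelivers(20, 10_000);  // 20 new messages arrive and are buffered
        System.out.println("pending credit=" + link.pendingCredit() + ", buffered=" + link.buffered());
        final int expired = link.readAndCountExpired(100, 10_000 + fiveMinutes + 1); // next receive, >5 min later
        System.out.println("expired locks on next read=" + expired);
    }
}
```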

This buffering of messages can happen either because of prefetching (the Prefetch-Count setting, a.k.a. explicit prefetch) or because of the scenario explained above (a.k.a. implicit prefetch). The prefetch documentation ("Prefetch messages from Azure Service Bus" on Microsoft Learn) applies to both implicit and explicit prefetch, though it does not specifically call out the above scenario.

anuchandy, Nov 16 '23 01:11

I wrote the test code below, which uses the "One Receiver Client per One Thread" model. I ran it for some time, and with this approach I was able to receive and complete ~5000 messages in ~70 seconds. I prefilled the subscription with ~40,000 messages and measured the time for 4 sets, each draining ~5000 messages; each set took ~70 seconds.

Note that I disabled prefetch and opted in to the V2 stack (the upcoming engine for Service Bus, currently in beta); see the code. I also used separate TCP connections by using separate builder instances.

test-code
package com.contoso.sb;

import com.azure.core.util.ConfigurationBuilder;
import com.azure.core.util.IterableStream;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceiverClient;
import io.netty.util.concurrent.DefaultThreadFactory;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public final class SyncReceiveMeasuring {
    public static void main(String[] args) throws InterruptedException {
        // uses beta.4
        //  <dependency>
        //      <groupId>com.azure</groupId>
        //      <artifactId>azure-messaging-servicebus</artifactId>
        //      <version>7.15.0-beta.4</version>
        //  </dependency>
        measure1();
    }

    private static void measure1() {
        final int concurrency = 5;
        final ArrayList<ServiceBusReceiverClient> clients = new ArrayList<>(concurrency);
        for (int i = 0; i < concurrency; i++) {
            // Build 5 clients, each from a dedicated builder, so each gets its own TCP connection.
            final ServiceBusReceiverClient receiverClient = new ServiceBusClientBuilder()
                .connectionString(System.getenv("CON_STR"))
                // Enabling new stack via configuration.
                .configuration(new ConfigurationBuilder()
                    .putProperty("com.azure.messaging.servicebus.nonSession.syncReceive.v2", "true")
                    .build())
                .receiver()
                .topicName(System.getenv("TOPIC_NAME"))
                .subscriptionName(System.getenv("SUBSCRIPTION_NAME"))
                .disableAutoComplete()
                .buildClient();
            clients.add(receiverClient);
        }

        final List<Work> works = new ArrayList<>(concurrency);
        for (int i = 0; i < concurrency; i++) {
            final ServiceBusReceiverClient client = clients.get(i);
            // 5 Callables (Work) to be submitted to the executor service later.
            works.add(new Work(i, client));
        }

        final ExecutorService executorService = Executors.newFixedThreadPool(concurrency, new DefaultThreadFactory("worker-thread"));
        while (true) {
            final List<Future<WorkResult>> futureTaskList = new ArrayList<>();
            for (int i = 0; i < concurrency; i++) {
                futureTaskList.add(executorService.submit(works.get(i)));
            }

            final AtomicInteger dispositionFailureCount = new AtomicInteger(0);
            final AtomicInteger messagesCount = new AtomicInteger(0);
            final long beginTime = System.currentTimeMillis();
            futureTaskList.forEach(workResultFuture -> {
                try {
                    final WorkResult result = workResultFuture.get();
                    dispositionFailureCount.addAndGet(result.getDispositionFailureCount());
                    messagesCount.addAndGet(result.getMessagesCount());
                    // For each thread, Print the number of messages received, completion failed and time taken.
                    result.print();
                } catch (InterruptedException | ExecutionException ex) {
                    System.out.println("Work failed: " + ex);
                    if (ex instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            final long endTime = System.currentTimeMillis();
            final long elapsedTime = endTime - beginTime;
            // Across all threads, Print the total number of messages received, total completion failed and time taken.
            System.out.println("[Threads-Joined]:"
                + " Messages-Count=" + messagesCount.get()
                + " Completion-Error=" + dispositionFailureCount.get()
                + " Duration (sec)=" + TimeUnit.SECONDS.convert(elapsedTime, TimeUnit.MILLISECONDS));
        }
    }

    static final class WorkResult {
        private final String threadName;
        private final int messagesCount;
        private final int dispositionFailureCount;
        private final long elapsedSeconds;
        WorkResult(String threadName, int messagesCount, int dispositionFailureCount, long elapsedSeconds) {
            this.threadName = threadName;
            this.messagesCount = messagesCount;
            this.dispositionFailureCount = dispositionFailureCount;
            this.elapsedSeconds = elapsedSeconds;
        }

        public int getDispositionFailureCount() {
            return dispositionFailureCount;
        }

        public int getMessagesCount() {
            return messagesCount;
        }

        public void print() {
            System.out.println("   [" + threadName + "]:"
                + " Messages-Count=" + messagesCount
                + " Completion-Error=" + dispositionFailureCount
                + " Duration (sec)=" + elapsedSeconds);
        }
    }

    static final class Work implements Callable<WorkResult> {
        private final int id;
        private final ServiceBusReceiverClient client;

        Work(int id, ServiceBusReceiverClient client) {
            this.id = id;
            this.client = client;
        }

        @Override
        public WorkResult call() {
            int dispositionFailureCount = 0;
            final long beginTime = System.currentTimeMillis();
            // Use a coarse timeout, e.g. 5 sec or more. Values as low as 1 sec may sometimes time out even before the connection is established.
            final IterableStream<ServiceBusReceivedMessage> messagesStream = client.receiveMessages(100, Duration.ofSeconds(5));
            final List<ServiceBusReceivedMessage> messagesList = messagesStream.stream().collect(Collectors.toList());
            for (ServiceBusReceivedMessage message : messagesList) {
                try {
                    client.complete(message);
                } catch (RuntimeException e) {
                    dispositionFailureCount++;
                }
            }
            final long endTime = System.currentTimeMillis();
            long elapsedTime = endTime - beginTime;
            return new WorkResult(Thread.currentThread().getName(),
                messagesList.size(), dispositionFailureCount,
                TimeUnit.SECONDS.convert(elapsedTime, TimeUnit.MILLISECONDS));
        }
    }
}

sample-output

   [worker-thread-1-1]: Messages-Count=100 Completion-Error=0 Duration (sec)=7
   [worker-thread-1-2]: Messages-Count=100 Completion-Error=0 Duration (sec)=7
   [worker-thread-1-3]: Messages-Count=100 Completion-Error=0 Duration (sec)=7
   [worker-thread-1-4]: Messages-Count=100 Completion-Error=0 Duration (sec)=7
   [worker-thread-1-5]: Messages-Count=100 Completion-Error=0 Duration (sec)=7
[Threads-Joined]: Messages-Count=500 Completion-Error=0 Duration (sec)=7

   [worker-thread-1-1]: Messages-Count=100 Completion-Error=0 Duration (sec)=7
   [worker-thread-1-2]: Messages-Count=100 Completion-Error=0 Duration (sec)=7
   [worker-thread-1-3]: Messages-Count=100 Completion-Error=0 Duration (sec)=7
   [worker-thread-1-4]: Messages-Count=100 Completion-Error=0 Duration (sec)=7
   [worker-thread-1-5]: Messages-Count=100 Completion-Error=0 Duration (sec)=7
[Threads-Joined]: Messages-Count=500 Completion-Error=0 Duration (sec)=7

......

anuchandy, Nov 16 '23 01:11

Now that I have the above code, you can ignore my earlier questions about the repro code.

anuchandy, Nov 16 '23 02:11

@anuchandy Yes, your test program looks close to what I am doing. The only addition is that I use a cyclic barrier on top of the regular executor service. So we are good with it.

Also, as suggested, I removed the prefetch count (when initializing the client) and am now observing better performance. At least I am no longer seeing the errors I reported yesterday. I am also seeing around 5k messages processed in approximately 1 minute. I will continue testing and keep you posted.

Can you also explain more about this V2 stack:

.configuration(new ConfigurationBuilder()
                    .putProperty("com.azure.messaging.servicebus.nonSession.syncReceive.v2", "true")
                    .build())

How does this help or optimize the client? Could you share some documentation about it? I assume this feature is available in 7.15.0-beta.4.

Thanks.

dakshme, Nov 16 '23 13:11

Hello, @dakshme, good to hear that the initial testing is positive.

The reason for fewer or no errors on the complete call is likely that you disabled explicit prefetch (while building the client), which reduces buffering and thereby the chance of message locks expiring while in the buffer. Also, beware of the implicit prefetch we discussed earlier.

We both seem to observe the same measurements with the "One Receiver Client per One Thread" model. :)

Two questions –

  1. Did you use one TCP connection per Receiver Client, as in the test program (i.e. a dedicated builder per client rather than a shared one)? A dedicated builder per client instance reduces internal noise compared to a shared builder, which is sometimes beneficial.
  2. Are you testing with 7.15.0-beta.4?

The V2 stack is the new, upcoming underlying engine of the messaging libraries. We addressed many known issues as part of the V2 stack. Some of the main improvements are: removal of locks in critical paths, more stable AMQP credit calculation, optimized internal buffering that was causing complete() to time out, a fix for out-of-order message delivery in processor clients, and optimized use of threads (less thread hopping and fewer context switches), among others, improving overall stability.

We call the underlying engine of the currently GA versions the V1 stack. Today the V2 stack exists in the beta releases (e.g. 7.15.0-beta.4), side by side with the V1 stack, and the library lets you opt in to or out of either stack.

For example, in the latest beta, the (session-unaware) ProcessorClient uses the V2 stack by default and can opt out to the V1 stack. Conversely, the pull-based synchronous client uses the V1 stack by default and can opt in to the V2 stack. The configuration you saw in the test program opts the synchronous client in to the V2 stack. The plan is to eventually phase out the V1 stack so that all clients run on V2. Hope this clarifies.

anuchandy avatar Nov 16 '23 20:11 anuchandy

Hello @anuchandy,

To answer the above questions:

  1. Yes, each thread has its own receiver client. So if I have 5 threads, I build 5 separate sync receiver clients.
  2. Yes, I am also using 7.15.0-beta.4.

When will the final (GA) version of this beta be released? Are there any tentative timelines? Also, will it address the thread-blocking issue (when concurrent threads use the same receiver client)?

Some updates: with further testing I am seeing better results with this implementation approach, and for now we seem to be fine with it. I just need a confirmation from your side once the final version of this beta is out, along with the fix for the issue I originally reported. I am able to process 100,000 messages within 22 minutes (with my current thread pool configuration), which is acceptable for us for now.

Also, during testing I observed the messages below being logged intermittently by the SDK. Are these just informational, or do they indicate something concerning in my implementation?

{"az.sdk.message":"The mediator is active.","messageFlux":"mf_736ba5_1700158420500","connectionId":"MF_172210_1700158420579","linkName":"teststopic/subscriptions/test-listener_9f915f_1700158420494","entityPath":"teststopic/subscriptions/test-listener"}

{"az.sdk.message":"Setting next mediator and waiting for activation.","messageFlux":"mf_10fa96_1700158420500","connectionId":"MF_348ca4_1700158420579","linkName":"teststopic/subscriptions/test-listener_3f33a3_1700158420494","entityPath":"teststopic/subscriptions/test-listener"}

Please look at them and confirm. Otherwise, this issue is resolved for me once we get the fixed version of the library.

One more side question: is there any way to mock the this.messageReceiver.receiveMessages(batch, Duration.ofMillis(500)); call so that it returns an IterableStream<ServiceBusReceivedMessage>? I need it for writing JUnit tests for my receiver class (which uses this sync receiver client).

When mocking receiveMessages, I want to return an IterableStream of ServiceBusReceivedMessage containing some dummy messages. Unfortunately, I cannot create a ServiceBusReceivedMessage instance, because the class is final and its constructors have default (package-private) access. Could you provide a way, or reference code, to mock or unit test this receive method so that it returns an iterable containing actual dummy messages?
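One common way around a final SDK type in unit tests (a general design technique, not an SDK feature) is to keep the SDK call at the edge of the application behind a small interface the application owns, and let the processing logic depend only on that interface and a plain message type. In production, an adapter would call `receiveMessages(...)` and copy the needed fields out of each `ServiceBusReceivedMessage`; in tests, a lambda returns dummy data. `InboundMessage`, `MessageSource`, and `BatchHandler` are hypothetical names.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Application-owned message type; a production adapter would copy fields
// (message ID, body, ...) from ServiceBusReceivedMessage into this record.
record InboundMessage(String messageId, String body) { }

// Application-owned port; the production implementation would wrap
// ServiceBusReceiverClient.receiveMessages(maxMessages, maxWaitTime).
interface MessageSource {
    Iterable<InboundMessage> receive(int maxMessages, Duration maxWaitTime);
}

// The logic under test depends only on MessageSource, so no SDK object is needed.
final class BatchHandler {
    private final MessageSource source;

    BatchHandler(MessageSource source) {
        this.source = source;
    }

    /** Pulls one batch and returns the bodies it handled, in order. */
    List<String> handleOneBatch(int batchSize) {
        List<String> handled = new ArrayList<>();
        for (InboundMessage m : source.receive(batchSize, Duration.ofSeconds(5))) {
            handled.add(m.body());
        }
        return handled;
    }
}
```

A test can then pass `(max, wait) -> List.of(new InboundMessage("1", "hello"))` as the source, with no mocking framework involved.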

Thanks.

dakshme avatar Nov 22 '23 17:11 dakshme

Hello @dakshme,

  1. The beta line is currently going through customer and in-house testing. Overall, the feedback is positive and the beta is stable. If all goes well, we plan the first GA early next year (first quarter).
  2. We released 7.15.0-beta.5 today (11/22), which includes an additional fix to reduce memory consumption when sending messages. I'm not sure whether your use case includes sending or is limited to receiving.
  3. We'll target the multi-thread blocking fix; it needs additional validation, and I hope we get the bandwidth to include it in the first GA. But with your new model (one receiver per thread), IIUC, you're not blocked by this, right?
  4. Those info log messages appear each time there is an attempt to create a receive channel to Service Bus. The channel is usually long-lived, unless the client is closed, the channel is idle for a long time (I guess ~10 min), the server throttles, or there are network errors. It is safe to ignore these log lines; they show that the underlying channel is recovering.
  5. One more thing: I would suggest using a coarse timeout value for receive(), say 3-5 seconds or above. When the first receive call happens on the client, or when the channel recovers underneath (see point 4), there is a series of operations: a bidirectional handshake, authentication, and writing credit to the channel; once the service receives the credit, it sends messages, which the client needs to decode and deliver to the application. If the timeout is too small, it can cancel one of these operations midway, only for the application to retry, triggering all or a subset of these operations again.
  6. Regarding the general mocking question, please see this discussion.
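A toy model of point 5, illustrating the reasoning rather than SDK internals: assume that getting a receive channel ready (handshake, authentication, credit, first delivery) needs `setupMs` in total, and that a receive call whose `maxWaitTime` expires mid-setup keeps only `retainedPercent` of that progress for the next attempt (0 means everything restarts). All names and numbers here are hypothetical.

```java
final class ReceiveTimeoutModel {
    private ReceiveTimeoutModel() { }

    /**
     * Returns the 1-based index of the first receive call that completes setup,
     * or -1 if none does within maxCalls.
     *
     * @param setupMs         total time the (hypothetical) setup sequence needs
     * @param timeoutMs       maxWaitTime passed to each receive call
     * @param retainedPercent share of setup progress surviving a timed-out call
     *                        (0 = everything restarts, 100 = nothing is lost)
     */
    static int firstDeliveringCall(long setupMs, long timeoutMs, int retainedPercent, int maxCalls) {
        long progressMs = 0;
        for (int call = 1; call <= maxCalls; call++) {
            progressMs += timeoutMs;          // setup advances while this call waits
            if (progressMs >= setupMs) {
                return call;                  // setup finished inside this call's window
            }
            // The call timed out mid-setup; only part of the progress carries over.
            progressMs = progressMs * retainedPercent / 100;
        }
        return -1;
    }
}
```

With `setupMs = 1500`, a 500 ms wait never gets through when progress is lost (`retainedPercent = 0`) or only half kept (`50`), whereas a 5-second wait succeeds on the first call.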

anuchandy avatar Nov 23 '23 00:11 anuchandy

Hello @anuchandy,

Following up on the above implementation: in my code (this.messageReceiver.receiveMessages(batch, Duration.ofMillis(500))) I have kept the max wait time at 500 milliseconds. With this configuration the receiver client works well most of the time, but in certain cases I observe that the Service Bus server stops returning messages to the receive call (even though messages are queued). This happens only for a specific data partition (dp) in this environment; the other dp(s) continue receiving messages normally.

On the problematic dp topic, we send a load of about 100,000 messages over a period of time. The application keeps pulling messages (using this sync client with a max wait time of 500 ms). Then, at some point, it suddenly stops receiving messages. Following is the series of errors (SDK error logs) I captured in the application logs:

1/6/2024, 12:26:14.764 PM {"az.sdk.message":"The request was terminated because the namespace osdu-r3mvp-dp3ps-1u86-bus is being throttled. Error code : 50009. Please wait 2 seconds and try again. To know more visit https://aka.ms/sbResourceMgrExceptions and https://aka.ms/ServiceBusThrottling TrackingId:1dd6c549004bf3740000bede65824d7e_G36_B16, SystemTracker:G36:Recv:421806148:637577493003800000:<dp details, subscription name>:F5:C521076, Timestamp:2024-01-06T12:24:39, errorContext[NAMESPACE: . ERROR CONTEXT: N/A, PATH: < subscription name>, REFERENCE_ID: BD62A2280004610A0004611065824D7E_G36, LINK_CREDIT: 5]","exception":"The request was terminated because the namespace <dp details, subscription name> is being throttled. Error code : 50009. Please wait 2 seconds and try again. To know more visit https://aka.ms/sbResourceMgrExceptions and https://aka.ms/ServiceBusThrottling TrackingId:1dd6c549004bf3740000bede65824d7e_G36_B16, SystemTracker:G36:Recv:421806148:637577493003800000:<dp details, subscription name>:F5:C521076, Timestamp:2024-01-06T12:24:39, errorContext[NAMESPACE: osdu-r3mvp-dp3ps-1u86-bus.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: recordstopic/subscriptions/wke-listener, REFERENCE_ID: BD62A2280004610A0004611065824D7E_G36, LINK_CREDIT: 5]","entityPath":"<dp details, subscription name>","linkName":"<dp details, subscription name>"}

1/6/2024, 12:33:01.288 PM {"az.sdk.message":"Operation not successful.","entityPath":"<dp details, subscription name>","status":"SERVICE_UNAVAILABLE","description":"The request was terminated because the namespace <dp details, subscription name> is being throttled. Error code : 50009. Please wait 2 seconds and try again. To know more visit https://aka.ms/sbResourceMgrExceptions and https://aka.ms/ServiceBusThrottling TrackingId:f964b0c0-3d32-4891-88ee-080bb45b8003_B21, SystemTracker:<dp details, subscription name>, Timestamp:2024-01-06T12:33:01 Reference:1ecffb84-bac0-4a3e-be48-5e62e49b0777, TrackingId:93a77f86-93eb-4b04-be59-e832b8cfa550_G82, SystemTracker:NoSystemTracker, Timestamp:2024-01-06T12:33:01","condition":"com.microsoft:server-busy"}

1/6/2024, 12:44:36.896 PM {"az.sdk.message":"Transient error occurred.","exception":"The link '<dp details, subscription name>' is force detached. Code: aggregate-link354861. Details: AmqpMessagePartitioningEntityMessageCache.IdleTimerExpired: Idle timeout in Seconds: 900., errorContext[NAMESPACE: osdu-r3mvp-dp3ps-1u86-bus.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: <dp details, subscription name>, REFERENCE_ID: BD62E5A000056A2200056A2D658251E6_G22, LINK_CREDIT: 0]","linkName":"n/a","entityPath":"n/a","attempt":1,"retryAfter":4511}

As per the logs, I understand that the requests were throttled a few times, and receiving also worked in between during that period. Some requests also failed with "status":"SERVICE_UNAVAILABLE". Finally, at 1/6/2024, 12:44:36.896 PM (the last log above), it gave this error, and after that it never recovered to receive any message. From that point on, the same log repeated for this particular dp's receiver client.

With the earlier implementation (a single receiver client) we never got this issue. It started appearing with multiple receiver clients for the same dp. The issue is reproducible.

I need your suggestions/inputs on what could be causing this.

Thanks.

dakshme avatar Jan 11 '24 18:01 dakshme

@anuchandy, can you please help me with the above query?

dakshme avatar Jan 12 '24 17:01 dakshme

Hello @dakshme, for production deployments I would strongly suggest using a coarse timeout in the range of 3-5 seconds. As we discussed earlier, a coarse timeout is important, and a low timeout can overwhelm the system during recovery phases: https://github.com/Azure/azure-sdk-for-java/issues/37594#issuecomment-1823677260
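If the application wants to enforce this guidance itself, a small guard can raise whatever wait time it is configured with before calling `receiveMessages`. This is an application-side sketch: the 3-second floor is taken from the 3-5 second range suggested above, and `ReceiveWaitPolicy` is a hypothetical helper, not part of the SDK.

```java
import java.time.Duration;

final class ReceiveWaitPolicy {
    // Floor taken from the suggested 3-5 second range; adjust to your own measurements.
    static final Duration MIN_MAX_WAIT = Duration.ofSeconds(3);

    private ReceiveWaitPolicy() { }

    /** Returns the configured wait time, raised to MIN_MAX_WAIT if it is shorter. */
    static Duration effectiveMaxWait(Duration configured) {
        if (configured == null || configured.isNegative() || configured.isZero()) {
            throw new IllegalArgumentException("maxWaitTime must be positive: " + configured);
        }
        return configured.compareTo(MIN_MAX_WAIT) < 0 ? MIN_MAX_WAIT : configured;
    }
}
```

With this guard, `receiver.receiveMessages(batch, ReceiveWaitPolicy.effectiveMaxWait(Duration.ofMillis(500)))` would wait 3 seconds instead of 500 ms.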

I'll take a look to see whether the low timeout put the system in a non-recoverable state and how we can improve here.

anuchandy avatar Jan 17 '24 02:01 anuchandy

@anuchandy Thank you for your response. I have already set the max wait time to 5 seconds, but with this configuration the issue (AmqpMessagePartitioningEntityMessageCache.IdleTimerExpired: Idle timeout in Seconds: 900) is only delayed; it still occurs after a few days. I need to understand the following points:

  1. Regarding my original issue, is your earlier fix (https://github.com/Azure/azure-sdk-for-java/pull/37665) now available in any recent version of the library? If yes, please point me to the GA version.

  2. Why do I keep hitting this issue even with 5 seconds? Do we need a higher value, such as 10 seconds? Would that guarantee that the issue won't occur?

  3. Also, I see that whenever the SDK gives the error below, the receiver client gets stuck and the SDK never throws an exception or returns. Should we consider a workaround, such as a TimerTask on this client thread that interrupts it if it doesn't return within a certain period? {"az.sdk.message":"Transient error occurred.","exception":"The link 'recordstopic/subscriptions/test-listener_3b405a_1705500283844' is force detached. Code: aggregate-link61730040. Details: AmqpMessagePartitioningEntityMessageCache.IdleTimerExpired: Idle timeout in Seconds: 900., errorContext[NAMESPACE: test-dp3env-testing-bus.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: testtopic/subscriptions/test-listener, REFERENCE_ID: 83C5B50603ADECEC03ADECF865A7DE7B_G22, LINK_CREDIT: 0]","linkName":"n/a","entityPath":"n/a","attempt":1,"retryAfter":4511}

  4. Isn't it possible for the SDK to handle a small max wait time internally, i.e. not count whatever underlying operations the Service Bus server requires against the max wait time given by the application? Otherwise, the SDK should check or validate a minimum value for the max wait time passed by an application. This matters for application development; otherwise it causes a lot of confusion and increases overall development time.
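The watchdog idea in point 3 can be sketched with plain `java.util.concurrent` instead of a `TimerTask`: run the blocking call on a worker thread and bound the wait with `Future.get(timeout)`. On timeout the caller cancels the task (interrupting the worker) and can then close and rebuild the receiver. This is a hedged application-side workaround, not SDK behavior: interruption is best effort, and there is no guarantee that a call stuck inside the library reacts to it, which is why rebuilding the client afterwards matters. `BoundedCall` is a hypothetical helper.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedCall implements AutoCloseable {
    // Cached pool: a call that ignores interruption does not block later calls.
    // Daemon threads so a permanently stuck call cannot keep the JVM alive.
    private final ExecutorService workers = Executors.newCachedThreadPool(runnable -> {
        Thread thread = new Thread(runnable, "bounded-receive");
        thread.setDaemon(true);
        return thread;
    });

    /**
     * Runs {@code call} and waits at most {@code hardLimitMs}. Returns empty on timeout,
     * after cancelling (interrupting) the task; the caller should then rebuild its client.
     */
    <T> Optional<T> callWithHardLimit(Callable<T> call, long hardLimitMs) {
        Future<T> future = workers.submit(call);
        try {
            return Optional.ofNullable(future.get(hardLimitMs, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            future.cancel(true); // best effort: interrupts the worker thread
            return Optional.empty();
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt();
            return Optional.empty();
        } catch (ExecutionException e) {
            throw new IllegalStateException("receive failed", e.getCause());
        }
    }

    @Override
    public void close() {
        workers.shutdownNow();
    }
}
```

The hard limit would be set well above the `maxWaitTime` passed to `receiveMessages`, so it only fires when a call is genuinely stuck rather than merely waiting for messages.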

Please advise on the above points.

Thanks, Mahesh Daksha

dakshme avatar Jan 18 '24 09:01 dakshme

Hi @dakshme, thank you for the additional context.

Looking at the below log message you shared,

{"az.sdk.message":"Transient error occurred." …

It seems the environment is loading version 7.14.x, or the V1 code in 7.15.0-beta.x. When we worked together on designing the multi-receiver solution in November 2023, we used a 7.15.0-beta.x version. The above log message comes from the class ServiceBusReceiveLinkProcessor; in 7.15.0-beta.x, with the V2 configuration value "com.azure.messaging.servicebus.nonSession.syncReceive.v2" set, we no longer use this class.

The 7.15.0-beta.x line has many receiver reliability improvements, including better recovery when the connection goes idle.

  1. It could be that, in your solution, something else in the dependency tree downgrades it to 7.14.x, or that it does use 7.15.0-beta.x but without the V2 configuration value "com.azure.messaging.servicebus.nonSession.syncReceive.v2" set. Could you check this?
  2. I understand that there are separate receiver client instances. As discussed last time, could you ensure that each of these clients is built from an independent builder as well? That is, create a new builder instance, obtain a client instance from it, and repeat this (new_builder -> new_client pattern) for each client.
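For point 1, one way to confirm at runtime which Service Bus artifact was actually loaded (for example, after dependency resolution downgraded it) is to ask the JVM where a known class came from; the jar file name usually carries the version. This is a generic JDK-only diagnostic sketch; `ClassOrigin` is a hypothetical helper.

```java
import java.net.URL;
import java.security.CodeSource;

final class ClassOrigin {
    private ClassOrigin() { }

    /** Describes where the JVM loaded {@code className} from, without initializing it. */
    static String locate(String className) {
        try {
            Class<?> type = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return "bootstrap (JDK)"; // core JDK classes carry no code source
            }
            URL location = source.getLocation();
            return location.toString(); // typically the path of the jar the class came from
        } catch (ClassNotFoundException e) {
            return "not on classpath";
        }
    }
}
```

Logging `ClassOrigin.locate("com.azure.messaging.servicebus.ServiceBusClientBuilder")` at startup would then show whether the 7.14.x or the 7.15.0-beta.x jar is on the classpath.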

I'm working to GA 7.15.0; it should happen in a few days. The fix from #37665 will not be included in the first GA of 7.15.x, but all reliability improvements in the beta so far will be. I hope that, with the current One_Receiver:One_Thread design with serial access, we won't hit the #37665 issue.

anuchandy avatar Jan 18 '24 19:01 anuchandy

@anuchandy

Since 7.15 was still in beta, we purposely switched back to the earlier version. But after your last reply, I changed the implementation to use the latest 7.15 beta. Unfortunately, even after that we are encountering the IdleTimerExpired error. We tried a max wait time of 7 seconds; it only delayed the problem, and eventually (after 2-3 days) the receiver client stopped pulling messages. We even tried keeping the ServiceBusClientBuilder object for the whole application lifetime (a separate builder object per thread) and creating the ServiceBusReceiverClient from that builder in each polling cycle. This also resulted in the same hang, where the Azure library does not return control to the application. So this approach did not work either.

Given this instability of the SDK, it seems we have no option other than creating both the ServiceBusClientBuilder and the ServiceBusReceiverClient in every polling cycle to make it work properly. I understand that creating a builder object every time may be expensive, but we don't see any other way. Please advise. A quick suggestion would help, as development of certain critical fixes is blocked.

Intermittently, with the approach where I create a new receiver client each time and then close it, I also see the error messages below in the log:

"az.sdk.message":"The receiver didn't receive the disposition acknowledgment due to receive link closure." What does this mean? Will it have any side effects?

"az.sdk.message":"Cannot subscribe. Processor is already terminated." What does this mean? Will it have any side effects?

Please note that all of the above implementation combinations were tried with the latest beta of the SDK, 7.15.0-beta.5.

Thanks, Mahesh D.

dakshme avatar Jan 24 '24 04:01 dakshme

Hello @dakshme, I started an offline discussion with you on this, so we can look into the actual usage and discuss/plan how to resolve it.

anuchandy avatar Jan 29 '24 23:01 anuchandy

Hello @anuchandy,

For information and future reference, I am noting down below the points we discussed during our offline meeting. As discussed on the call, we expect these issues to be addressed in an upcoming version of the library to make its operation more stable:

  • The application thread calling this.messageReceiver.receiveMessages(batch, Duration.ofMillis(500)) gets stuck/hangs within the Service Bus SDK and never returns, which blocks application execution. You have already identified and acknowledged this.
  • The same problem occurs when multiple threads receive (pull) messages using the same receiver client connection. It is also observed when each client has a dedicated builder and the client is opened and closed around each receive using its own builder object.
  • Whenever an error or exception occurs within the SDK call, it should be returned or thrown back to the application. Currently the library silently suppresses the error and returns an empty IterableStream. This misleads the application, which has no way to recover from the erroneous state.
  • When multiple threads each have their own builder and receiver client (created and closed on every receive), the application works fine for a few days, after which it can no longer receive any message from Service Bus (even though messages are available). In this case too, the library keeps returning an empty IterableStream of messages without throwing any error.
  • If there is a hard requirement to keep the max wait time in receiveMessages above 5 seconds or so, I suggest either adding a validation check for it or documenting it as a best practice along with the validation.
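Until the library surfaces these failures, an application can at least detect the "silently empty" state described in the fourth bullet on its own side. The sketch below counts consecutive empty batches and signals that the receiver should be closed and rebuilt once a threshold is crossed while the application has reason to believe messages are waiting (for example, from a separately obtained active-message count for the subscription). The threshold, the backlog signal, and the `EmptyBatchWatch` name are assumptions, not SDK features.

```java
final class EmptyBatchWatch {
    private final int maxConsecutiveEmpty;
    private int consecutiveEmpty;

    EmptyBatchWatch(int maxConsecutiveEmpty) {
        if (maxConsecutiveEmpty < 1) {
            throw new IllegalArgumentException("threshold must be >= 1");
        }
        this.maxConsecutiveEmpty = maxConsecutiveEmpty;
    }

    /**
     * Records one receive result. Returns true when the caller should close and rebuild
     * its receiver: too many empty batches in a row while a backlog is believed to exist.
     */
    boolean recordAndCheck(int batchSize, boolean backlogExpected) {
        if (batchSize > 0 || !backlogExpected) {
            consecutiveEmpty = 0; // healthy, or legitimately idle
            return false;
        }
        consecutiveEmpty++;
        if (consecutiveEmpty >= maxConsecutiveEmpty) {
            consecutiveEmpty = 0; // start counting afresh for the rebuilt receiver
            return true;
        }
        return false;
    }
}
```

Because the counter resets on any non-empty batch or when no backlog is expected, an idle subscription does not trigger needless rebuilds.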

Please add any points I have missed.

We need your help for an early resolution of the above issues, as they are impacting our development activity and, eventually, customer environments.

Thanks, Mahesh Daksha

dakshme avatar Feb 02 '24 06:02 dakshme

@anuchandy Any updates on a released version of the SDK that addresses these points (as we discussed last time), to make the SDK more stable?

Thanks.

dakshme avatar May 10 '24 05:05 dakshme