azure-service-bus-java

Getting "ReactorDispatcher instance is closed" error when trying to close the Azure Service Bus connection after receiving a message

Open · Rawjyot opened this issue 4 years ago · 14 comments

Actual Behavior

  1. When I read a message from Azure Service Bus using ServiceBusReceiverClient, I receive the message successfully, but when I try to close the connection, the error "ReactorDispatcher instance is closed" is logged.
  2. I am using receiver.close() to close the connection after processing the received messages.
02:05:09,514 | ERROR | boundedElastic-6 |  -  -  |  ReactorDispatcher instance is closed.
02:05:09,514 | ERROR | parallel-1       |  -  -  |  ReactorDispatcher instance is closed.

Expected Behavior

  1. I should be able to close the connection without errors after successfully receiving the message.

I am using:

<spring.boot.version>2.4.11</spring.boot.version>
<azure-messaging-servicebus>7.4.1</azure-messaging-servicebus>

Versions

  • OS platform and version: macOS 11.6
  • Maven package version or commit ID: azure-messaging-servicebus 7.4.1 (built with Apache Maven 3.2.5)

Rawjyot, Oct 14 '21

@Rawjyot, thanks for sharing the observation. We have already noticed, across many GitHub issues, that logging this message at the "error" level misleads users. Its level was changed to "warning" in 7.4.2 (the October release).

Multiple threads run in parallel, so while a connection close is in progress, another thread may attempt to "dispatch" work to that connection. Those attempts are dropped and logged with this "warning".
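To illustrate the pattern described above, here is a toy model under stated assumptions: ToyDispatcher is a hypothetical class, not the SDK's ReactorDispatcher, and it only shows the general idea of work being dropped with a warning, rather than failing the caller, once a close has started.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

// Hypothetical stand-in for a connection's work dispatcher; NOT the SDK's ReactorDispatcher.
class ToyDispatcher {
    private static final Logger LOGGER = Logger.getLogger(ToyDispatcher.class.getName());

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicInteger executed = new AtomicInteger();

    // Runs the work if the dispatcher was open at the time of the check;
    // otherwise drops it and logs a warning instead of throwing to the caller.
    boolean dispatch(Runnable work) {
        if (closed.get()) {
            LOGGER.warning("ToyDispatcher instance is closed; dropping work.");
            return false;
        }
        work.run();
        executed.incrementAndGet();
        return true;
    }

    // Marks the dispatcher closed; later dispatch() calls are dropped.
    void close() {
        closed.set(true);
    }

    int executedCount() {
        return executed.get();
    }
}
```

The caller that hits the closed dispatcher gets a `false` return and a log line, which is why the message is noise during shutdown rather than a failure of the close itself.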

anuchandy, Oct 16 '21

@anuchandy, the problem we're having is that after this warning is logged, subsequent attempts to retrieve a message with a new ServiceBusReceiverClient fail for several minutes and generate the same error message. Eventually an attempt succeeds, but after polling a few times we get the same error again when calling close().

You mentioned that the library runs multiple threads in parallel. When our code closes and discards a ServiceBusReceiverClient instance and shortly afterwards creates a new one, could we instead be getting the previously closed instance that, for some reason, was not destroyed? Could the ServiceBusClientBuilder factory be reusing an already closed instance?

We do not see this behavior with the older 7.1.0 version of azure-messaging-servicebus.

lordarcy, Oct 19 '21

I see. Yes, Jim, ServiceBusClientBuilder shares a few resources among the clients built from it. It is possible that after the first client is closed, but before the cleanup of its underlying async resources completes, creating the second client picks up the endpoint from the cache. Because the cleanup and its notification are asynchronous, it takes some time for the second client to detect that the endpoint is closed.

Can you unblock yourself for now by using a new builder for each client?
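The race described above can be sketched with a toy model. Everything here is hypothetical (ToyBuilder, ToyClient, ToyConnection, and the OPEN/CLOSING/CLOSED states are invented for illustration and do not mirror SDK internals): a builder caches one connection and replaces it only once it is fully CLOSED, so a client built while the previous one is still CLOSING inherits the stale connection, whereas a brand-new builder starts with its own.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical connection with a three-state lifecycle (assumption for illustration).
class ToyConnection {
    enum State { OPEN, CLOSING, CLOSED }

    private static final AtomicInteger NEXT_ID = new AtomicInteger();

    final int id = NEXT_ID.incrementAndGet();
    volatile State state = State.OPEN;
}

// Hypothetical client that only holds a reference to a (possibly shared) connection.
class ToyClient {
    private final ToyConnection connection;

    ToyClient(ToyConnection connection) {
        this.connection = connection;
    }

    ToyConnection connection() {
        return connection;
    }

    // Starts an asynchronous shutdown; the connection only reaches CLOSED later.
    void beginClose() {
        connection.state = ToyConnection.State.CLOSING;
    }
}

// Hypothetical builder that caches one connection for every client it builds.
class ToyBuilder {
    private ToyConnection cached;

    // Reuses the cached connection unless it is fully CLOSED. A connection that is
    // still CLOSING slips through, which is the race window described above.
    synchronized ToyClient build() {
        if (cached == null || cached.state == ToyConnection.State.CLOSED) {
            cached = new ToyConnection();
        }
        return new ToyClient(cached);
    }
}
```

In this model, a new builder per client sidesteps the shared cache, at the cost of opening a new connection each time.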

anuchandy, Oct 21 '21

@anuchandy, the code is already using a new ServiceBusClientBuilder every time:

receiver = new ServiceBusClientBuilder()
    .connectionString(azureServiceBusConfig.azureBusConnectionString)
    .receiver()
    .disableAutoComplete()
    .topicName(azureServiceBusConfig.azureBusTopicName)
    .subscriptionName(azureServiceBusConfig.azureBusSubscriptionName)
    .buildClient();

IterableStream<ServiceBusReceivedMessage> messages = receiver.receiveMessages(1, Duration.ofSeconds(5));

// Examine and act on any message

if (message != null) { receiver.complete(message); }
if (receiver != null) { receiver.close(); }
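The create/receive/complete/close sequence above can be arranged so that close() runs even when processing or completion throws. Below is a minimal stdlib-only sketch under assumptions: ToyReceiver and PollOnce are hypothetical names standing in for the real receiver client; the same try-with-resources shape should carry over to any receiver that implements AutoCloseable (ServiceBusReceiverClient in the 7.x SDK does).

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical receiver abstraction used only for this sketch; not an SDK type.
interface ToyReceiver<M> extends AutoCloseable {
    List<M> receive(int maxMessages);

    void complete(M message);

    // Narrowed from AutoCloseable.close() so callers need not catch Exception.
    @Override
    void close();
}

final class PollOnce {
    private PollOnce() {
    }

    // One stateless polling transaction: create a receiver, receive at most one
    // message, hand it to the handler, complete it, and always close the receiver.
    static <M> int poll(Supplier<ToyReceiver<M>> factory, Consumer<M> handler) {
        try (ToyReceiver<M> receiver = factory.get()) {
            int completed = 0;
            for (M message : receiver.receive(1)) {
                handler.accept(message);
                receiver.complete(message);
                completed++;
            }
            return completed;
        }
    }
}
```

A message whose handler throws is not completed, but the receiver is still closed before the exception propagates.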

lordarcy, Oct 25 '21

Hi @lordarcy, I tried to reproduce the behavior using the code below, which runs "create client, receive message, close client" in a loop 10 times.


import java.time.Duration;
import java.time.OffsetDateTime;
import com.azure.core.util.IterableStream;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceiverClient;

public class ReceiveCloseLoopSample {
    public static void main(String[] args) {
        final String connectionString = System.getenv("CON_STR");
        final String queueName = System.getenv("QUEUE_NAME");

        // Loop for 10 times.
        for (int i = 0; i < 10; i++) {

            System.out.println(OffsetDateTime.now() + ": Creating Client#" + i);
            ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .receiver()
                .disableAutoComplete()
                .queueName(queueName)
                .buildClient();

            IterableStream<ServiceBusReceivedMessage> messages = receiver.receiveMessages(1, Duration.ofSeconds(5));
            messages.forEach(message -> {
                System.out.println(OffsetDateTime.now() + ": Received Message [seq_number:" + message.getSequenceNumber() + "]");
                if (message != null) {
                    try {
                        receiver.complete(message);
                    } catch (Throwable t) {
                        t.printStackTrace();
                    }
                    System.out.println(OffsetDateTime.now() + ": complete call finished for Message [seq_number:" + message.getSequenceNumber() + "]");
                }
            });

            System.out.println(OffsetDateTime.now() + ": Closing Client#" + i);
            receiver.close();
            System.out.println("");
        }
    }
}

However, I don't see the issue you mentioned; the output of the execution looks like this:

2021-11-10T09:12:12.401284-08:00: Creating Client#0
2021-11-10T09:12:14.074138-08:00: Received Message [seq_number:6]
2021-11-10T09:12:14.260127-08:00: complete call finished for Message [seq_number:6]
2021-11-10T09:12:14.260669-08:00: Closing Client#0

2021-11-10T09:12:14.272972-08:00: Creating Client#1
2021-11-10T09:12:14.613589-08:00: Received Message [seq_number:7]
2021-11-10T09:12:14.660616-08:00: complete call finished for Message [seq_number:7]
2021-11-10T09:12:14.660782-08:00: Closing Client#1

2021-11-10T09:12:14.662825-08:00: Creating Client#2
2021-11-10T09:12:14.967993-08:00: Received Message [seq_number:8]
2021-11-10T09:12:15.006407-08:00: complete call finished for Message [seq_number:8]
2021-11-10T09:12:15.006554-08:00: Closing Client#2

2021-11-10T09:12:15.008189-08:00: Creating Client#3
2021-11-10T09:12:15.318079-08:00: Received Message [seq_number:9]
2021-11-10T09:12:15.374040-08:00: complete call finished for Message [seq_number:9]
2021-11-10T09:12:15.374187-08:00: Closing Client#3

2021-11-10T09:12:15.375971-08:00: Creating Client#4
2021-11-10T09:12:15.668797-08:00: Received Message [seq_number:10]
2021-11-10T09:12:15.718561-08:00: complete call finished for Message [seq_number:10]
2021-11-10T09:12:15.718686-08:00: Closing Client#4

2021-11-10T09:12:15.719595-08:00: Creating Client#5
2021-11-10T09:12:16.004302-08:00: Received Message [seq_number:11]
2021-11-10T09:12:16.049758-08:00: complete call finished for Message [seq_number:11]
2021-11-10T09:12:16.049846-08:00: Closing Client#5

2021-11-10T09:12:16.050655-08:00: Creating Client#6
2021-11-10T09:12:16.346686-08:00: Received Message [seq_number:12]
2021-11-10T09:12:16.383272-08:00: complete call finished for Message [seq_number:12]
2021-11-10T09:12:16.383415-08:00: Closing Client#6

2021-11-10T09:12:16.385344-08:00: Creating Client#7
2021-11-10T09:12:16.625618-08:00: Received Message [seq_number:13]
2021-11-10T09:12:16.660944-08:00: complete call finished for Message [seq_number:13]
2021-11-10T09:12:16.661100-08:00: Closing Client#7

2021-11-10T09:12:16.662135-08:00: Creating Client#8
2021-11-10T09:12:16.901708-08:00: Received Message [seq_number:14]
2021-11-10T09:12:16.945340-08:00: complete call finished for Message [seq_number:14]
2021-11-10T09:12:16.945430-08:00: Closing Client#8

2021-11-10T09:12:16.946211-08:00: Creating Client#9
2021-11-10T09:12:17.161688-08:00: Received Message [seq_number:15]
2021-11-10T09:12:17.204560-08:00: complete call finished for Message [seq_number:15]
2021-11-10T09:12:17.204756-08:00: Closing Client#9

Could you please double-check my code and confirm whether this is how your application uses the client?

anuchandy, Nov 10 '21

I see three main differences:

  1. The class that instantiates the ServiceBusClientBuilder is itself instantiated each time a polling transaction occurs. This happens because polling is started from an Apache Camel route that calls a Processor, which in turn creates a new MessageReceiver. The MessageReceiver has a try/catch block containing the code from comment https://github.com/Azure/azure-service-bus-java/issues/418#issuecomment-950880040. This pattern is used because each polling transaction is supposed to be stateless.
  2. If no message is found, a new ServiceBusClientBuilder is created with the subscription name for the DLQ, and a similar receiveMessages call is made to poll the DLQ.
  3. Each polling transaction has a fifteen-second timer before a new transaction starts.
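The main-then-DLQ flow in point 2 can be sketched as a small control-flow function. This is a hypothetical stdlib-only sketch: PollingTransaction and Source are invented names, and receiveOne stands in for "create a receiver for this source, receive up to one message, close the receiver"; it does not show how the DLQ receiver is actually configured in the SDK.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch of one polling transaction: try the main subscription,
// and only if it yields nothing, try its dead-letter queue (DLQ).
final class PollingTransaction {
    // Declaration order is the polling order: MAIN first, then DLQ.
    enum Source { MAIN, DLQ }

    private PollingTransaction() {
    }

    // receiveOne returns the message body for a source if one arrived.
    // The result pairs the message with the source it came from.
    static Optional<Map.Entry<Source, String>> run(Function<Source, Optional<String>> receiveOne) {
        for (Source source : Source.values()) {
            Optional<String> message = receiveOne.apply(source);
            if (message.isPresent()) {
                Map.Entry<Source, String> hit = new AbstractMap.SimpleImmutableEntry<>(source, message.get());
                return Optional.of(hit);
            }
        }
        return Optional.empty();
    }
}
```

With this structure, a transaction that finds a message in the main subscription creates one receiver, while an empty main subscription leads to two receivers (main, then DLQ) being created and closed back to back.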

lordarcy, Nov 10 '21

Thanks. I'm trying to map your points back to code:

So you have a concept called a "polling transaction." Each time a polling transaction executes, it creates a client instance, receives a message, and closes the client instance, meaning there is a 1:1 relationship between a client instance and an execution of a polling transaction.

Isn't the sample code I posted doing the same, where one iteration of the loop corresponds to one execution of a polling transaction?

The part missing from my sample code is that if no message was received, the code should create a receiver client for the DLQ and then attempt to receive a message from it. Is this understanding correct?

anuchandy, Nov 10 '21

The two things missing from your sample are:

  • The delay between the creation of each ServiceBusClientBuilder
  • Because each poll comes from a new Camel route, our code is also likely using a different thread each time the pair of ServiceBusClientBuilder instances (main queue and DLQ) is created
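Both differences can be approximated in a repro by running each poll on a brand-new thread with a pause between polls. This is a hypothetical stdlib-only harness (ThreadPerPollRunner is an invented name); it only models "new thread per poll plus a timer" and is not how Apache Camel schedules routes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical harness approximating the described setup: every poll runs on a
// brand-new thread (like a new Camel route per poll), with a pause between polls.
final class ThreadPerPollRunner {
    private ThreadPerPollRunner() {
    }

    // Runs poll `count` times, each on its own thread, waiting for each poll to
    // finish and then sleeping delayMillis before the next one. Returns the name
    // of the thread that ran each poll.
    static List<String> run(int count, long delayMillis, Runnable poll) {
        List<String> threadNames = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                String[] name = new String[1];
                Thread worker = new Thread(() -> {
                    name[0] = Thread.currentThread().getName();
                    poll.run();
                }, "poll-" + i);
                worker.start();
                worker.join(); // polls never overlap; join also publishes name[0]
                threadNames.add(name[0]);
                if (i < count - 1) {
                    TimeUnit.MILLISECONDS.sleep(delayMillis);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted between polls", e);
        }
        return threadNames;
    }
}
```

For example, `ThreadPerPollRunner.run(10, 15_000, pollOnce)` would mirror ten polls with the fifteen-second timer, where `pollOnce` is a hypothetical Runnable that creates, uses, and closes the main and DLQ receivers.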

lordarcy, Nov 11 '21

@lordarcy, sorry for the late follow-up. Is it possible for your team to update the code I've shared so far to match the way you use the library, so that we can reproduce this on our end? I've pushed the code we've been discussing above to https://github.com/anuchandy/attempt-repro-polling-bug. It can be cloned and is ready to run once you set the connection string and queue name in the environment variables. You and Rawjyot have access to the repository.

anuchandy, Dec 10 '21

@Rawjyot is this something you can try?

lordarcy, Dec 10 '21

Yes, sure, I will give it a try.

Rawjyot, Dec 11 '21

@Rawjyot, have you had a chance to update the code sample to match how we use the library and try to reproduce the problem?

lordarcy, Dec 17 '21

@anuchandy, can you please give me access to https://github.com/anuchandy/attempt-repro-polling-bug again? I tried accessing it, but it says the invitation has expired.

Rawjyot, Dec 20 '21

@anuchandy, I don't have access to the private repository you shared above; in the meantime, @lordarcy shared the codebase as a zip. I have gone through the codebase, and here are a few observations:

  1. The code in the above repo uses a ServiceBusReceiverClient to poll a queue, whereas the actual code with the issue uses a ServiceBusReceiverClient (built with ServiceBusClientBuilder) to poll topic subscriptions.
  2. The above codebase does not use Spring Boot as boilerplate, whereas the actual code with the issue uses the latest version of Spring Boot as boilerplate.
  3. The above codebase polls the queue on a single thread, whereas the actual code with the issue reads messages from topics asynchronously. That means multiple threads are involved and clients are closed from multiple threads, which is where the issue shows up.
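To approximate observation 3 (a resource closed while other threads are still working), a harness like the one below could serve as a starting point. It is a hypothetical, stdlib-only sketch: CloseRaceHarness and its counters are invented names, and the accepted/dropped split only models the drop-with-warning behavior described earlier in the thread; it does not exercise the SDK.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stress harness: several threads keep dispatching work to one
// shared resource while the main thread closes it. No SDK code is involved.
final class CloseRaceHarness {
    final AtomicBoolean closed = new AtomicBoolean();
    final AtomicInteger accepted = new AtomicInteger();
    final AtomicInteger dropped = new AtomicInteger();

    // Counts the call as accepted while open, or as dropped once closed
    // (the point at which, per the thread above, a warning would be logged).
    void dispatch() {
        if (closed.get()) {
            dropped.incrementAndGet();
        } else {
            accepted.incrementAndGet();
        }
    }

    // Releases `threads` dispatchers at once, closes the resource immediately
    // afterwards, and waits for every dispatcher to finish its calls.
    static CloseRaceHarness run(int threads, int callsPerThread) {
        CloseRaceHarness harness = new CloseRaceHarness();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (int i = 0; i < callsPerThread; i++) {
                    harness.dispatch();
                }
            });
        }
        start.countDown();
        harness.closed.set(true); // close while dispatchers may still be running
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("dispatchers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return harness;
    }
}
```

The split between accepted and dropped calls varies from run to run; only their sum is fixed, which is the kind of timing-dependent behavior a single-threaded repro would not surface.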

I will try to replicate these parameters and provide you with updated code. cc: @lordarcy

Rawjyot, Dec 23 '21