azure-service-bus-java
Getting "ReactorDispatcher instance is closed" error when trying to close the azure service bus connection after receiving message
Actual Behavior
- When I read a message from Azure Service Bus using ServiceBusReceiverClient, the message is received successfully, but when I then close the connection, the error "ReactorDispatcher instance is closed" is logged.
- I am using receiver.close() to close the connection after processing the received messages.
02:05:09,514 | ERROR | boundedElastic-6 | - - | ReactorDispatcher instance is closed.
02:05:09,514 | ERROR | parallel-1 | - - | ReactorDispatcher instance is closed.
Expected Behavior
- I should be able to close the connections after receiving the message successfully.
I am using:
<spring.boot.version>2.4.11</spring.boot.version>
<azure-messaging-servicebus>7.4.1</azure-messaging-servicebus>
Versions
- OS platform and version: Mac OS 11.6
- Maven package version or commit ID: Apache Maven 3.2.5
@Rawjyot, thanks for sharing the observation. We have already noticed from many GitHub issues that logging this message at the "error" level misleads users. The level of this log message was changed to "warning" in 7.4.2 (the October release).
There are multiple threads running in parallel, so while the connection close is in progress another thread may attempt to "dispatch" work to this connection. Those attempts are dropped with this "warning".
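The drop-on-close behavior described above can be illustrated with a minimal sketch. `ToyDispatcher` and its methods are hypothetical names invented for this illustration; this is not the SDK's ReactorDispatcher, only a toy that rejects work once `close()` has been called.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a connection-bound dispatcher.
// NOT the SDK's ReactorDispatcher; it only mimics "drop work after close".
final class ToyDispatcher implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicInteger dropped = new AtomicInteger();

    // Runs the work if the dispatcher is open; otherwise drops it with a warning.
    boolean dispatch(Runnable work) {
        if (closed.get()) {
            dropped.incrementAndGet();
            System.out.println("WARN: dispatcher instance is closed; dropping work");
            return false;
        }
        work.run();
        return true;
    }

    // Number of work items dropped because they arrived after close().
    int droppedCount() {
        return dropped.get();
    }

    @Override
    public void close() {
        closed.set(true);
    }

    public static void main(String[] args) throws InterruptedException {
        ToyDispatcher dispatcher = new ToyDispatcher();
        dispatcher.dispatch(() -> System.out.println("work ran before close"));

        // Another thread tries to dispatch after close() has been called.
        dispatcher.close();
        Thread late = new Thread(
            () -> dispatcher.dispatch(() -> System.out.println("never printed")));
        late.start();
        late.join();

        System.out.println("dropped = " + dispatcher.droppedCount());
    }
}
```

In this toy the late dispatch is harmless: the work is discarded and only a warning is printed, which matches the explanation that the log line on its own does not indicate a failure.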
@anuchandy the problem we're having is that, after receiving this warning message, subsequent attempts to retrieve a message with a new ServiceBusReceiverClient fail for several minutes and generate the same error message. Eventually an attempt succeeds, but after polling a few times we get the same error again when calling close().
You mentioned the library has multiple threads running in parallel. When our code closes and destroys a ServiceBusReceiverClient instance and then shortly afterwards creates a new one, could we instead be getting the same instance that was previously closed but for some reason not destroyed? Could the ServiceBusClientBuilder factory be reusing an already closed instance?
This behavior is not seen using the older 7.1.0 version of azure-messaging-servicebus.
I see; yes Jim, ServiceBusClientBuilder shares a few resources among the clients built from it. It is possible that, after the first client is closed but before the underlying async resource cleanup completes, the second client being created gets the endpoint from the cache. Because the cleanup and its notification are asynchronous, it takes some time for the second client to detect this.
Can you unblock for now by using a new builder?
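The caching window described in the comment above can be pictured with a toy model. Everything here is an assumption made for illustration: `SharedEndpointCache`, `Endpoint`, `beginClose`, and `finishCleanup` are hypothetical names, and the thread does not show how the SDK actually caches or evicts its resources.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy model of a cache whose entries are marked closed first and evicted later.
// Hypothetical; it does not reflect the SDK's internal classes.
final class SharedEndpointCache {
    static final class Endpoint {
        private volatile boolean closed;

        boolean isClosed() {
            return closed;
        }
    }

    private final Map<String, Endpoint> cache = new ConcurrentHashMap<>();

    // Returns the cached endpoint for the key, creating one if none is cached.
    Endpoint acquire(String key) {
        return cache.computeIfAbsent(key, k -> new Endpoint());
    }

    // Step 1 of closing: the endpoint is marked closed but stays in the cache.
    void beginClose(String key) {
        Endpoint endpoint = cache.get(key);
        if (endpoint != null) {
            endpoint.closed = true;
        }
    }

    // Step 2 of closing: the (asynchronous) cleanup finishes and evicts the entry.
    void finishCleanup(String key) {
        cache.remove(key);
    }

    public static void main(String[] args) {
        SharedEndpointCache cache = new SharedEndpointCache();
        cache.acquire("my-entity");
        cache.beginClose("my-entity");

        // A client created inside the window still sees the closed endpoint.
        System.out.println("during cleanup, closed = " + cache.acquire("my-entity").isClosed());

        cache.finishCleanup("my-entity");
        System.out.println("after cleanup, closed = " + cache.acquire("my-entity").isClosed());
    }
}
```

In this model a separate cache per builder would avoid the stale entry, which is the reasoning behind the suggestion to try a new builder.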
@anuchandy the code is using a new ServiceBusClientBuilder every time:
receiver = new ServiceBusClientBuilder()
    .connectionString(azureServiceBusConfig.azureBusConnectionString)
    .receiver()
    .disableAutoComplete()
    .topicName(azureServiceBusConfig.azureBusTopicName)
    .subscriptionName(azureServiceBusConfig.azureBusSubscriptionName)
    .buildClient();
IterableStream<ServiceBusReceivedMessage> messages = receiver.receiveMessages(1, Duration.ofSeconds(5));
// Examine and act on any message
if (message != null) {
    receiver.complete(message);
}
if (receiver != null) {
    receiver.close();
}
Hi @lordarcy, I tried to repro the behavior using the code below, where we can see "create-client, receive-message, and close-client" in a loop (10 times).
import java.time.Duration;
import java.time.OffsetDateTime;
import com.azure.core.util.IterableStream;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusReceiverClient;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;

public class ReceiveCloseLoopSample {
    public static void main(String[] args) {
        final String connectionString = System.getenv("CON_STR");
        final String queueName = System.getenv("QUEUE_NAME");
        // Repeat "create client, receive message, close client" 10 times.
        for (int i = 0; i < 10; i++) {
            System.out.println(OffsetDateTime.now() + ": Creating Client#" + i);
            ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .receiver()
                .disableAutoComplete()
                .queueName(queueName)
                .buildClient();
            // Wait up to 5 seconds for at most one message.
            IterableStream<ServiceBusReceivedMessage> messages = receiver.receiveMessages(1, Duration.ofSeconds(5));
            messages.forEach(message -> {
                System.out.println(OffsetDateTime.now() + ": Received Message [seq_number:" + message.getSequenceNumber() + "]");
                if (message != null) {
                    try {
                        // Settle the message explicitly, since auto-complete is disabled.
                        receiver.complete(message);
                    } catch (Throwable t) {
                        t.printStackTrace();
                    }
                    System.out.println(OffsetDateTime.now() + ": complete call finished for Message [seq_number:" + message.getSequenceNumber() + "]");
                }
            });
            System.out.println(OffsetDateTime.now() + ": Closing Client#" + i);
            receiver.close();
            System.out.println("");
        }
    }
}
But I don't see the issue you mentioned happening. The output of the execution looks like this:
2021-11-10T09:12:12.401284-08:00: Creating Client#0
2021-11-10T09:12:14.074138-08:00: Received Message [seq_number:6]
2021-11-10T09:12:14.260127-08:00: complete call finished for Message [seq_number:6]
2021-11-10T09:12:14.260669-08:00: Closing Client#0
2021-11-10T09:12:14.272972-08:00: Creating Client#1
2021-11-10T09:12:14.613589-08:00: Received Message [seq_number:7]
2021-11-10T09:12:14.660616-08:00: complete call finished for Message [seq_number:7]
2021-11-10T09:12:14.660782-08:00: Closing Client#1
2021-11-10T09:12:14.662825-08:00: Creating Client#2
2021-11-10T09:12:14.967993-08:00: Received Message [seq_number:8]
2021-11-10T09:12:15.006407-08:00: complete call finished for Message [seq_number:8]
2021-11-10T09:12:15.006554-08:00: Closing Client#2
2021-11-10T09:12:15.008189-08:00: Creating Client#3
2021-11-10T09:12:15.318079-08:00: Received Message [seq_number:9]
2021-11-10T09:12:15.374040-08:00: complete call finished for Message [seq_number:9]
2021-11-10T09:12:15.374187-08:00: Closing Client#3
2021-11-10T09:12:15.375971-08:00: Creating Client#4
2021-11-10T09:12:15.668797-08:00: Received Message [seq_number:10]
2021-11-10T09:12:15.718561-08:00: complete call finished for Message [seq_number:10]
2021-11-10T09:12:15.718686-08:00: Closing Client#4
2021-11-10T09:12:15.719595-08:00: Creating Client#5
2021-11-10T09:12:16.004302-08:00: Received Message [seq_number:11]
2021-11-10T09:12:16.049758-08:00: complete call finished for Message [seq_number:11]
2021-11-10T09:12:16.049846-08:00: Closing Client#5
2021-11-10T09:12:16.050655-08:00: Creating Client#6
2021-11-10T09:12:16.346686-08:00: Received Message [seq_number:12]
2021-11-10T09:12:16.383272-08:00: complete call finished for Message [seq_number:12]
2021-11-10T09:12:16.383415-08:00: Closing Client#6
2021-11-10T09:12:16.385344-08:00: Creating Client#7
2021-11-10T09:12:16.625618-08:00: Received Message [seq_number:13]
2021-11-10T09:12:16.660944-08:00: complete call finished for Message [seq_number:13]
2021-11-10T09:12:16.661100-08:00: Closing Client#7
2021-11-10T09:12:16.662135-08:00: Creating Client#8
2021-11-10T09:12:16.901708-08:00: Received Message [seq_number:14]
2021-11-10T09:12:16.945340-08:00: complete call finished for Message [seq_number:14]
2021-11-10T09:12:16.945430-08:00: Closing Client#8
2021-11-10T09:12:16.946211-08:00: Creating Client#9
2021-11-10T09:12:17.161688-08:00: Received Message [seq_number:15]
2021-11-10T09:12:17.204560-08:00: complete call finished for Message [seq_number:15]
2021-11-10T09:12:17.204756-08:00: Closing Client#9
Could you please double-check my code and also confirm whether this is how your application uses the client?
I see three main differences:
- The class that is instantiating the ServiceBusClientBuilder is in turn getting instantiated each time a polling transaction occurs. This happens because the polling is started from an Apache Camel route that calls a Processor which in turn creates a new MessageReceiver. The MessageReceiver has a try/catch block containing the code from comment https://github.com/Azure/azure-service-bus-java/issues/418#issuecomment-950880040. This pattern is used because each polling transaction is supposed to be stateless.
- If a message is not found then a new ServiceBusClientBuilder is created with the subscription name for the DLQ and a similar call to receiveMessages is made to poll the DLQ.
- Each polling transaction has a fifteen second timer before starting a new transaction.
Thanks, I'm trying to map your points back to code.
So you have a concept called a "polling transaction". Each time the "polling transaction" executes, it creates a client instance, receives a message, and closes the client instance, meaning there is a 1:1 relationship between a client instance and an execution of the "polling transaction".
Isn't that what the sample code I posted is doing, where one iteration of the loop corresponds to one execution of the "polling transaction"?
The part missing from my sample code is this: if no message was received, the code should create a receiver client for the DLQ and then attempt to receive a message from it. Is this understanding correct?
The two things missing from your sample are:
- The time difference between each ServiceBusClientBuilder getting created
- Because our code is coming from a new Camel route for each poll, it's also likely using a different thread each time the pair of ServiceBusClientBuilder instances (main queue and DLQ) is created
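The polling pattern described in this thread (a fresh client per poll, a DLQ fallback when the main source is empty, a pause between polls, and a different thread each time) can be sketched without the SDK. `PollingRepro`, `Source`, `fixed`, `pollOnce`, and `run` are hypothetical names for this illustration; `Source` merely stands in for a receiver client that is created and closed within one polling transaction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

final class PollingRepro {
    // Hypothetical stand-in for a receiver client that lives for one poll only.
    interface Source extends AutoCloseable {
        Optional<String> receiveOne();

        @Override
        void close();
    }

    // Test helper: a source that always returns the given result.
    static Source fixed(Optional<String> result) {
        return new Source() {
            @Override
            public Optional<String> receiveOne() {
                return result;
            }

            @Override
            public void close() {
                // Nothing to release in this toy source.
            }
        };
    }

    // One stateless polling transaction: try the main source, then fall back to the DLQ.
    static Optional<String> pollOnce(Supplier<Source> main, Supplier<Source> dlq) {
        try (Source m = main.get()) {
            Optional<String> message = m.receiveOne();
            if (message.isPresent()) {
                return message;
            }
        }
        try (Source d = dlq.get()) {
            return d.receiveOne();
        }
    }

    // Runs each poll on its own new thread, pausing between polls.
    static List<String> run(int polls, long pauseMillis, Supplier<Source> main, Supplier<Source> dlq) {
        List<String> results = new ArrayList<>();
        try {
            for (int i = 0; i < polls; i++) {
                Thread t = new Thread(() -> results.add(
                    Thread.currentThread().getName() + ":" + pollOnce(main, dlq).orElse("none")),
                    "poll-" + i);
                t.start();
                t.join(); // join makes the worker's write to results visible here
                Thread.sleep(pauseMillis);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("polling interrupted", e);
        }
        return results;
    }

    public static void main(String[] args) {
        // The thread mentions a fifteen second timer; a short pause keeps the demo quick.
        List<String> results = run(3, 100L,
            () -> fixed(Optional.empty()),
            () -> fixed(Optional.of("dead-lettered message")));
        results.forEach(System.out::println);
    }
}
```

To move closer to the real scenario, the two suppliers could be replaced with code that builds and closes actual receiver clients for the subscription and its DLQ, with `pauseMillis` set to 15000.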
@lordarcy Sorry for the late follow-up. Is it possible for your team to update the code I shared so far to match the way you are using the library, so that this can be reproduced on our end? I've pushed the code we've been discussing above to https://github.com/anuchandy/attempt-repro-polling-bug. It can be cloned and is ready to run once you set the connection string and queue name in the environment variables. You and Rawjyot have access to the repository.
@Rawjyot is this something you can try?
Yes sure I will give it a try
@Rawjyot have you had a chance to try updating the code sample with the library to try and reproduce the problem?
@anuchandy can you please provide me access to https://github.com/anuchandy/attempt-repro-polling-bug again? I tried accessing it and it says the invitation has expired.
@anuchandy I don't have access to the private repository you shared above. Meanwhile, @lordarcy shared the codebase as a zip file. I have gone through it, and here are a few observations.
- The code in that repository uses ServiceBusReceiverClient to poll queues, whereas the actual code with the issue uses ServiceBusClientBuilder to poll topics.
- That codebase does not use Spring Boot as its boilerplate, whereas the actual code with the issue uses the latest version of Spring Boot.
- That codebase reads from the queues on a single thread, whereas the actual code with the issue reads messages from the topics asynchronously. That means multiple threads are in use and multiple threads will be closed, which leads to the actual issue.
I will try to replicate these parameters and provide you with updated code. cc: @lordarcy