azure-sdk-for-java

Cannot subscribe. Processor is already terminated

Open frederikHartung-extern opened this issue 1 year ago • 4 comments

Describe the bug

When stopping or closing the first ServiceBusSenderClient/ServiceBusProcessorClient in our Spring Boot app during application shutdown in k8s, these two errors are logged:

logger: com.azure.core.amqp.implementation.AmqpChannelProcessor message: {"az.sdk.message":"Cannot subscribe. Processor is already terminated.","exception":"Cannot subscribe. Processor is already terminated.","connectionId":"MF_bdec02_1713997242025","entityPath":"$cbs"}

logger: reactor.core.publisher.Operators message: Operator called default onErrorDropped

The error is logged every time a deployment shuts down and the first ServiceBusProcessorClient is closed. When the second client is closed, no error is logged. When the client was not running, no error is logged either.

For our cronjobs, the error on ServiceBusSenderClient.close() occurs inconsistently.

Exception or Stack Trace

for ServiceBusSenderClient.close():

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Cannot subscribe. Processor is already terminated.
Caused by: java.lang.IllegalStateException: Cannot subscribe. Processor is already terminated.
    at com.azure.core.amqp.implementation.AmqpChannelProcessor.subscribe(AmqpChannelProcessor.java:270)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4568)
    at reactor.core.publisher.MonoWhen$WhenCoordinator.request(MonoWhen.java:229)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onSubscribe(MonoIgnoreThen.java:135)
    at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:61)
    at reactor.core.publisher.MonoWhen.subscribe(MonoWhen.java:101)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4568)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265)
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4568)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265)
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4568)
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:4634)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4395)
    at com.azure.core.amqp.implementation.ReactorConnectionCache.dispose(ReactorConnectionCache.java:191)
    at com.azure.messaging.servicebus.ServiceBusClientBuilder$V2StackSupport.onClientClose(ServiceBusClientBuilder.java:1280)
    at com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.close(ServiceBusSenderAsyncClient.java:762)
    at com.azure.messaging.servicebus.ServiceBusSenderClient.close(ServiceBusSenderClient.java:474)
    at our code....

for ServiceBusProcessorClient.close():

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Cannot subscribe. Processor is already terminated.
Caused by: java.lang.IllegalStateException: Cannot subscribe. Processor is already terminated.
    at com.azure.core.amqp.implementation.AmqpChannelProcessor.subscribe(AmqpChannelProcessor.java:270)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4568)
    at reactor.core.publisher.MonoWhen$WhenCoordinator.request(MonoWhen.java:229)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onSubscribe(MonoIgnoreThen.java:135)
    at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:61)
    at reactor.core.publisher.MonoWhen.subscribe(MonoWhen.java:101)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4568)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265)
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4568)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265)
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4568)
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:4634)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4395)
    at com.azure.core.amqp.implementation.ReactorConnectionCache.dispose(ReactorConnectionCache.java:191)
    at com.azure.messaging.servicebus.ServiceBusClientBuilder$V2StackSupport.onClientClose(ServiceBusClientBuilder.java:1280)
    at com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.close(ServiceBusReceiverAsyncClient.java:1509)
    at com.azure.messaging.servicebus.ServiceBusProcessor$RollingMessagePump.lambda$beginIntern$2(ServiceBusProcessor.java:237)
    at reactor.core.publisher.MonoUsing$MonoUsingSubscriber.cleanup(MonoUsing.java:191)
    at reactor.core.publisher.MonoUsing$MonoUsingSubscriber.cancel(MonoUsing.java:185)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2425)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2393)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2205)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2425)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2393)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2205)
    at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.cancel(FluxRetryWhen.java:167)
    at reactor.core.publisher.SerializedSubscriber.cancel(SerializedSubscriber.java:157)
    at reactor.core.publisher.LambdaMonoSubscriber.dispose(LambdaMonoSubscriber.java:217)
    at reactor.core.Disposables$ListCompositeDisposable.dispose(Disposables.java:268)
    at reactor.core.Disposables$ListCompositeDisposable.dispose(Disposables.java:161)
    at com.azure.messaging.servicebus.ServiceBusProcessor$RollingMessagePump.dispose(ServiceBusProcessor.java:268)
    at com.azure.messaging.servicebus.ServiceBusProcessor.close(ServiceBusProcessor.java:119)
    at com.azure.messaging.servicebus.ServiceBusProcessorClient.close(ServiceBusProcessorClient.java:353)
    at our code...

To Reproduce

Run a Spring Boot app with a started ServiceBusProcessorClient, then let the container be replaced by a new deployment. The error should appear in the logs of the container that is shutting down.

Code Snippet

// Creating the client:
public ServiceBusProcessorClient createServiceBusProcessorClient(String connectionString, String queueName) {
    return new ServiceBusClientBuilder()
        .transportType(AmqpTransportType.AMQP_WEB_SOCKETS)
        .connectionString(connectionString)
        .processor()
        .queueName(queueName)
        .prefetchCount(0)
        .maxAutoLockRenewDuration(Duration.of(60L, ChronoUnit.MINUTES))
        .maxConcurrentCalls(1)
        .processMessage(this::processMessage)
        .processError(this::processError)
        .disableAutoComplete()
        .buildProcessorClient();
}

// Start/Stop/Close:
@Override
public void start() {
    try {
        this.client = createServiceBusProcessorClient(connectionString, queueName);
        ((ServiceBusProcessorClient) client).start();
    } catch (Exception e) {
        LOGGER.error("Error while starting QueueProcessor {}", loggingContext, e);
    }
}

@Override
public void stop() {
    if (client != null) {
        ((ServiceBusProcessorClient) client).stop();
    }
}

@Override
public void close() {
    if (client != null) {
        ((ServiceBusProcessorClient) client).close();
    }
}

// Component, which manages the clients:
@Component
public class DataReceiver implements DisposableBean {
    private final List<MyServiceBusReceiveClient> clients = new ArrayList<>();

// Clients is filled with multiple clients in the constructor and not shown here.

@Override
public void destroy() {
    for (ServiceBusReceiveClient client : clients) {
        if (client != null) {
            try {
                client.stop();
                client.close();
            } catch (Exception e) {
                LOGGER.error("Error while closing client", e);
            }
        }
    }
}

// Triggered from business logic.
public synchronized void startReceiving() {
    for (ServiceBusReceiveClient client : clients) {
        client.start();
    }
}
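The destroy() loop above closes each client in turn and keeps going when one of them fails. A minimal standalone sketch of that pattern, using only the JDK, could look like the following. ShutdownHelper and closeAll are hypothetical names that appear neither in the Azure SDK nor in the code above, and the sketch assumes each client wrapper implements AutoCloseable.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not part of the Azure SDK. */
final class ShutdownHelper {
    private ShutdownHelper() { }

    /**
     * Closes every resource in list order. An exception thrown by one
     * close() call is recorded and does not stop the remaining closes.
     * Returns the recorded exceptions (empty if every close succeeded).
     */
    static List<Exception> closeAll(List<? extends AutoCloseable> resources) {
        List<Exception> failures = new ArrayList<>();
        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue; // nothing to close
            }
            try {
                resource.close();
            } catch (Exception e) {
                failures.add(e); // keep going; report to the caller afterwards
            }
        }
        return failures;
    }
}
```

Note that the error in this report appears to be logged inside the SDK (through Reactor's onErrorDropped) rather than thrown to the caller, so a helper like this would probably not catch or suppress it. It only keeps one failing close() from skipping the others.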

Expected behavior

No error message is logged.

Screenshots

  • no screenshots

Setup:

  • OS: Win11
  • IDE: IntelliJ
  • Library/Libraries: "azure-messaging-servicebus" Version 7.16.0
  • Java version: 17
  • Frameworks: Spring Boot 3.2.5

Additional context

frederikHartung-extern, Apr 25 '24 09:04

@anuchandy @conniey @lmolkova

github-actions[bot], Apr 25 '24 09:04

Thank you for your feedback. Tagging and routing to the team member best able to assist.

github-actions[bot], Apr 25 '24 09:04

@frederikHartung-extern thank you for reporting this!

Could you please confirm whether the issue is only the noise introduced by the logs, or is there something else that does not work?

lmolkova, Apr 25 '24 15:04

I confirm that only the ERROR logs are a problem for me. I didn't notice any other errors or anything that is not working.

frederikHartung-extern, Apr 25 '24 16:04

The level for this log message has been changed to WARN in this PR, so it is no longer reported as an error.

anuchandy, May 06 '24 18:05

Thanks!

frederikHartung-extern, May 08 '24 10:05