Cannot subscribe. Processor is already terminated
Describe the bug When stopping/closing the first ServiceBusSenderClient/ServiceBusProcessorClient in our spring boot app while Application shutdown in k8s, this two errors are logged:
logger: com.azure.core.amqp.implementation.AmqpChannelProcessor message: {"az.sdk.message":"Cannot subscribe. Processor is already terminated.","exception":"Cannot subscribe. Processor is already terminated.","connectionId":"MF_bdec02_1713997242025","entityPath":"$cbs"}
logger: reactor.core.publisher.Operators message: Operator called default onErrorDropped
The error is logged everytime, when a deployment is shutting down and the first ServiceBusProcessorClient is closed. When closing the second client, no error is logged/happening. When the client was not running, no error is logged.
The error for the ServiceBusSenderClient.close() is happening inconsistly for our cronjobs.
Exception or Stack Trace
for ServiceBusSenderClient.close(): reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Cannot subscribe. Processor is already terminated. Caused by: java.lang.IllegalStateException: Cannot subscribe. Processor is already terminated. at com.azure.core.amqp.implementation.AmqpChannelProcessor.subscribe(AmqpChannelProcessor.java:270) at reactor.core.publisher.Mono.subscribe(Mono.java:4568) at reactor.core.publisher.MonoWhen$WhenCoordinator.request(MonoWhen.java:229) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onSubscribe(MonoIgnoreThen.java:135) at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:61) at reactor.core.publisher.MonoWhen.subscribe(MonoWhen.java:101) at reactor.core.publisher.Mono.subscribe(Mono.java:4568) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265) at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) at reactor.core.publisher.Mono.subscribe(Mono.java:4568) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265) at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) at reactor.core.publisher.Mono.subscribe(Mono.java:4568) at reactor.core.publisher.Mono.subscribeWith(Mono.java:4634) at reactor.core.publisher.Mono.subscribe(Mono.java:4395) at com.azure.core.amqp.implementation.ReactorConnectionCache.dispose(ReactorConnectionCache.java:191) at com.azure.messaging.servicebus.ServiceBusClientBuilder$V2StackSupport.onClientClose(ServiceBusClientBuilder.java:1280) at com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.close(ServiceBusSenderAsyncClient.java:762) at com.azure.messaging.servicebus.ServiceBusSenderClient.close(ServiceBusSenderClient.java:474) at our code....
for ServiceBusProcessorClient.close(): reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Cannot subscribe. Processor is already terminated. Caused by: java.lang.IllegalStateException: Cannot subscribe. Processor is already terminated. at com.azure.core.amqp.implementation.AmqpChannelProcessor.subscribe(AmqpChannelProcessor.java:270) at reactor.core.publisher.Mono.subscribe(Mono.java:4568) at reactor.core.publisher.MonoWhen$WhenCoordinator.request(MonoWhen.java:229) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onSubscribe(MonoIgnoreThen.java:135) at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:61) at reactor.core.publisher.MonoWhen.subscribe(MonoWhen.java:101) at reactor.core.publisher.Mono.subscribe(Mono.java:4568) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265) at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) at reactor.core.publisher.Mono.subscribe(Mono.java:4568) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265) at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) at reactor.core.publisher.Mono.subscribe(Mono.java:4568) at reactor.core.publisher.Mono.subscribeWith(Mono.java:4634) at reactor.core.publisher.Mono.subscribe(Mono.java:4395) at com.azure.core.amqp.implementation.ReactorConnectionCache.dispose(ReactorConnectionCache.java:191) at com.azure.messaging.servicebus.ServiceBusClientBuilder$V2StackSupport.onClientClose(ServiceBusClientBuilder.java:1280) at com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.close(ServiceBusReceiverAsyncClient.java:1509) at com.azure.messaging.servicebus.ServiceBusProcessor$RollingMessagePump.lambda$beginIntern$2(ServiceBusProcessor.java:237) at reactor.core.publisher.MonoUsing$MonoUsingSubscriber.cleanup(MonoUsing.java:191) at reactor.core.publisher.MonoUsing$MonoUsingSubscriber.cancel(MonoUsing.java:185) at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2425) at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2393) at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2205) at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2425) at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2393) at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:2205) at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.cancel(FluxRetryWhen.java:167) at reactor.core.publisher.SerializedSubscriber.cancel(SerializedSubscriber.java:157) at reactor.core.publisher.LambdaMonoSubscriber.dispose(LambdaMonoSubscriber.java:217) at reactor.core.Disposables$ListCompositeDisposable.dispose(Disposables.java:268) at reactor.core.Disposables$ListCompositeDisposable.dispose(Disposables.java:161) at com.azure.messaging.servicebus.ServiceBusProcessor$RollingMessagePump.dispose(ServiceBusProcessor.java:268) at com.azure.messaging.servicebus.ServiceBusProcessor.close(ServiceBusProcessor.java:119) at com.azure.messaging.servicebus.ServiceBusProcessorClient.close(ServiceBusProcessorClient.java:353) at our code...
To Reproduce Have a running spring boot app with a running ServiceBusProcessorClient. Let the container be replaced with a new deployment. In the logs you should find the error.
Code Snippet
// Creating the client: public ServiceBusProcessorClient createServiceBusProcessorClient(String connectionString, String queueName) { return new ServiceBusClientBuilder() .transportType(AmqpTransportType.AMQP_WEB_SOCKETS) .connectionString(connectionString) .processor() .queueName(queueName) .prefetchCount(0) .maxAutoLockRenewDuration(Duration.of(60L, ChronoUnit.MINUTES)) .maxConcurrentCalls(1) .processMessage(this::processMessage) .processError(this::processError) .disableAutoComplete() .buildProcessorClient(); }
// Start/Stop/Close: @Override public void start() { try { this.client = createServiceBusProcessorClient(connectionString, queueName); ((ServiceBusProcessorClient) client).start(); } catch (Exception e) { LOGGER.error("Error while starting QueueProcessor {}", loggingContext, e); } }
@Override public void stop() { if (client != null) { ((ServiceBusProcessorClient) client).stop(); } }
@Override public void close() { if (client != null) { ((ServiceBusProcessorClient) client).close(); } }
// Component, which manages the clients: @Component public class DataReceiver implements DisposableBean { private final List<MyServiceBusReceiveClient> clients = new ArrayList<>();
// Clients is filled with multiple clients in the constructor and not shown here.
@Override
public void destroy() {
for (ServiceBusReceiveClient client : clients) {
if (client != null) {
try {
client.stop();
client.close();
} catch (Exception e) {
LOGGER.error("Error while closing client", e);
}
}
}
}
// Triggered from business logic.
public synchronized void startReceiving() {
for (ServiceBusReceiveClient client : clients) {
client.start();
}
}
Expected behavior No error message is logged
Screenshots
- no screenshots
Setup:
- OS: Win11
- IDE: IntelliJ
- Library/Libraries: "azure-messaging-servicebus" Version 7.16.0
- Java version: 17
- Frameworks: Spring Boot 3.2.5
Additional context
@anuchandy @conniey @lmolkova
Thank you for your feedback. Tagging and routing to the team member best able to assist.
@frederikHartung-extern thank you for reporting this!
Could you please confirm that the issue here is the noise introduced by logs or is there something else that does not work?
I confirm, that just the ERROR Logs a problem for me. I didnt notice any other errors or that something is not working.
Level for this logging has been changed to warn in this pr, so no longer reported as error
Thanks!