[BUG] EventProcessorClient sometimes stops reading in low traffic environments
Describe the bug
In our low-traffic environments (~40 messages per second on our Event Hub), we're facing issues with the EventProcessorClient. It runs stably for a while, but after about a day it just stops reading. It does not crash or throw exceptions; it simply stops.
Restarting the application fixes the issue until it happens again. Sometimes it's after 5 minutes, sometimes after a day, sometimes weeks. In the last 7 days, we have observed it 3 times in our prod environment.
Exception or Stack Trace
Note: I've replaced the real service name with my-service.
[INFO] 2024-08-15T11:08:34,926: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SessionHandler - {"az.sdk.message":"onSessionRemoteClose","connectionId":"MF_ed5df0_1723716621282","sessionName":"my-service-prod-jpe-eh-vsms"}
[INFO] 2024-08-15T11:08:34,926: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SessionHandler - {"az.sdk.message":"onSessionRemoteClose closing a local session.","connectionId":"MF_ed5df0_1723716621282","sessionName":"my-service-prod-jpe-eh-vsms"}
[DEBUG] 2024-08-15T11:08:34,926: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorSession - {"az.sdk.message":"Got endpoint state.","connectionId":"MF_ed5df0_1723716621282","sessionName":"my-service-prod-jpe-eh-vsms","state":"CLOSED"}
[INFO] 2024-08-15T11:08:34,926: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SendLinkHandler - {"az.sdk.message":"onLinkRemoteClose","connectionId":"MF_ed5df0_1723716621282","errorCondition":"amqp:connection:forced","errorDescription":"The connection was closed by container '4b547255507242e8a5bc0e9949c405dd_G0' because it did not have any active links in the past 300000 milliseconds. TrackingId:4b547255507242e8a5bc0e9949c405dd_G0, SystemTracker:gateway5, Timestamp:2024-08-15T11:08:34","linkName":"cbs:sender","entityPath":"$cbs"}
[INFO] 2024-08-15T11:08:34,926: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SendLinkHandler - {"az.sdk.message":"Local link state is not closed.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs:sender","entityPath":"$cbs","state":"ACTIVE"}
[WARN] 2024-08-15T11:08:34,927: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Error in SendLinkHandler. Disposing unconfirmed sends.","exception":"The connection was closed by container '4b547255507242e8a5bc0e9949c405dd_G0' because it did not have any active links in the past 300000 milliseconds. TrackingId:4b547255507242e8a5bc0e9949c405dd_G0, SystemTracker:gateway5, Timestamp:2024-08-15T11:08:34, errorContext[NAMESPACE: my-service-prod-jpe-ehn-vsms.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 98]","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"}
com.azure.core.amqp.exception.AmqpException: The connection was closed by container '4b547255507242e8a5bc0e9949c405dd_G0' because it did not have any active links in the past 300000 milliseconds. TrackingId:4b547255507242e8a5bc0e9949c405dd_G0, SystemTracker:gateway5, Timestamp:2024-08-15T11:08:34, errorContext[NAMESPACE: my-service-prod-jpe-ehn-vsms.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 98]
at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:90) ~[azure-core-amqp-2.9.7.jar!/:2.9.7]
at com.azure.core.amqp.implementation.handler.LinkHandler.handleRemoteLinkClosed(LinkHandler.java:120) ~[azure-core-amqp-2.9.7.jar!/:2.9.7]
at com.azure.core.amqp.implementation.handler.LinkHandler.onLinkRemoteClose(LinkHandler.java:63) ~[azure-core-amqp-2.9.7.jar!/:2.9.7]
at com.azure.core.amqp.implementation.handler.SendLinkHandler.onLinkRemoteClose(SendLinkHandler.java:37) ~[azure-core-amqp-2.9.7.jar!/:2.9.7]
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176) ~[proton-j-0.34.1.jar!/:?]
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108) ~[proton-j-0.34.1.jar!/:?]
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324) ~[proton-j-0.34.1.jar!/:?]
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291) ~[proton-j-0.34.1.jar!/:?]
at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91) ~[azure-core-amqp-2.9.7.jar!/:2.9.7]
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.ContextPropagationOperator$RunnableWrapper.run(ContextPropagationOperator.java:373) ~[agent.jar:?]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.6.7.jar!/:3.6.7]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.6.7.jar!/:3.6.7]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
[DEBUG] 2024-08-15T11:08:34,928: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Closing request/response channel.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"}
[INFO] 2024-08-15T11:08:34,928: reactor-executor-31 - com.azure.core.amqp.implementation.AmqpChannelProcessor - {"az.sdk.message":"Transient error occurred. Retrying.","exception":"The connection was closed by container '4b547255507242e8a5bc0e9949c405dd_G0' because it did not have any active links in the past 300000 milliseconds. TrackingId:4b547255507242e8a5bc0e9949c405dd_G0, SystemTracker:gateway5, Timestamp:2024-08-15T11:08:34, errorContext[NAMESPACE: my-service-prod-jpe-ehn-vsms.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 98]","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs","tryCount":0,"intervalMs":1800}
com.azure.core.amqp.exception.AmqpException: The connection was closed by container '4b547255507242e8a5bc0e9949c405dd_G0' because it did not have any active links in the past 300000 milliseconds. TrackingId:4b547255507242e8a5bc0e9949c405dd_G0, SystemTracker:gateway5, Timestamp:2024-08-15T11:08:34, errorContext[NAMESPACE: my-service-prod-jpe-ehn-vsms.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 98]
at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:90) ~[azure-core-amqp-2.9.7.jar!/:2.9.7]
at com.azure.core.amqp.implementation.handler.LinkHandler.handleRemoteLinkClosed(LinkHandler.java:120) ~[azure-core-amqp-2.9.7.jar!/:2.9.7]
at com.azure.core.amqp.implementation.handler.LinkHandler.onLinkRemoteClose(LinkHandler.java:63) ~[azure-core-amqp-2.9.7.jar!/:2.9.7]
at com.azure.core.amqp.implementation.handler.SendLinkHandler.onLinkRemoteClose(SendLinkHandler.java:37) ~[azure-core-amqp-2.9.7.jar!/:2.9.7]
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176) ~[proton-j-0.34.1.jar!/:?]
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108) ~[proton-j-0.34.1.jar!/:?]
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324) ~[proton-j-0.34.1.jar!/:?]
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291) ~[proton-j-0.34.1.jar!/:?]
at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91) ~[azure-core-amqp-2.9.7.jar!/:2.9.7]
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.ContextPropagationOperator$RunnableWrapper.run(ContextPropagationOperator.java:373) ~[agent.jar:?]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.6.7.jar!/:3.6.7]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.6.7.jar!/:3.6.7]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
[DEBUG] 2024-08-15T11:08:34,928: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Terminating 0 unconfirmed sends (reason: The connection was closed by container '4b547255507242e8a5bc0e9949c405dd_G0' because it did not have any active links in the past 300000 milliseconds. TrackingId:4b547255507242e8a5bc0e9949c405dd_G0, SystemTracker:gateway5, Timestamp:2024-08-15T11:08:34, errorContext[NAMESPACE: my-service-prod-jpe-ehn-vsms.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 98]).","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"}
[DEBUG] 2024-08-15T11:08:34,928: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"completed the termination of 0 unconfirmed sends (reason: The connection was closed by container '4b547255507242e8a5bc0e9949c405dd_G0' because it did not have any active links in the past 300000 milliseconds. TrackingId:4b547255507242e8a5bc0e9949c405dd_G0, SystemTracker:gateway5, Timestamp:2024-08-15T11:08:34, errorContext[NAMESPACE: my-service-prod-jpe-ehn-vsms.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 98]).","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"}
[DEBUG] 2024-08-15T11:08:34,928: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Channel already closed.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"}
[DEBUG] 2024-08-15T11:08:34,928: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"SendLinkHandler disposed. Remaining: 1","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"}
[DEBUG] 2024-08-15T11:08:34,928: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SessionHandler - {"az.sdk.message":"onSessionLocalClose","connectionId":"MF_ed5df0_1723716621282","sessionName":"my-service-prod-jpe-eh-vsms"}
[DEBUG] 2024-08-15T11:08:34,928: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SendLinkHandler - {"az.sdk.message":"onLinkLocalClose","connectionId":"MF_ed5df0_1723716621282","errorCondition":"amqp:connection:forced","errorDescription":"The connection was closed by container '4b547255507242e8a5bc0e9949c405dd_G0' because it did not have any active links in the past 300000 milliseconds. TrackingId:4b547255507242e8a5bc0e9949c405dd_G0, SystemTracker:gateway5, Timestamp:2024-08-15T11:08:34","linkName":"cbs:sender","entityPath":"$cbs"}
[DEBUG] 2024-08-15T11:08:34,928: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Closing send link and receive link.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"}
[DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2 - {"az.sdk.message":"onLinkLocalClose","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs:receiver","entityPath":"$cbs"}
[INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2 - {"az.sdk.message":"onLinkRemoteClose","connectionId":"MF_ed5df0_1723716621282","errorCondition":"amqp:connection:forced","errorDescription":"The connection was closed by container '4b547255507242e8a5bc0e9949c405dd_G0' because it did not have any active links in the past 300000 milliseconds. TrackingId:4b547255507242e8a5bc0e9949c405dd_G0, SystemTracker:gateway5, Timestamp:2024-08-15T11:08:34","linkName":"cbs:receiver","entityPath":"$cbs"}
[DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"ReceiveLinkHandler disposed. Remaining: 0","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"}
[DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Terminating 0 unconfirmed sends (reason: The RequestResponseChannel didn't receive the acknowledgment for the send due receive link termination.).","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"}
[DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"completed the termination of 0 unconfirmed sends (reason: The RequestResponseChannel didn't receive the acknowledgment for the send due receive link termination.).","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"}
[DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Could not emit complete signal.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs","signalType":"onComplete","emitResult":"FAIL_TERMINATED"}
[INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SessionHandler - {"az.sdk.message":"onSessionRemoteClose","connectionId":"MF_ed5df0_1723716621282","sessionName":"cbs-session"}
[INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SessionHandler - {"az.sdk.message":"onSessionRemoteClose closing a local session.","connectionId":"MF_ed5df0_1723716621282","sessionName":"cbs-session"}
[DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorSession - {"az.sdk.message":"Got endpoint state.","connectionId":"MF_ed5df0_1723716621282","sessionName":"cbs-session","state":"CLOSED"}
[INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ConnectionHandler - {"az.sdk.message":"onConnectionRemoteClose","connectionId":"MF_ed5df0_1723716621282","errorCondition":"amqp:connection:forced","errorDescription":"The connection was closed by container '4b547255507242e8a5bc0e9949c405dd_G0' because it did not have any active links in the past 300000 milliseconds. TrackingId:4b547255507242e8a5bc0e9949c405dd_G0, SystemTracker:gateway5, Timestamp:2024-08-15T11:08:34","hostName":"my-service-prod-jpe-ehn-vsms.servicebus.windows.net"}
[INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ConnectionHandler - {"az.sdk.message":"onTransportClosed","connectionId":"MF_ed5df0_1723716621282","hostName":"my-service-prod-jpe-ehn-vsms.servicebus.windows.net"}
[INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.handler.GlobalIOHandler - {"az.sdk.message":"onTransportClosed","connectionId":"MF_ed5df0_1723716621282","hostName":"my-service-prod-jpe-ehn-vsms.servicebus.windows.net"}
[DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SessionHandler - {"az.sdk.message":"onSessionLocalClose","connectionId":"MF_ed5df0_1723716621282","sessionName":"cbs-session"}
[INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ConnectionHandler - {"az.sdk.message":"onConnectionLocalClose","connectionId":"MF_ed5df0_1723716621282","hostName":"my-service-prod-jpe-ehn-vsms.servicebus.windows.net"}
[INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ConnectionHandler - {"az.sdk.message":"onConnectionUnbound","connectionId":"MF_ed5df0_1723716621282","hostName":"my-service-prod-jpe-ehn-vsms.servicebus.windows.net","state":"CLOSED","remoteState":"CLOSED"}
[DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"getConnectionState","connectionId":"MF_ed5df0_1723716621282","state":"CLOSED"}
[DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"Disposing of active sessions due to connection close.","connectionId":"MF_ed5df0_1723716621282"}
[INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"Disposing of ReactorConnection.","connectionId":"MF_ed5df0_1723716621282","isTransient":false,"isInitiatedByClient":false,"shutdownMessage":"Connection handler closed."}
[DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorSession - {"az.sdk.message":"Setting error condition and disposing session. Shutdown signal received (Connection handler closed.)","connectionId":"MF_ed5df0_1723716621282","sessionName":"my-service-prod-jpe-eh-vsms"}
[DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorSession - {"az.sdk.message":"Setting error condition and disposing session. Shutdown signal received (Connection handler closed.)","connectionId":"MF_ed5df0_1723716621282","sessionName":"cbs-session"}
[DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.AmqpChannelProcessor - {"az.sdk.message":"Added subscriber.","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs","subscriberId":"un_b261dd_1723720114929"}
[DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"Closed management nodes.","connectionId":"MF_ed5df0_1723716621282","signalType":"onComplete"}
[INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor - {"az.sdk.message":"Channel is closed. Requesting upstream.","entityPath":"my-service-prod-jpe-eh-vsms"}
[DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"Connection was already closed. Not disposing again.","connectionId":"MF_ed5df0_1723716621282"}
[INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor - {"az.sdk.message":"Connection not requested, yet. Requesting one.","entityPath":"my-service-prod-jpe-eh-vsms"}
[INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.messaging.eventhubs.EventHubClientBuilder - {"az.sdk.message":"Emitting a single connection.","connectionId":"MF_6c869a_1723720114929"}
[DEBUG] 2024-08-15T11:08:34,930: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"getConnectionState","connectionId":"MF_6c869a_1723720114929","state":"UNINITIALIZED"}
[INFO] 2024-08-15T11:08:34,930: reactor-executor-31 - com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor - {"az.sdk.message":"Setting next AMQP channel.","entityPath":"my-service-prod-jpe-eh-vsms"}
[INFO] 2024-08-15T11:08:34,930: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SendLinkHandler - {"az.sdk.message":"onLinkFinal","connectionId":"MF_ed5df0_1723716621282","linkName":"my-service-prod-jpe-eh-vsms","entityPath":"my-service-prod-jpe-eh-vsms"}
[INFO] 2024-08-15T11:08:34,930: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SessionHandler - {"az.sdk.message":"onSessionFinal.","connectionId":"MF_ed5df0_1723716621282","sessionName":"my-service-prod-jpe-eh-vsms"}
[DEBUG] 2024-08-15T11:08:34,930: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorSession - {"az.sdk.message":"Disposing of active send and receive links due to session close.","connectionId":"MF_ed5df0_1723716621282","sessionName":"my-service-prod-jpe-eh-vsms"}
[INFO] 2024-08-15T11:08:34,930: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SendLinkHandler - {"az.sdk.message":"onLinkFinal","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs:sender","entityPath":"$cbs"}
[DEBUG] 2024-08-15T11:08:34,930: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2 - {"az.sdk.message":"Could not emit messages.close when closing handler.","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs","linkName":"cbs","emitResult":"FAIL_TERMINATED"}
[INFO] 2024-08-15T11:08:34,930: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2 - {"az.sdk.message":"onLinkFinal","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs:receiver","entityPath":"$cbs"}
[INFO] 2024-08-15T11:08:34,930: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SessionHandler - {"az.sdk.message":"onSessionFinal.","connectionId":"MF_ed5df0_1723716621282","sessionName":"cbs-session"}
[DEBUG] 2024-08-15T11:08:34,930: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorSession - {"az.sdk.message":"Disposing of active send and receive links due to session close.","connectionId":"MF_ed5df0_1723716621282","sessionName":"cbs-session"}
[INFO] 2024-08-15T11:08:34,930: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ConnectionHandler - {"az.sdk.message":"onConnectionFinal","connectionId":"MF_ed5df0_1723716621282","hostName":"my-service-prod-jpe-ehn-vsms.servicebus.windows.net"}
[INFO] 2024-08-15T11:08:36,728: parallel-1 - com.azure.core.amqp.implementation.AmqpChannelProcessor - {"az.sdk.message":"Requesting from upstream.","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs","tryCount":0}
[INFO] 2024-08-15T11:08:36,728: parallel-1 - com.azure.core.amqp.implementation.AmqpChannelProcessor - {"az.sdk.message":"Connection not requested, yet. Requesting one.","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs"}
[DEBUG] 2024-08-15T11:08:36,729: parallel-1 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Updating endpoint states.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs","sendState":null,"receiveState":"UNINITIALIZED"}
[DEBUG] 2024-08-15T11:08:36,729: parallel-1 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Updating endpoint states.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs","sendState":"UNINITIALIZED","receiveState":"UNINITIALIZED"}
[DEBUG] 2024-08-15T11:08:36,729: parallel-1 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Shutdown signal received.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"}
[DEBUG] 2024-08-15T11:08:36,729: parallel-1 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Closing request/response channel.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"}
[INFO] 2024-08-15T11:08:36,729: parallel-1 - com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"Emitting new response channel.","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs","linkName":"cbs"}
[INFO] 2024-08-15T11:08:36,729: parallel-1 - com.azure.core.amqp.implementation.AmqpChannelProcessor - {"az.sdk.message":"Setting next AMQP channel.","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs"}
[DEBUG] 2024-08-15T11:08:36,729: parallel-1 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Channel already closed.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"}
[DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Closing send link and receive link.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"}
[INFO] 2024-08-15T11:08:36,729: parallel-1 - com.azure.core.amqp.implementation.AmqpChannelProcessor - {"az.sdk.message":"Next AMQP channel received.","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs","subscriberId":"un_b261dd_1723720114929"}
[INFO] 2024-08-15T11:08:36,729: parallel-1 - com.azure.core.amqp.implementation.AmqpChannelProcessor - {"az.sdk.message":"Upstream connection publisher was completed. Terminating processor.","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs"}
[DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SendLinkHandler - {"az.sdk.message":"onLinkLocalOpen","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs:sender","entityPath":"$cbs","localTarget":"Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}"}
[DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2 - {"az.sdk.message":"onLinkLocalOpen","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs","linkName":"cbs:receiver","localSource":"Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}"}
[DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SendLinkHandler - {"az.sdk.message":"onLinkLocalClose","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs:sender","entityPath":"$cbs"}
[INFO] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SendLinkHandler - {"az.sdk.message":"Sender link was never active. Closing endpoint states.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs","entityPath":"$cbs"}
[DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Updating endpoint states.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs","sendState":"CLOSED","receiveState":"UNINITIALIZED"}
[DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Channel already closed.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"}
[DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"SendLinkHandler disposed. Remaining: 1","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"}
[DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2 - {"az.sdk.message":"onLinkLocalClose","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs:receiver","entityPath":"$cbs"}
[INFO] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2 - {"az.sdk.message":"Receiver link was never active. Closing endpoint states","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs","linkName":"cbs"}
[DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Updating endpoint states.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs","sendState":"CLOSED","receiveState":"CLOSED"}
[DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Channel already closed.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"}
[DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"ReceiveLinkHandler disposed. Remaining: 0","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"}
[DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Terminating 0 unconfirmed sends (reason: The RequestResponseChannel didn't receive the acknowledgment for the send due receive link termination.).","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"}
[DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"completed the termination of 0 unconfirmed sends (reason: The RequestResponseChannel didn't receive the acknowledgment for the send due receive link termination.).","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"}
[INFO] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.AmqpChannelProcessor - {"az.sdk.message":"Channel is disposed.","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs"}
[DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"Scheduling closeConnection work.","connectionId":"MF_ed5df0_1723716621282"}
[DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"Closed reactor dispatcher.","connectionId":"MF_ed5df0_1723716621282","signalType":"onComplete"}
[DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"Closed CBS node.","connectionId":"MF_ed5df0_1723716621282","signalType":"onComplete"}
[INFO] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"Closing executor.","connectionId":"MF_ed5df0_1723716621282"}
To Reproduce
Steps to reproduce the behavior:
- Unfortunately, we are unable to reproduce it directly, but it happened 3 times in the same environment in the last 7 days. Our guess is that it happens when no messages come in on a partition for more than 5 minutes, as the exception suggests.
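The 5-minute guess lines up with the `300000 milliseconds` in the `amqp:connection:forced` description logged above. Below is a minimal sketch, assuming one wanted to scan application logs for these forced closes and read off the idle window. `IdleCloseParser` and `idleWindow` are hypothetical names for illustration, not part of the Azure SDK.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not an Azure SDK class): extracts the idle window from
// an "amqp:connection:forced" error description like the one in the log above.
final class IdleCloseParser {
    private static final Pattern IDLE_WINDOW = Pattern.compile(
        "did not have any active links in the past (\\d+) milliseconds");

    private IdleCloseParser() { }

    /** Returns the idle window named in the log line, or empty if the line does not match. */
    static Optional<Duration> idleWindow(String logLine) {
        Matcher m = IDLE_WINDOW.matcher(logLine);
        return m.find()
            ? Optional.of(Duration.ofMillis(Long.parseLong(m.group(1))))
            : Optional.empty();
    }

    public static void main(String[] args) {
        String line = "The connection was closed by container "
            + "'4b547255507242e8a5bc0e9949c405dd_G0' because it did not have any "
            + "active links in the past 300000 milliseconds.";
        System.out.println(idleWindow(line)); // prints Optional[PT5M]
    }
}
```

Counting how often such lines appear shortly before reading stops could help confirm or rule out the idle-connection hypothesis.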
Code Snippet
The EventProcessorClient setup looks like this:
public void startTelemetryProcessor() {
    LOGGER.debug("starting new EventProcessorClient");
    String fullyQualifiedNamespace = config.getIncomingEventHubNamespace() + config.getEhHostSuffix();
    String consumerGroupName = config.getConsumerGroupName();
    TokenCredential tokenCredential = azureCredentials.getTokenCredential();
    BlobContainerAsyncClient blobContainerClient = createBlobClient(tokenCredential, config);
    telemetryProcessorHost = new EventProcessorClientBuilder()
            .fullyQualifiedNamespace(fullyQualifiedNamespace)
            .eventHubName(config.getIncomingEventHub())
            .credential(tokenCredential)
            .consumerGroup(consumerGroupName)
            .initialPartitionEventPosition((partitionId) -> EventPosition.earliest())
            .checkpointStore(new BlobCheckpointStore(blobContainerClient))
            .loadBalancingUpdateInterval(Duration.ofSeconds(5))
            .partitionOwnershipExpirationInterval(Duration.ofSeconds(20))
            .loadBalancingStrategy(LoadBalancingStrategy.GREEDY)
            .processEventBatch(createEventConsumer(), 256, Duration.ofSeconds(2))
            .processError(createErrorConsumer())
            .processPartitionInitialization(createInitializationConsumer())
            .processPartitionClose(createCloseConsumer())
            .buildEventProcessorClient();
    try {
        telemetryProcessorHost.start();
        LOGGER.info(
                "Started EventProcessorClient for telemetry with identifier {}",
                telemetryProcessorHost.getIdentifier());
    } catch (Exception ex) {
        LOGGER.error("Failed at startup telemetry EventProcessorClient", ex);
    }
}

private BlobContainerAsyncClient createBlobClient(TokenCredential tokenCredential, Configuration config) {
    return new BlobContainerClientBuilder()
            .credential(tokenCredential)
            .endpoint(config.getCheckpointStorageAccountEndpoint())
            .containerName(config.getTelemetryContainerName())
            .buildAsyncClient();
}

private Consumer<ErrorContext> createErrorConsumer() {
    return context -> {
        PartitionContext partitionContext = context.getPartitionContext();
        LOGGER.error(
                "Error received for Partition {} on {}",
                partitionContext.getPartitionId(),
                partitionContext.getEventHubName(),
                context.getThrowable());
    };
}

private Consumer<CloseContext> createCloseConsumer() {
    return closeContext -> {
        metrics.decrementPartitionCount();
        PartitionContext partitionContext = closeContext.getPartitionContext();
        LOGGER.info(
                "Closed partition {} on {}. Reason: {}",
                partitionContext.getPartitionId(),
                partitionContext.getEventHubName(),
                closeContext.getCloseReason());
    };
}

private Consumer<InitializationContext> createInitializationConsumer() {
    return initializationContext -> {
        metrics.incrementPartitionCount();
        PartitionContext partitionContext = initializationContext.getPartitionContext();
        LOGGER.info(
                "Opened partition {} on {}", partitionContext.getPartitionId(), partitionContext.getEventHubName());
    };
}

private Consumer<EventBatchContext> createEventConsumer() {
    return eventDataBatch -> {
        if (!eventDataBatch.getEvents().isEmpty()) {
            String transactionId = UUID.randomUUID().toString();
            TelemetryProcessResult result =
                    eventService.processTelemetry(transactionId, eventDataBatch.getEvents());
            eventService.publishMessages(transactionId, result);
            eventDataBatch.updateCheckpoint();
            LOGGER.debug(
                    "Checkpointed Partition {}",
                    eventDataBatch.getPartitionContext().getPartitionId());
        }
    };
}
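Because the processor stops without crashing or throwing, one way to at least detect the stall from inside the application is a small liveness check fed by the batch handler. The following is only a sketch under stated assumptions: `StallWatchdog`, `recordCallback`, and `isStalled` are hypothetical names (not Azure SDK APIs), and it assumes the batch handler is still invoked with empty batches whenever the 2-second wait time elapses, which the `isEmpty()` check in the snippet above suggests but which should be verified for your SDK version.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical helper (not an Azure SDK class): remembers when the batch
// handler last ran and reports a stall once it has been silent for too long.
final class StallWatchdog {
    private final Supplier<Instant> now;
    private final Duration maxSilence;
    private final AtomicReference<Instant> lastCallback;

    StallWatchdog(Supplier<Instant> now, Duration maxSilence) {
        this.now = now;
        this.maxSilence = maxSilence;
        // Start the silence window at construction time.
        this.lastCallback = new AtomicReference<>(now.get());
    }

    /** Call at the top of the batch handler, for empty batches as well. */
    void recordCallback() {
        lastCallback.set(now.get());
    }

    /** True once no callback has been recorded for longer than maxSilence. */
    boolean isStalled() {
        Duration silence = Duration.between(lastCallback.get(), now.get());
        return silence.compareTo(maxSilence) > 0;
    }
}
```

In this setup, one might create `new StallWatchdog(Instant::now, Duration.ofMinutes(2))`, call `recordCallback()` as the first statement of the event consumer (before the `isEmpty()` check), and poll `isStalled()` from a scheduled task or a health endpoint so that the orchestrator restarts the pod. Note that this tracks liveness for the whole processor rather than per partition, and an instance that currently owns no partitions would also look stalled.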
Expected behavior
EventProcessorClient keeps reading messages.
Screenshots
Azure Portal Metrics of the eventhub in the time where it happens (reading stops)
Grafana Metrics of our service:
We have 2 pods running, and they both stop reading about 1 minute apart. Note that the line continues, meaning the metrics keep getting published because the pods are still healthy.
Here it happened to both pods 1 hour apart.
Here both pods stopped reading at the same time.
Setup:
- OS: Linux
- IDE: issue was only observed in AKS cluster
- Library/Libraries: azure-messaging-eventhubs:5.18.6, azure-messaging-eventhubs-checkpointstore-blob:1.19.6, azure-identity:1.13.2
- Java version: 21.0.1
- App Server/Environment: Tomcat (as included in Spring Boot 3.3.1)
- Frameworks: Spring Boot 3.3.1
Additional context
Here is the extended log file of the above log snippet, where you can see the pump threads processing messages beforehand: sdk-issue-prod-jp.zip
The same issue also occurs in a similar service that uses these dependency versions:
azure-messaging-eventhubs:5.18.2, azure-messaging-eventhubs-checkpointstore-blob:1.19.2, azure-identity:1.11.4
Information Checklist
Kindly make sure that you have added all the following information above and checked off the required fields; otherwise we will treat the issue as an incomplete report.
- [x] Bug Description Added
- [x] Repro Steps Added
- [x] Setup information Added
Thanks for reporting this.
We've been working on a newer version of the internals for Event Hubs (it's been GA'd for Service Bus for several months), and this behaviour doesn't appear in tests where we have idler clients. We're planning to release a beta where people can opt into using the v2 stack via environment variable (can disable it to switch back to original v1 stack). Would you be interested in trying out the beta?
Hi Connie, we would definitely be interested in that. However, it won't help us with this issue, as we cannot use a beta library in a production environment.
Closing this; the user confirmed offline that the issue is resolved with 5.19.0 with the V2 stack opt-in.