[BUG] Processing of Event Hub partition stops for approx. 30 minutes, then resumes
Describe the bug
We are using Event Hubs with the Standard SKU and 32 partitions. Several times per day, processing of one partition stops for approx. 30 minutes and then resumes, without any exception being logged. We have 16 replicas of the service running. Only one partition is affected; other partitions in the same service continue to process messages. We are using the EventProcessorClient. Based on the processPartitionInitialization and processPartitionClose callbacks, we see that ownership of the partition does not change. However, when processing resumes, processPartitionInitialization is called again (without a prior call to processPartitionClose). We were able to capture stack traces while the problem occurred: there is no running processEventBatch handler, and the affected Partition Pump appears to idle and wait for messages.
Exception or Stack Trace There is no stack trace. Processing of a single partition stops.
To Reproduce (It happens consistently on 2 Event Hub namespaces and 2 AKS clusters)
- Deploy the application to the AKS cluster
- Wait for several hours
- Observe the lags and throughput metrics (see screenshots below)
Code Snippet
Here is our initialization of the EventProcessorClient:
final EventProcessorClientBuilder processorClientBuilder = new EventProcessorClientBuilder()
.connectionString(connectionString)
.consumerGroup(consumerGroupName)
.processEventBatch(PARTITION_PROCESSOR, 1000, Duration.ofMillis(50)) // max batch size 1000, max wait time 50 ms
.processError(ERROR_HANDLER)
.processPartitionInitialization(PARTITION_INIT_PROCESSOR)
.processPartitionClose(PARTITION_CLOSE_PROCESSOR)
.partitionOwnershipExpirationInterval(Duration.ofSeconds(60))
.checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));
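For completeness, here is a hedged sketch of how the handler constants referenced above could be defined and how the client is built and started; the handler bodies and the LOGGER are illustrative assumptions, not our actual code:

import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.models.CloseContext;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.InitializationContext;
import java.util.function.Consumer;

// Illustrative handlers: we log partition ownership changes and errors here.
final Consumer<InitializationContext> PARTITION_INIT_PROCESSOR = initContext ->
    LOGGER.info("Claimed partition {}", initContext.getPartitionContext().getPartitionId());
final Consumer<CloseContext> PARTITION_CLOSE_PROCESSOR = closeContext ->
    LOGGER.info("Closed partition {} ({})",
        closeContext.getPartitionContext().getPartitionId(), closeContext.getCloseReason());
final Consumer<ErrorContext> ERROR_HANDLER = errorContext ->
    LOGGER.error("Error on partition {}",
        errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable());

// Build and start the processor.
final EventProcessorClient processorClient = processorClientBuilder.buildEventProcessorClient();
processorClient.start();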
Expected behavior The processing of messages should not stop.
Screenshots Here you can see a Grafana dashboard that monitors the lag of the partitions. The red periods indicate occurrences of the problem. The time range of the screenshot is 24 hours.
Setup
- OS: Linux / AKS
- IDE: IntelliJ
- Library/Libraries: azure-sdk-bom version 1.2.34
- Java version: 21
- App Server/Environment: Tomcat
- Frameworks: Spring Boot
From similar cases, there is the potential for a random thread bottleneck; can we rule that out?
- The app should ensure that any event processing and related business logic are completed before returning control from the handler callback, i.e., do not allow the thread that invoked the handler to execute tasks in the background after control has been returned. The handlers are designed to be synchronous; for example, patterns such as obtaining a CompletableFuture or Publisher and returning from the handler without waiting for it to complete are an antipattern and can eventually hinder the library's internal progress (a minimal sketch follows at the end of this comment).
- The OpenJDK team at Microsoft recommends a minimum of 2 cores per host (container, replica) for containerized Java applications. In addition, for Java and .NET Event Hubs processors, the recommendation is ~1.5 partitions per core, so together this translates to roughly 3 cores per host (container, replica).
As an experiment, can you try increasing the replica core allocation and monitoring whether it reduces or eliminates the halt-then-resume behavior? Remember to adjust node pool cores accordingly and to account for additional workloads from other application components, such as Netty thread pools, database client pools, etc. We would also suggest overriding Reactor's global thread pool size, given that certain Spring components and the library share this pool, and disabling any virtual threading while experimenting.
For reference, in our stress tests we run processors continuously; however, those handlers only perform checkpointing with a small delay and have no real business-like logic (e.g., no external service calls), so CPU use is limited.
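To illustrate the first point, here is a minimal sketch (the handler factory, executor, and handleEvent are assumptions) of a batch handler that completes all of its parallel work before returning control:

import com.azure.messaging.eventhubs.models.EventBatchContext;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Sketch: parallelize per-event work, but join everything before handing
// control back to the EventProcessorClient.
static Consumer<EventBatchContext> batchHandler(ExecutorService businessExecutor) {
    return batchContext -> {
        List<CompletableFuture<Void>> tasks = batchContext.getEvents().stream()
            .map(event -> CompletableFuture.runAsync(() -> handleEvent(event), businessExecutor)) // handleEvent is hypothetical
            .collect(Collectors.toList());

        // Returning without this join is the antipattern described above.
        CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();

        batchContext.updateCheckpoint();
    };
}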
Thanks for your reply, @anuchandy.
Regarding 1.: We are using CompletableFutures in our handler callback. We are, however, just using them to parallelize work, and we join them before handing control back to the processor client.
Regarding 2.: We are running in AKS. We are not limiting CPU per container. We have 22 nodes in our cluster with 8 vCPUs each. Of course, we have other workloads running there as well as the 16 replicas of the affected service. Would you recommend that we introduce CPU limits for all services so that we can guarantee that each of the affected service's replicas gets 3 vCPUs to itself? We would have to greatly scale up our cluster for this. Could you explain how having too few CPU cycles available could lead to the halt-then-resume scenario? Are there any internal housekeeping processes within the SDK that could starve?
We will try to override the DEFAULT_BOUNDED_ELASTIC_SIZE. Any recommendations for the value?
We are not using virtual threading.
We have a pretty low (probably uncommon) value for max wait time (50ms) because we have to optimize for latency. Could that be a problem?
The library internally uses Reactor to abstract all concurrency and does not explicitly manage task scheduling. Reactor thread pools have no fairness guarantees, and underneath, certain concurrency operators share these pools. The Reactor scheduler (pool) design fronts each thread in the pool with a task queue, and there is no fairness in which task queue a task lands on once the pool is full. This means some tasks may move along in what looks like a natural pattern while others appear to stall completely. It's not always obvious, because the CPU doesn't get pegged unless part of the application's processing is CPU-intensive.
Based on my reading, it is advised not to set CPU "limits". Instead, use CPU "requests" and ensure node-level availability.
The optimal value for DEFAULT_BOUNDED_ELASTIC_SIZE is case by case, but I would set it to 100 in a 3-core setup (the default would be 30, i.e., 10 * core_count, so we're bumping it ~3x).
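A minimal sketch of applying the override programmatically; it has to run before Reactor's Schedulers class is first loaded, otherwise the default of 10 * core_count is already fixed. The equivalent JVM flag is -Dreactor.schedulers.defaultBoundedElasticSize=100.

// Set the property at the very top of main(), before any Reactor or Spring code runs.
public static void main(String[] args) {
    System.setProperty("reactor.schedulers.defaultBoundedElasticSize", "100");
    // ... start the application (Spring Boot, etc.) ...
}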
We also recommend keeping the max wait time in the seconds range (e.g., 3-5 seconds). A value that is too low may place excessive pressure on the "shared" Reactor timer (parallel) threads (the number of parallel threads equals the number of cores); for example, retries (I think including those from the Spring components) and partition load balancing all share these global Reactor parallel threads.
Is there some way you can experiment with this in a more scoped environment without other noise or competing services, e.g., a Test/UAT environment?
I was experimenting before your reply, @anuchandy, and tried it with the following:
The pod's Kubernetes resource settings are now as follows (the CPU request was 250m previously, now 2000m):
Limits:
  memory: 800Mi
Requests:
  cpu: 2
  memory: 600Mi
We also set DEFAULT_BOUNDED_ELASTIC_SIZE via:
JAVA_TOOL_OPTIONS=-XX:MaxRAMPercentage=60.0 -Dreactor.schedulers.defaultBoundedElasticSize=200
I set it to 200, which should be even more generous than your 100, right?
Unfortunately, the issue still occurred. I will try it with a setting of 100 and with a CPU request of 3000m.
In general, it does not seem to coincide with spikes of CPU usage across the cluster
(yellow is the lag going up, green is the container_cpu_usage_seconds_total CPU metric).
We also recommend keeping the max wait time in the seconds range (e.g., 3-5 seconds). A value that is too low may place excessive pressure on the "shared" Reactor timer (parallel) threads (the number of parallel threads equals the number of cores); for example, retries (I think including those from the Spring components) and partition load balancing all share these global Reactor parallel threads.
We can't, unfortunately, due to strict latency requirements. We were successfully using a MAX_WAIT_TIME of 50 ms with the Node.js Event Hubs SDK and did not have any problems. But thanks for the hint; we will try increasing the setting and see if it has an effect.
The issue appeared again with -Dreactor.schedulers.defaultBoundedElasticSize=100 and
Limits:
  memory: 800Mi
Requests:
  cpu: 3
  memory: 600Mi
Is there some way you can experiment with this in a more scoped environment without other noise or competing services, e.g., a Test/UAT environment?
We are considering how we could do this.
We have implemented a workaround: each partition processor callback stores, in a watchdog service, a timestamp right before handing control back to the SDK. When the processor callback starts processing, it clears that timestamp. The watchdog service runs every couple of seconds and checks whether any stored timestamp (associated with an EventProcessorClient instance) is older than a grace period. If so, it means the processing logic returned a while back and never started up again, and we restart the EventProcessorClient. This seems to detect the situation, and restarting the client seems to break the temporary deadlock. It ain't pretty, of course, and we would love to have a more correct solution (a rough sketch is below).
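A rough sketch of the watchdog idea; class and method names, the grace period, and the check interval are illustrative placeholders, not our production code:

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical watchdog: the batch handler calls processingStarted(...) at the top and
// processingFinished(...) right before returning; restartProcessorClient stops and rebuilds
// the EventProcessorClient.
final class ProcessorWatchdog {
    private static final Duration GRACE_PERIOD = Duration.ofMinutes(5); // assumption
    private final Map<String, Instant> idleSince = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    ProcessorWatchdog(Runnable restartProcessorClient) {
        scheduler.scheduleAtFixedRate(() -> {
            Instant cutoff = Instant.now().minus(GRACE_PERIOD);
            // A timestamp that was set long ago and never cleared means the handler
            // finished but was never invoked again: restart the client.
            if (idleSince.values().stream().anyMatch(ts -> ts.isBefore(cutoff))) {
                idleSince.clear();
                restartProcessorClient.run();
            }
        }, 5, 5, TimeUnit.SECONDS);
    }

    void processingStarted(String partitionId) {
        idleSince.remove(partitionId);
    }

    void processingFinished(String partitionId) {
        idleSince.put(partitionId, Instant.now());
    }
}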
Thanks for sharing the update. I've been thinking about this. Can you try the following temporary configuration as an experiment and see whether it makes any difference for the halt-and-resume behavior in your setup:
- Keep the same host settings we discussed earlier - 3 cores, pool-size 200.
- Log the pool size at application start time to make sure it took effect. I've seen some odd cases where the -D option did not take effect in certain setups.
- Reduce the batch size to exactly 250 events and the max wait time to 2 seconds (both adjustments are sketched below).
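A short sketch of the measurable parts of this experiment; the builder fields are the ones from the snippet earlier in this thread, LOGGER is an assumption, and settings not shown stay as before:

import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import java.time.Duration;
import reactor.core.scheduler.Schedulers;

// 1. Log the effective pool size at startup to confirm the -D override took effect
//    (the default is 10 * available cores).
LOGGER.info("defaultBoundedElasticSize = {}", Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE);

// 2. Experimental batch settings: at most 250 events per batch, 2-second max wait time.
final EventProcessorClientBuilder builder = new EventProcessorClientBuilder()
    .connectionString(connectionString)
    .consumerGroup(consumerGroupName)
    .processEventBatch(PARTITION_PROCESSOR, 250, Duration.ofSeconds(2))
    .processError(ERROR_HANDLER)
    .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));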
Thanks for the ideas, @anuchandy . I will try that out on our QA environment.
We have increased the max wait time from 50 ms to 150 ms (for one Event Hub) and 250 ms (for another Event Hub). We have not encountered the error since then (our watchdog logic described above has not been triggered anymore).
I think that for small values of the max wait time the issue is likely to appear, but we have adjusted our setup so that we can meet our latency requirements with the longer max wait time values.
Thank you for your assistance, @anuchandy. I will close this ticket; we will re-open it should we encounter the problem again.
Thanks, Stefan, for sharing the update. We’ll look into updating the troubleshooting guidelines with this information.