KAFKA-19968: Fix tiered storage quota enforcement issues
### Problem:
- Race condition and quota leaks: Multiple threads could check the quota before any usage was recorded, allowing all of them to bypass the limit simultaneously (see the sketch after this list). Additionally, in multi-partition fetches, quota was reserved per partition but could leak if some partitions failed or were throttled, leading to quota exhaustion over time.
- Startup race condition: `RemoteLogManager` initialized with default quotas (`Long.MAX_VALUE`, i.e. unlimited) and relied on dynamic config updates to apply the configured values, creating a window during which operations could exceed the configured quotas.
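To make the first issue concrete, here is a minimal sketch of the check-then-record pattern that allows the race; the class and method names are simplified stand-ins, not the actual Kafka code:

```java
// Hypothetical sketch (not the actual Kafka code): check and record are separate steps,
// so several threads can all observe "under quota" before any of them records usage.
class NaiveQuotaManager {
    private final long quotaBytesPerWindow;
    private long recordedBytes = 0;

    NaiveQuotaManager(long quotaBytesPerWindow) {
        this.quotaBytesPerWindow = quotaBytesPerWindow;
    }

    synchronized boolean isQuotaExceeded() {   // step 1: check
        return recordedBytes > quotaBytesPerWindow;
    }

    synchronized void record(long bytes) {     // step 2: record, only after the fetch is dispatched
        recordedBytes += bytes;
    }
}
// Even though each method is individually synchronized, the check-then-record sequence is not:
// threads A and B can both see isQuotaExceeded() == false while recordedBytes is still 0,
// then both dispatch large remote fetches, together blowing well past the quota.
```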
### Solution:
- Atomic quota reservation (a rough sketch follows this list)
  - Added `RLMQuotaManager.recordAndGetThrottleTimeMs()` to atomically record usage and check the quota in a single synchronized operation
  - Added a `quotaReservedBytes` field to `RemoteStorageFetchInfo` to track per-partition reservations
  - Modified `ReplicaManager` to call `recordAndCheckFetchQuota()` BEFORE dispatching the remote fetch, ensuring quota is reserved atomically based on `adjustedMaxBytes`
  - If throttled, the reservation is released immediately since the fetch won't execute
  - `RemoteLogReader` adjusts the quota by the delta (actual - reserved) after the fetch completes
  - On error, the full reservation is released to prevent leaks
- Eager startup quota initialization
  - Ensures quotas are correct before the broker starts serving requests
  - Added `BrokerServer.applyRemoteLogQuotas()` to eagerly apply quota configs immediately after `RemoteLogManager` creation
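For reviewers, a rough sketch of the reservation flow described above, assuming a simple fixed-window accounting model; the method names follow this description, but the signatures and internals here are illustrative assumptions, not the actual `RLMQuotaManager` implementation:

```java
// Sketch only: record usage and compute the throttle time in one synchronized step,
// so concurrent callers cannot all pass the check before any usage has been recorded.
class QuotaReservationSketch {
    private final long quotaBytesPerWindow;
    private long windowBytes = 0;   // bytes reserved/recorded in the current window

    QuotaReservationSketch(long quotaBytesPerWindow) {
        this.quotaBytesPerWindow = quotaBytesPerWindow;
    }

    /** Atomically reserve {@code bytes} and return how long the caller should be throttled. */
    synchronized long recordAndGetThrottleTimeMs(long bytes, long windowMs) {
        windowBytes += bytes;                                   // reserve up front
        if (windowBytes <= quotaBytesPerWindow) {
            return 0;                                           // within quota, no throttling
        }
        long excess = windowBytes - quotaBytesPerWindow;
        return excess * windowMs / quotaBytesPerWindow;         // time needed to drain the excess
    }

    /** Settle a reservation: pass (actual - reserved), or -reserved to release it entirely. */
    synchronized void adjust(long deltaBytes) {
        windowBytes = Math.max(0, windowBytes + deltaBytes);
    }
}
```

In the PR's flow this corresponds to `ReplicaManager` reserving `adjustedMaxBytes` before dispatching, releasing the reservation immediately when a positive throttle time comes back, and `RemoteLogReader` settling the (actual - reserved) delta once the remote read completes, or releasing everything on error.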
@abhijeetk88 @kamalcph @satishd, I think the issue is valid. WDYT?
A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.
Thanks for the PR! I did a first pass and have a few questions below:
> Startup race condition: `RemoteLogManager` initialized with default quotas (`Long.MAX_VALUE`, i.e. unlimited) and relied on dynamic config updates to apply the configured values, creating a window during which operations could exceed the configured quotas.
Could you add a unit test to cover this case?
> Multiple threads could check the quota before any usage was recorded, allowing all of them to bypass the limit simultaneously.
Also, please cover this case with a unit test to demonstrate the issue.
> Modified `ReplicaManager` to call `recordAndCheckFetchQuota()` BEFORE dispatching the remote fetch, ...
If `remoteFetchQuotaBytesPerSecond` is set to 25 MB/s and there is a 30 MB message, will the consumer get stuck? The previous behaviour was to allow consumption to continue.
> On error, the full reservation is released to prevent leaks
How can Kafka differentiate between an error from remote storage and an error in processing the response? I think this is a good improvement; usually we don't expect errors in RLMM or other Kafka components while processing the response.
Thanks so much for the review and reply! I've updated the PR with the tests.
> If `remoteFetchQuotaBytesPerSecond` is set to 25 MB/s and there is a 30 MB message, will the consumer get stuck? The previous behaviour was to allow consumption to continue.
No. The behavior follows Kafka's standard quota-handling pattern used in other fetch paths: we allow at least one fetch to proceed even if it exceeds the quota, and subsequent requests are throttled to bring the average rate back within the limit.
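To put rough numbers on that, a back-of-the-envelope sketch assuming a simple rate-over-a-one-second-window model; the real `RLMQuotaManager` sits on Kafka's metrics quota framework, so the exact window size and formula differ:

```java
// Rough arithmetic only: a single 30 MB fetch against a 25 MB/s quota is allowed through,
// and subsequent fetches are delayed until the average rate falls back under the quota.
public class ThrottleTimeExample {
    public static void main(String[] args) {
        long quotaBytesPerSec = 25L * 1024 * 1024;   // 25 MB/s quota
        long fetchedBytes     = 30L * 1024 * 1024;   // one 30 MB message
        long windowMs         = 1_000;               // pretend the rate window is 1 second

        long excess = fetchedBytes - quotaBytesPerSec;          // 5 MB over the budget
        long throttleMs = excess * windowMs / quotaBytesPerSec; // ~200 ms of throttling

        System.out.printf("Fetch proceeds; next request is throttled ~%d ms%n", throttleMs);
    }
}
```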
> How can Kafka differentiate between an error from remote storage and an error in processing the response?
You're right that this is an improvement worth discussing. Currently, the implementation releases the quota reservation on any error during remote fetch processing. In practice, though, errors probably surface after some bandwidth has already been consumed, in which case releasing the full reservation under-counts actual usage. We could refine this to only release the reservation for specific error types. What's your preference?
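To illustrate the kind of refinement I mean, a purely hypothetical sketch where only storage-level failures release the full reservation (`QuotaReservation`, `RemoteRead`, and `RemoteReadException` are made-up stand-ins, not the PR's actual types):

```java
// Purely illustrative: only the release-vs-keep decision on different error types is the point.
final class ErrorAwareRemoteFetch {

    interface QuotaReservation { void adjust(long deltaBytes); }

    interface RemoteRead { long readAndReturnBytes() throws Exception; }

    /** Failure raised by the remote storage plugin itself, i.e. the transfer did not happen. */
    static final class RemoteReadException extends Exception { }

    static long fetch(RemoteRead read, QuotaReservation quota, long reservedBytes) throws Exception {
        try {
            long actualBytes = read.readAndReturnBytes();   // bandwidth is consumed here
            quota.adjust(actualBytes - reservedBytes);      // settle reservation to actual usage
            return actualBytes;
        } catch (RemoteReadException e) {
            quota.adjust(-reservedBytes);                   // transfer failed: release the full reservation
            throw e;
        } catch (Exception e) {
            // Any other failure (e.g. while processing the fetched response): the bytes were
            // still pulled over the network, so arguably the reservation should stand.
            throw e;
        }
    }
}
```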