[Feature] Add asynchronous bucket refresh mechanism for dynamic bucket mode
Search before asking
- [x] I searched in the issues and found nothing similar.
Motivation
In dynamic bucket mode, when buckets fill up and the system runs out of available empty buckets, the assigner cannot detect that previously full buckets may have become available again after compaction or data deletion. This causes unnecessary creation of new buckets or bucket exhaustion, leading to poor bucket distribution and potential performance degradation.
Solution
Introduce an asynchronous bucket refresh mechanism that periodically scans the bucket table to discover newly available buckets. The refresh is triggered when the number of empty buckets drops below a configurable threshold (dynamic-bucket.empty-bucket-threshold) and respects a minimum time interval between refreshes (dynamic-bucket.refresh-interval) to avoid excessive overhead. The refresh runs asynchronously in a separate thread without blocking bucket assignment operations, ensuring that the system can continuously discover and reuse buckets that have become available due to compaction or data deletion.
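As a rough illustration of how the two options could interact, here is a minimal sketch of the trigger condition. The `RefreshGate` class and its method names are hypothetical and not part of Paimon. It also assumes that a threshold of -1 disables the refresh and that the clock is passed in by the caller.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper (not a Paimon class) that decides when a refresh may start. */
final class RefreshGate {
    // Assumed to map to dynamic-bucket.empty-bucket-threshold; -1 is assumed to mean "disabled".
    private final int emptyBucketThreshold;
    // Assumed to map to dynamic-bucket.refresh-interval.
    private final Duration refreshInterval;
    // Volatile because an async refresh thread may update it.
    private volatile Instant lastRefreshTime = Instant.EPOCH;

    RefreshGate(int emptyBucketThreshold, Duration refreshInterval) {
        this.emptyBucketThreshold = emptyBucketThreshold;
        this.refreshInterval = refreshInterval;
    }

    /** True when available buckets are below the threshold and the interval has elapsed. */
    boolean shouldRefresh(int availableBuckets, Instant now) {
        if (emptyBucketThreshold < 0) {
            return false; // feature disabled
        }
        boolean belowThreshold = availableBuckets < emptyBucketThreshold;
        boolean intervalElapsed = !now.isBefore(lastRefreshTime.plus(refreshInterval));
        return belowThreshold && intervalElapsed;
    }

    /** Records the time of the latest refresh. */
    void markRefreshed(Instant now) {
        lastRefreshTime = now;
    }
}
```

Passing `now` as a parameter instead of calling `Instant.now()` inside the method keeps the check deterministic in tests.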
Anything else?
This would be one possible approach.
In the PartitionIndex assign method:
// 1. is it a key that has appeared before
if (hash2Bucket.containsKey(hash)) {
    return hash2Bucket.get(hash);
}

// new code
if (shouldRefreshEmptyBuckets(maxBucketId, minEmptyBucketsBeforeAsyncCheck)
        && isReachedTheMinRefreshInterval(minRefreshInterval)) {
    refreshBucketsFromDisk();
}
New methods to check whether the refresh should be executed:
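// Returns true when both limits are configured (not -1) and the number of
// known non-full buckets equals maxBucketId - minEmptyBucketsBeforeAsyncCheck.
private boolean shouldRefreshEmptyBuckets(
        int maxBucketId, int minEmptyBucketsBeforeAsyncCheck) {
    return maxBucketId != -1
            && minEmptyBucketsBeforeAsyncCheck != -1
            && (nonFullBucketInformation.size()
                    == maxBucketId - minEmptyBucketsBeforeAsyncCheck);
}

// Returns true when at least `duration` has passed since the last refresh.
private boolean isReachedTheMinRefreshInterval(final Duration duration) {
    return Instant.now().isAfter(lastRefreshTime.plus(duration));
}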
And the refresh method:
private void refreshBucketsFromDisk() {
    // Only start refresh if not already in progress
    if (refreshFuture == null || refreshFuture.isDone()) {
        refreshFuture =
                CompletableFuture.runAsync(
                        () -> {
                            try {
                                List<IndexManifestEntry> files =
                                        indexFileHandler.scanEntries(HASH_INDEX, partition);
                                // Collect buckets that still have room
                                Map<Integer, Long> tempBucketInfo = new HashMap<>();
                                for (IndexManifestEntry file : files) {
                                    long currentNumberOfRows = file.indexFile().rowCount();
                                    if (currentNumberOfRows < targetBucketRowNumber) {
                                        tempBucketInfo.put(file.bucket(), currentNumberOfRows);
                                    }
                                }
                                // nonFullBucketInformation is a ConcurrentHashMap
                                nonFullBucketInformation.putAll(tempBucketInfo);
                                // Written from the async thread, so lastRefreshTime
                                // should be declared volatile
                                lastRefreshTime = Instant.now();
                            } catch (Exception e) {
                                // Log error instead of throwing
                                System.err.println(
                                        "Error refreshing buckets from disk: "
                                                + e.getMessage());
                            }
                        });
    }
}
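For discussion, here is a self-contained sketch of the same single-flight idea that can be run outside Paimon. `AsyncBucketRefresher`, `refreshIfIdle`, and `snapshot` are hypothetical names, and the `Supplier` stands in for the `indexFileHandler.scanEntries` call. Unlike the snippet above, it uses an `AtomicBoolean` so that the "already in progress" check is atomic, and it removes buckets that the scan reports as full. Both are assumptions about the desired behaviour, not something the proposal states.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical stand-alone sketch; not the actual PartitionIndex implementation. */
final class AsyncBucketRefresher {
    // Stand-in for the disk scan: returns bucket id -> current row count.
    private final Supplier<Map<Integer, Long>> scanner;
    private final long targetBucketRowNumber;
    private final Map<Integer, Long> nonFullBuckets = new ConcurrentHashMap<>();
    // Guards against starting a second refresh while one is running.
    private final AtomicBoolean refreshing = new AtomicBoolean(false);

    AsyncBucketRefresher(Supplier<Map<Integer, Long>> scanner, long targetBucketRowNumber) {
        this.scanner = scanner;
        this.targetBucketRowNumber = targetBucketRowNumber;
    }

    /** Starts a refresh unless one is in flight; never blocks the caller. */
    CompletableFuture<Void> refreshIfIdle() {
        if (!refreshing.compareAndSet(false, true)) {
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.supplyAsync(scanner)
                .thenAccept(rowCounts -> rowCounts.forEach((bucket, rows) -> {
                    if (rows < targetBucketRowNumber) {
                        nonFullBuckets.put(bucket, rows); // bucket has room
                    } else {
                        nonFullBuckets.remove(bucket);    // bucket is full
                    }
                }))
                .exceptionally(e -> {
                    // Report the failure instead of propagating it to the caller.
                    System.err.println("Error refreshing buckets: " + e.getMessage());
                    return null;
                })
                .whenComplete((ignored, error) -> refreshing.set(false));
    }

    /** Returns a copy of the currently known non-full buckets. */
    Map<Integer, Long> snapshot() {
        return new HashMap<>(nonFullBuckets);
    }
}
```

A caller on the assignment path would invoke `refreshIfIdle()` and continue immediately. A test can call `join()` on the returned future to wait for the result.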
Are you willing to submit a PR?
- [x] I'm willing to submit a PR!
I'd like to take over this enhancement and start working on implementing the asynchronous bucket refresh mechanism for dynamic bucket mode.
My plan:
- Review the current PartitionIndex logic and how dynamic buckets are managed.
- Implement the proposed async refresh logic using CompletableFuture to periodically scan and update available buckets.
- Add configuration options for:
  - dynamic-bucket.empty-bucket-threshold
  - dynamic-bucket.refresh-interval
- Write tests to verify that refreshed buckets are correctly reused and that the refresh operation doesn’t block assignment.
Once the initial implementation is ready, I’ll open a PR for review and feedback.
Hi @RaguCS ,
Thanks for offering to take this on and for the very clear and solid plan.
My PR has an almost-finished implementation of this feature (several things are still pending, such as the tests). However, your plan looks great, and I agree it's the right direction for the project.
If you'd like to lead the implementation from here, please feel free to use any code or ideas from my PR. I'm happy to close it in favor of your new one to avoid duplicated efforts.
Just let me know how you'd like to proceed!
Cheers,
Hi @irodriguezclaveria ,
Thanks a lot for the kind message and for sharing your progress on this feature! I really appreciate your openness and collaboration.
Your existing PR and implementation details will be very helpful — I’ll make sure to review and reuse parts of it where appropriate to keep consistency and avoid duplicated work.
I’d also love to collaborate and discuss ideas as the work progresses. If you’re open to it, feel free to reach out to me via email at [email protected] — it’d be great to exchange thoughts or get your guidance when I hit any blockers.
Cheers, Ragu ☕
Hey @RaguCS ,
Awesome, thanks for reaching out! So glad you found the PR useful.
Sounds great, I'll drop you an email. Happy to help out if you hit any roadblocks or just want to chat about the approach.
🙌 Thanks!
Hey @irodriguezclaveria ,
Awesome, thanks a lot! Really appreciate your willingness to help and collaborate. I’ll keep you posted on my progress and will definitely reach out if I run into any roadblocks or want to discuss the design further.
Looking forward to working together on this! 🚀