[Feature] Add asynchronous bucket refresh mechanism for dynamic bucket mode

Open irodriguezclaveria opened this issue 3 months ago • 5 comments

Search before asking

  • [x] I searched in the issues and found nothing similar.

Motivation

In dynamic bucket mode, when buckets fill up and the system runs out of available empty buckets, the assigner cannot detect that previously full buckets may have become available again after compaction or data deletion. This causes unnecessary creation of new buckets or bucket exhaustion, leading to poor bucket distribution and potential performance degradation.

Solution

Introduce an asynchronous bucket refresh mechanism that periodically scans the bucket table to discover newly available buckets. The refresh is triggered when the number of empty buckets drops below a configurable threshold (dynamic-bucket.empty-bucket-threshold) and respects a minimum time interval between refreshes (dynamic-bucket.refresh-interval) to avoid excessive overhead. The refresh runs asynchronously in a separate thread without blocking bucket assignment operations, ensuring that the system can continuously discover and reuse buckets that have become available due to compaction or data deletion.
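
A minimal sketch of the trigger condition described above, assuming the threshold is compared against the number of non-full buckets currently known to the assigner. `RefreshTrigger`, `shouldRefresh`, and `markRefreshed` are hypothetical names used only for illustration; they are not existing Paimon classes or methods, and the two constructor arguments merely stand in for the proposed options.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper (not Paimon code): decides whether a refresh may start. */
final class RefreshTrigger {
    // Stands in for the proposed dynamic-bucket.empty-bucket-threshold option.
    private final int emptyBucketThreshold;
    // Stands in for the proposed dynamic-bucket.refresh-interval option.
    private final Duration refreshInterval;
    // Time of the last completed refresh; EPOCH means "never refreshed".
    private Instant lastRefreshTime = Instant.EPOCH;

    RefreshTrigger(int emptyBucketThreshold, Duration refreshInterval) {
        this.emptyBucketThreshold = emptyBucketThreshold;
        this.refreshInterval = refreshInterval;
    }

    /** True when available buckets dropped below the threshold and the interval has elapsed. */
    boolean shouldRefresh(int nonFullBuckets, Instant now) {
        boolean belowThreshold = nonFullBuckets < emptyBucketThreshold;
        boolean intervalElapsed = !now.isBefore(lastRefreshTime.plus(refreshInterval));
        return belowThreshold && intervalElapsed;
    }

    /** Records that a refresh finished at the given time. */
    void markRefreshed(Instant now) {
        lastRefreshTime = now;
    }
}
```

Passing the current time in as a parameter, rather than calling `Instant.now()` inside the check, is a choice made here so the interval logic can be tested without sleeping.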

Anything else?

This would be one possible approach.

In the PartitionIndex assign method:


        // 1. is it a key that has appeared before
        if (hash2Bucket.containsKey(hash)) {
            return hash2Bucket.get(hash);
        }

        // new code
        if (shouldRefreshEmptyBuckets(maxBucketId, minEmptyBucketsBeforeAsyncCheck)
                && isReachedTheMinRefreshInterval(minRefreshInterval)) {
            refreshBucketsFromDisk();
        }

New methods to check if we should execute the refresh

    private boolean shouldRefreshEmptyBuckets(
            int maxBucketId, int minEmptyBucketsBeforeAsyncCheck) {
        return maxBucketId != -1
                && minEmptyBucketsBeforeAsyncCheck != -1
                && (nonFullBucketInformation.size()
                        == maxBucketId - minEmptyBucketsBeforeAsyncCheck);
    }

    private boolean isReachedTheMinRefreshInterval(final Duration duration) {
        return Instant.now().isAfter(lastRefreshTime.plus(duration));
    }

And the refresh method:


    private void refreshBucketsFromDisk() {
        // Only start refresh if not already in progress
        if (refreshFuture == null || refreshFuture.isDone()) {
            refreshFuture =
                    CompletableFuture.runAsync(
                            () -> {
                                try {
                                    List<IndexManifestEntry> files =
                                            indexFileHandler.scanEntries(HASH_INDEX, partition);
                                    Map<Integer, Long> tempBucketInfo = new HashMap<>();

                                    for (IndexManifestEntry file : files) {
                                        long currentNumberOfRows = file.indexFile().rowCount();
                                        if (currentNumberOfRows < targetBucketRowNumber) {
                                            tempBucketInfo.put(file.bucket(), currentNumberOfRows);
                                        }
                                    }
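                                    // nonFullBucketInformation is a ConcurrentHashMap
                                    nonFullBucketInformation.putAll(tempBucketInfo);
                                    // Written on the refresh thread and read by the assigning
                                    // thread, so the lastRefreshTime field should be volatile.
                                    lastRefreshTime = Instant.now();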
                                } catch (Exception e) {
                                    // Log error instead of throwing
                                    System.err.println(
                                            "Error refreshing buckets from disk: "
                                                    + e.getMessage());
                                }
                            });
        }
    }
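
For discussion, here is a self-contained sketch of the same single-in-flight refresh pattern that can run outside Paimon. Everything in it is an assumption for illustration: `AsyncBucketRefresher` is a hypothetical class, the `Supplier` stands in for `indexFileHandler.scanEntries(HASH_INDEX, partition)` and returns bucket id to row count directly, and `refresh()` is assumed to be called only from the assigning thread. Unlike the `putAll` version above, it also removes buckets that the scan reports as full, which may or may not be the desired behavior.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for the refresh part of PartitionIndex (not Paimon code). */
final class AsyncBucketRefresher {
    private static final Logger LOG = Logger.getLogger("AsyncBucketRefresher");

    private final Map<Integer, Long> nonFullBuckets = new ConcurrentHashMap<>();
    // Assumed scan source: bucket id -> current row count.
    private final Supplier<Map<Integer, Long>> scanner;
    private final long targetBucketRowNumber;
    // Assumed to be read and written only by the assigning thread.
    private CompletableFuture<Void> refreshFuture;

    AsyncBucketRefresher(Supplier<Map<Integer, Long>> scanner, long targetBucketRowNumber) {
        this.scanner = scanner;
        this.targetBucketRowNumber = targetBucketRowNumber;
    }

    /** Starts a refresh unless one is still running; returns the current future. */
    CompletableFuture<Void> refresh() {
        if (refreshFuture == null || refreshFuture.isDone()) {
            refreshFuture =
                    CompletableFuture.runAsync(
                                    () ->
                                            scanner.get().forEach((bucket, rows) -> {
                                                if (rows < targetBucketRowNumber) {
                                                    nonFullBuckets.put(bucket, rows);
                                                } else {
                                                    // Drop buckets that have filled up.
                                                    nonFullBuckets.remove(bucket);
                                                }
                                            }))
                            .exceptionally(e -> {
                                // Log with the stack trace instead of failing the assigner.
                                LOG.log(Level.WARNING, "Error refreshing buckets", e);
                                return null;
                            });
        }
        return refreshFuture;
    }

    Map<Integer, Long> nonFullBuckets() {
        return nonFullBuckets;
    }
}
```

A caller would invoke `refresh()` from the assign path and continue immediately; joining the returned future is only needed in tests.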

Are you willing to submit a PR?

  • [x] I'm willing to submit a PR!

irodriguezclaveria avatar Oct 12 '25 20:10 irodriguezclaveria

I'd like to take over this enhancement and start working on implementing the asynchronous bucket refresh mechanism for dynamic bucket mode.

My plan:

  1. Review the current PartitionIndex logic and how dynamic buckets are managed.

  2. Implement the proposed async refresh logic using CompletableFuture to periodically scan and update available buckets.

  3. Add configuration options for:

     • dynamic-bucket.empty-bucket-threshold

     • dynamic-bucket.refresh-interval

  4. Write tests to verify that refreshed buckets are correctly reused and that the refresh operation doesn’t block assignment.

Once the initial implementation is ready, I’ll open a PR for review and feedback.

RaguCS avatar Oct 17 '25 06:10 RaguCS

Hi @RaguCS ,

Thanks for offering to take this on and for the very clear and solid plan.

My PR has an almost-finished implementation of this feature (several things are still pending, such as the tests). However, your plan looks great, and I agree it's the right direction for the project.

If you'd like to lead the implementation from here, please feel free to use any code or ideas from my PR. I'm happy to close it in favor of your new one to avoid duplicated efforts.

Just let me know how you'd like to proceed!

Cheers,

irodriguezclaveria avatar Oct 17 '25 14:10 irodriguezclaveria

Hi @irodriguezclaveria ,

Thanks a lot for the kind message and for sharing your progress on this feature! I really appreciate your openness and collaboration.

Your existing PR and implementation details will be very helpful — I’ll make sure to review and reuse parts of it where appropriate to keep consistency and avoid duplicated work.

I’d also love to collaborate and discuss ideas as the work progresses. If you’re open to it, feel free to reach out to me via email at [email protected] — it’d be great to exchange thoughts or get your guidance when I hit any blockers.

Cheers, Ragu ☕

RaguCS avatar Oct 17 '25 15:10 RaguCS

Hey @RaguCS ,

Awesome, thanks for reaching out! So glad you found the PR useful.

Sounds great, I'll drop you an email. Happy to help out if you hit any roadblocks or just want to chat about the approach.

irodriguezclaveria avatar Oct 17 '25 15:10 irodriguezclaveria

🙌 Thanks!

Hey @irodriguezclaveria ,

Awesome, thanks a lot! Really appreciate your willingness to help and collaborate. I’ll keep you posted on my progress and will definitely reach out if I run into any roadblocks or want to discuss the design further.

Looking forward to working together on this! 🚀

RaguCS avatar Oct 17 '25 15:10 RaguCS