
[FEA] Triple Buffering: Bind Async Resource Budget to Physical Memory Allocation

Open sperlingxx opened this issue 3 weeks ago • 0 comments

Is your feature request related to a problem? Please describe.

The current ResourceBoundedExecutor manages asynchronous scanning using a "virtual" budget (Triple Buffer MemManagement) that is loosely coupled with the actual physical lifecycle of off-heap memory.

  • Disconnected Lifecycles: We currently rely on complex DecayReleaseResult callbacks to manually synchronize the virtual budget release with physical buffer deallocation. This is error-prone and complicates the runner state machine.
  • Inefficient Budgeting: The initial virtual budget allocation for MultiThreadFileReader is often overestimated because it ignores metadata-based row filters and column pruning. Consequently, we often hold onto a large, unused budget until the very end of the runner's lifecycle, reducing concurrency.

Describe the solution you'd like

We propose binding the virtual budget directly to the physical HostMemoryBuffer allocation within the MemoryBoundedAsyncRunner.

  1. Runner as the Allocator: The MemoryBoundedAsyncRunner will implement HostMemoryAllocator. Instead of the runner requesting a budget and then separately allocating memory, the runner itself will serve as the source of truth. All allocations for the task will pass through the runner, allowing it to track localPool (budget) against usedMem (physical usage) atomically (see the first sketch after this list).

  2. Precise Deallocation via Event Handlers: We will leverage the cuDF MemoryBuffer.eventHandler mechanism to automate resource release (second sketch after this list).

    • Mechanism: When a physical HostMemoryBuffer is closed (refCount reaches 0), the event handler triggers immediately.
    • Benefit: This allows us to release the virtual budget tracked by ResourceBoundedExecutor at the exact moment of physical deallocation.
    • Outcome: We can completely remove the DecayReleaseResult abstraction and explicit decay callbacks, as the memory management becomes self-sustaining.
  3. Early Release of Over-Claimed Budget: By centralizing allocation logic, we can optimize how we handle the discrepancy between estimated and actual memory usage (third sketch after this list).

    • Optimization: Once the ParquetPartitionReader determines the actual memory required (after applying filters and pruning), we can identify the "over-claimed" portion of the initial budget.
    • Action: We will introduce a tryFree mechanism to return this unused budget to the global HostMemoryPool immediately, rather than waiting for the task to finish. This frees up capacity for other runners much sooner.
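
As a rough illustration of step 1, here is a minimal Scala sketch. It assumes the two allocate overloads on cuDF's HostMemoryAllocator interface; the Sketch-suffixed class name, the borrowFromGlobal/returnToGlobal callbacks, and the exact bookkeeping are hypothetical placeholders rather than the intended implementation.

```scala
import ai.rapids.cudf.{HostMemoryAllocator, HostMemoryBuffer}

// Hypothetical sketch: the runner itself is the HostMemoryAllocator, so every
// host allocation for the task is charged against its budget the moment it
// happens, keeping localPool (virtual budget) and usedMem (physical usage)
// consistent under one lock.
class MemoryBoundedAsyncRunnerSketch(
    initialBudget: Long,
    borrowFromGlobal: Long => Unit,   // hypothetical stand-in for HostMemoryPool.borrowMemory
    returnToGlobal: Long => Unit      // hypothetical stand-in for returning budget to the pool
) extends HostMemoryAllocator {

  private var localPool: Long = initialBudget  // virtual budget currently held by this runner
  private var usedMem: Long = 0L               // bytes physically allocated and not yet freed

  override def allocate(bytes: Long, preferPinned: Boolean): HostMemoryBuffer = {
    synchronized {
      val available = localPool - usedMem
      if (bytes > available) {
        // Underestimated split: borrow the deficit from the global pool instead
        // of failing (see the deadlock-prevention sketch under "Additional context").
        val deficit = bytes - available
        borrowFromGlobal(deficit)
        localPool += deficit
      }
      usedMem += bytes
    }
    HostMemoryBuffer.allocate(bytes, preferPinned)
  }

  override def allocate(bytes: Long): HostMemoryBuffer = allocate(bytes, false)

  /** Called when a physical buffer is closed (wired up in the next sketch). */
  def onBufferFreed(bytes: Long): Unit = synchronized {
    usedMem -= bytes
    localPool -= bytes
    returnToGlobal(bytes)
  }

  /** Return over-claimed budget early (used in the third sketch). */
  def tryFree(bytes: Long): Unit = synchronized {
    val releasable = math.min(bytes, localPool - usedMem)
    if (releasable > 0) {
      localPool -= releasable
      returnToGlobal(releasable)
    }
  }
}
```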
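
For step 2, the binding could look roughly like this, assuming the onClosed(refCount) shape of cuDF's MemoryBuffer.EventHandler; bindBudgetToBuffer is a hypothetical helper that reuses the runner sketch above.

```scala
import ai.rapids.cudf.{HostMemoryBuffer, MemoryBuffer}

// Sketch: hook the cuDF event handler so the virtual budget is released at the
// exact moment the physical buffer is deallocated (refCount reaches 0), making
// DecayReleaseResult and explicit decay callbacks unnecessary.
def bindBudgetToBuffer(
    runner: MemoryBoundedAsyncRunnerSketch,
    buf: HostMemoryBuffer): HostMemoryBuffer = {
  val bytes = buf.getLength
  buf.setEventHandler(new MemoryBuffer.EventHandler {
    override def onClosed(refCount: Int): Unit = {
      if (refCount == 0) {
        // Physical deallocation just happened; give the bytes back to the
        // runner, which in turn returns them to the global pool.
        runner.onBufferFreed(bytes)
      }
    }
  })
  buf
}
```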
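
For step 3, a sketch of the caller side once the actual footprint is known; the parameter names and the tryFree semantics are assumptions.

```scala
// Sketch: after row-group filtering and column pruning, the reader knows its
// actual host-memory footprint, so the over-claimed slice of the initial budget
// can go back to the global HostMemoryPool immediately instead of at task end.
def releaseOverClaimedBudget(
    runner: MemoryBoundedAsyncRunnerSketch,
    estimatedBytes: Long,  // initial budget derived from the split length
    actualBytes: Long      // footprint after filters and pruning
): Unit = {
  val overClaimed = estimatedBytes - actualBytes
  if (overClaimed > 0) {
    runner.tryFree(overClaimed)  // frees capacity for other runners right away
  }
}
```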

Additional context

Deadlock Prevention Strategy

To support this stricter binding without causing deadlocks, we propose specific enhancements to HostMemoryPool:

  • Dynamic Borrowing for Underestimated Splits: The initial memory budget is typically derived from the PartitionedFile split length provided by the Spark Driver. This length is often smaller than the actual host memory buffer required because the reader must access offsets outside the split range (e.g., file footers or adjacent row group metadata).
    • Solution: We introduce borrowMemory. When allocate detects that the localPool is insufficient, it dynamically borrows the deficit from the global pool.
    • Priority: Borrowers are prioritized over new tasks to ensure active runners can complete their work.
  • Strategic Over-Commit: If numRunnerInFlight == 0, the pool will grant resource requests even if they exceed the limit. This guarantees that at least one runner can always proceed, preventing circular wait conditions.
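
A minimal sketch of how these admission rules could fit together; every method signature and the waiting/priority details here are assumptions, not the final design.

```scala
// Hypothetical sketch of the proposed HostMemoryPool admission rules:
// borrowers (already-admitted runners) are served before new tasks, and the
// pool over-commits when nothing else is in flight so one runner can always run.
class HostMemoryPoolSketch(limit: Long) {
  private var used: Long = 0L
  private var numRunnerInFlight: Int = 0
  private var borrowersWaiting: Int = 0

  /** Initial budget request from a new runner. */
  def tryAcquire(bytes: Long): Boolean = synchronized {
    val fits = used + bytes <= limit && borrowersWaiting == 0
    if (fits || numRunnerInFlight == 0) {
      // Over-commit rule: with no runner in flight, grant the request even if
      // it exceeds the limit so at least one runner can always make progress.
      used += bytes
      numRunnerInFlight += 1
      true
    } else {
      false // caller retries later or falls back to a synchronous read
    }
  }

  /** Deficit request from an active runner (borrowMemory). Borrowers wait for
    * capacity instead of being rejected, and block new admissions meanwhile. */
  def borrowMemory(bytes: Long): Unit = synchronized {
    borrowersWaiting += 1
    while (used + bytes > limit && numRunnerInFlight > 1) {
      wait() // woken by release(); a lone runner is allowed to over-commit
    }
    borrowersWaiting -= 1
    used += bytes
  }

  /** Return budget; runnerFinished marks the runner leaving the pool. */
  def release(bytes: Long, runnerFinished: Boolean): Unit = synchronized {
    used -= bytes
    if (runnerFinished) numRunnerInFlight -= 1
    notifyAll()
  }
}
```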
