[VL] Support file cache spill in Gluten
Description
The Velox backend provides a 2-level file cache (`AsyncDataCache` and `SsdCache`) and we have enabled it in a PR, using a dedicated `MMapAllocator` initialized with a configured capacity. This part of memory is counted neither as execution memory nor as storage memory, and is not managed by Spark's `UnifiedMemoryManager`. In this ticket, we would like to fill this gap with the following design:
- Add a `NativeStorageMemory` segment in vanilla `StorageMemory`. We will have a configuration `spark.memory.native.storageFraction` to define its size. Then we use this size (`offheap.memory * spark.memory.storageFraction * spark.memory.native.storageFraction`) to initialize `AsyncDataCache`.
- Add a configuration `spark.memory.storage.preferSpillNative` to determine the preference of spilling the RDD cache or the native FileCache when storage memory needs to be shrunk. For example, when queries are mostly executed on the same data sources, we prefer to keep the native file cache.
- Introduce `NativeMemoryStore` to provide similar interfaces to the vanilla `MemoryStore` and call `AsyncDataCache::shrink` when eviction is needed.
- Introduce `NativeStorageMemoryAllocator`, a memory allocator used for creating `AsyncDataCache`. It is wrapped with a `ReservationListener` to track the memory usage of the native cache. `VeloxBackend` initialization will be done without the cache created; we will call `VeloxBackend::setAsyncDataCache` when the memory pools are initializing.
The key code path will be like the following:
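As a rough, hypothetical sketch of how these pieces could fit together (the class, method, and trait names below are illustrative placeholders rather than the actual implementation; only the Spark configuration keys and the sizing formula come from the design above):

```scala
import org.apache.spark.SparkConf

// Hypothetical sketch of the configuration-driven cache sizing and eviction path.
object NativeCacheSizing {
  // offheap.memory * spark.memory.storageFraction * spark.memory.native.storageFraction
  def nativeStorageBytes(conf: SparkConf): Long = {
    val offHeapBytes = conf.getSizeAsBytes("spark.memory.offHeap.size", "0")
    val storageFraction = conf.getDouble("spark.memory.storageFraction", 0.5)
    // Proposed config from the design above; the default here is only illustrative.
    val nativeStorageFraction = conf.getDouble("spark.memory.native.storageFraction", 0.5)
    (offHeapBytes * storageFraction * nativeStorageFraction).toLong
  }
}

// A NativeMemoryStore-like facade: mirrors MemoryStore's eviction entry point but
// delegates to the native AsyncDataCache::shrink through a JNI bridge (hypothetical).
class NativeMemoryStoreSketch(cacheJni: AsyncDataCacheJniSketch) {
  // Called by the storage memory pool when it needs to reclaim `numBytes`;
  // returns the number of bytes the native cache actually freed.
  def evictEntriesToFreeSpace(numBytes: Long): Long = cacheJni.shrink(numBytes)
}

// Placeholder for the JNI surface into Velox's AsyncDataCache (hypothetical).
trait AsyncDataCacheJniSketch {
  def shrink(targetBytes: Long): Long
}
```

Whether eviction is routed to this native store first or to the on-heap RDD cache would then be a policy check driven by `spark.memory.storage.preferSpillNative`.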
@zhouyuan @zhztheplayer Can you help review this draft design? Thanks.
Add a configuration `spark.memory.storage.preferSpillNative` to determine the preference of spilling the RDD cache or the native FileCache when storage memory needs to be shrunk. For example, when queries are mostly executed on the same data sources, we prefer to keep the native file cache.
Is there a better name than `preferSpillNative`? One can't easily tell that it's not about Gluten's RDD cache; users may think of all Gluten stuff as native.
@zhli1142015 Thanks.
@zhli1142015 @FelixYBW @zhouyuan @zhztheplayer The code changes are available in the following PRs: Spark, Gluten, Velox; please take a review. As the next step I will test it end-to-end and add some docs for it. Here are some explanations of the code changes:
- New files in `shims/common`: Existing memory allocator listeners such as `ManagedAllocationListener` are under the package `gluten-data` and the native JNIs are under `backends-velox`, but because I need to call these classes/APIs in the injects, I put them in `shims/common`.
- Late initialization of the file cache: We use `GlutenMemStoreInjects` to get the cache configuration and then do the initialization after the Velox backend is initialized, which ensures the native libs are loaded.
- Cache size setting: We need to pass a cache size to `setAsyncDataCache`; using the default `int64_t` max causes a `std::bad_alloc`. But the size is sensitive, since Velox's data cache uses this value to control memory allocation. If it is too small, allocation failures will happen on the native side even though Spark hasn't reported them on the Java side yet. As we leverage the Spark memory manager to control the memory logic, we resolve this conflict by giving `AsyncDataCache` a large fake size, maybe the same as the off-heap size (see the sketch after this list).
- The SSD cache can't work well in my test, as a file cache entry is easily larger than `8M` and causes a check failure. An issue is reported for tracking.
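A minimal sketch of the late-initialization and "large fake size" points from the list above (all names besides the Spark off-heap size key are hypothetical, including the enable flag):

```scala
import org.apache.spark.SparkConf

// Hypothetical sketch: create the native AsyncDataCache only after the Velox backend
// has loaded its native libraries, and pass a deliberately large "fake" capacity
// (here: the whole off-heap size) so the Velox-side cache does not fail allocations
// early; the real accounting is left to Spark's memory manager.
object FileCacheInitSketch {
  def initAfterBackendLoaded(conf: SparkConf, backend: VeloxBackendJniSketch): Unit = {
    // Hypothetical enable flag, standing in for the conf read via GlutenMemStoreInjects.
    if (conf.getBoolean("spark.gluten.velox.fileCache.enabled", defaultValue = false)) {
      // Passing Int64 max here triggers std::bad_alloc on the native side,
      // so cap the reported capacity at the configured off-heap size instead.
      val fakeCapacity = conf.getSizeAsBytes("spark.memory.offHeap.size", "0")
      backend.setAsyncDataCache(fakeCapacity)
    }
  }
}

// Placeholder for the JNI call into VeloxBackend::setAsyncDataCache (hypothetical).
trait VeloxBackendJniSketch {
  def setAsyncDataCache(capacityBytes: Long): Unit
}
```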
Some updates:
- The previous implementation added a listener against `MmapMemoryAllocator` to trigger `acquireStorageMemory`/`releaseStorageMemory` by binding with `freeNonContiguous`/`allocateNonContiguousWithoutRetry`. This has a problem that can't be resolved: these two methods are not always called when memory usage changes in `AsyncDataCache`. For example, when doing a `shrink(bytes)`, the released size is not always equal to the bytes returned by `freeNonContiguous`, as in this log: `freed size by freeAllocations: 1753088 largeEvicted is: 4374528 tinyEvicted is: 0`. Velox memory management introduces several regions, like small data, pages data, etc. So if we use the returned shrunk size to decrease the storage memory pool while using `freeNonContiguous`/`allocateNonContiguousWithoutRetry` to change the `memoryUsed` value, there will eventually be a mismatch.
- Instead, we now switch to adding a listener against `AsyncDataCache` itself. When a `shrink` happens, we release that amount of storage memory; when a new entry is created, we acquire the corresponding amount, though we may return some of it back, since the cache may not actually grow by that amount if a cache shrink happens during the new entry's creation (see the sketch after this list). This also has the advantage that no special change is needed on the Velox side, other than making the `findOrCreate()` method virtual.
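A hypothetical sketch of the cache-level listener described above (the trait and bridge below are illustrative; in the actual change the callbacks would be driven from native code via JNI):

```scala
// Hypothetical sketch of a cache-level reservation listener. The native AsyncDataCache
// would call back into these hooks instead of relying on allocator-level methods such
// as freeNonContiguous, so the Spark storage pool sees exactly what the cache reports.
trait AsyncDataCacheListenerSketch {
  // Called when a new cache entry of `bytes` is about to be created.
  def onEntryCreated(bytes: Long): Unit
  // Called when the cache shrinks and reports `freedBytes` released.
  def onShrink(freedBytes: Long): Unit
}

// Bridges the callbacks to Spark-side storage memory accounting (hypothetical API).
class SparkStorageBridgeSketch(acquire: Long => Boolean, release: Long => Unit)
  extends AsyncDataCacheListenerSketch {

  override def onEntryCreated(bytes: Long): Unit = {
    // Acquire the full entry size up front; if a shrink happens while the entry is
    // being created, the freed bytes are handed back through onShrink, so the net
    // storage-memory usage still matches what the cache actually holds.
    acquire(bytes)
  }

  override def onShrink(freedBytes: Long): Unit = release(freedBytes)
}
```

Keeping the accounting at the cache level avoids the region-level mismatch (small data vs. pages) that showed up when the listener was bound to the allocator methods.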
We have created a PR for upstream Spark.
For Velox, there is only a one-line change.
The latest code change for Gluten is tracked by a PR.
@zhli1142015 Would you like to try this implementation in your workload?
Thank you @yma11, yes, let me try this. Just to confirm, I need to apply all the changes for Spark, Velox and Gluten, is this correct?
Yes. Note that I only changed the shim layer of Spark 3.3, so you may try the same version.