
[VL] Support file cache spill in Gluten

Open yma11 opened this issue 1 year ago • 8 comments

Description

The Velox backend provides a two-level file cache (AsyncDataCache and SsdCache), and we have enabled it in a PR using a dedicated MMapAllocator initialized with a configured capacity. This memory is not counted as execution memory or storage memory and is not managed by Spark's UnifiedMemoryManager. In this ticket, we would like to fill that gap with the following design:

  • Add a NativeStorageMemory segment inside the vanilla storage memory. A new configuration spark.memory.native.storageFraction defines its share, so the AsyncDataCache is initialized with capacity offheap.memory * spark.memory.storageFraction * spark.memory.native.storageFraction (a sizing sketch follows this list).
  • Add a configuration spark.memory.storage.preferSpillNative to decide whether the RDD cache or the native file cache is spilled first when storage memory must be shrunk. For example, when queries mostly run against the same data sources, we prefer to keep the native file cache.
  • Introduce NativeMemoryStore, which provides interfaces similar to the vanilla MemoryStore and calls AsyncDataCache::shrink when eviction is needed.
  • Introduce NativeStorageMemoryAllocator, the memory allocator used to create the AsyncDataCache. It is wrapped with a ReservationListener to track memory usage in the native cache.
  • VeloxBackend initialization is done without creating the cache; VeloxBackend::setAsyncDataCache is called when the memory pools are initialized.
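
As a rough illustration of the sizing rule above, here is a minimal sketch. The object and method names are hypothetical; only the configuration keys come from this issue, and the default fractions are illustrative.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper (not from the PR) showing how the native cache capacity
// could be derived from the proposed configurations.
object NativeCacheSizing {
  def nativeCacheCapacity(conf: SparkConf): Long = {
    val offHeapBytes = conf.getSizeAsBytes("spark.memory.offHeap.size", "0")
    val storageFraction = conf.getDouble("spark.memory.storageFraction", 0.5)
    // Proposed here: the share of storage memory reserved for the native file cache.
    val nativeStorageFraction = conf.getDouble("spark.memory.native.storageFraction", 0.5)
    (offHeapBytes * storageFraction * nativeStorageFraction).toLong
  }
}
```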

The key code path will look like the following (diagram attached in the original issue).
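
To make that code path concrete, here is an illustrative-only sketch of the NativeMemoryStore idea: when the storage pool needs space back, eviction is forwarded to the native AsyncDataCache. The JNI trait below is an assumed stand-in for the real Gluten binding; none of these names are taken from the PR.

```scala
// Assumed stand-in for the real JNI binding to the native cache.
trait AsyncDataCacheJni {
  /** Ask the native AsyncDataCache to shrink by `bytes`; returns the bytes actually freed. */
  def shrink(bytes: Long): Long
}

// Hypothetical MemoryStore-like facade over the native file cache.
class NativeMemoryStore(cacheJni: AsyncDataCacheJni) {
  private var memoryUsed: Long = 0L

  /** Record that the native cache grew by `bytes`. */
  def recordAllocation(bytes: Long): Unit = synchronized { memoryUsed += bytes }

  /** MemoryStore-style eviction: free up to `space` bytes from the native file cache. */
  def evictBlocksToFreeSpace(space: Long): Long = synchronized {
    val freed = cacheJni.shrink(space)
    memoryUsed = math.max(0L, memoryUsed - freed)
    freed
  }
}
```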

yma11 · May 27 '24 13:05

@zhouyuan @zhztheplayer Can you help review this draft design? Thanks.

yma11 · May 28 '24 07:05

Add a configuration spark.memory.storage.preferSpillNative to decide whether the RDD cache or the native file cache is spilled first when storage memory must be shrunk. For example, when queries mostly run against the same data sources, we prefer to keep the native file cache.

Is there a better name than preferSpillNative? One can't easily tell that it is not about Gluten's RDD cache; users may think of everything in Gluten as native.

zhztheplayer · May 28 '24 08:05

@zhli1142015

FelixYBW · May 28 '24 19:05

Thanks.

zhli1142015 · May 29 '24 00:05

@zhli1142015 @FelixYBW @zhouyuan @zhztheplayer The code changes are available in the following PRs: Spark, Gluten, Velox; please take a review. As a next step I will test it end to end and add some docs. Here are some explanations of the code changes:

  1. New files in shims/common: Existing memory allocator listeners such as ManagedAllocationListener live under the gluten-data package and the native JNIs under backends-velox, but because I need to call these classes/APIs from the injects, I put them in shims/common.
  2. Late initialization of the file cache: We use GlutenMemStoreInjects to read the cache configuration and initialize the cache only after the Velox backend has been initialized, which ensures the native libraries are loaded.
  3. Cache size setting: we need to pass a cache size to setAsyncDataCache; using the default int64_t max causes a std::bad_alloc. The size is also sensitive because Velox's data cache uses it to bound memory allocation: if it is too small, allocation failures happen on the native side even though Spark has not yet reported a shortage on the Java side. Since we let the Spark memory manager own the memory accounting, we resolve this conflict by giving the AsyncDataCache a large nominal capacity, perhaps equal to the off-heap size (see the sketch after this list).
  4. The SSD cache does not work well in my tests because file cache entries easily exceed 8 MB, which triggers a check failure. An issue has been reported for tracking.
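
To illustrate points 2 and 3, here is a small sketch of the late-initialization idea. The JNI trait and helper names are assumptions for this sketch; the actual Gluten binding and its signature may differ.

```scala
import org.apache.spark.SparkConf

// Assumed stand-in for the real JNI call into the Velox backend.
trait VeloxBackendJni {
  def setAsyncDataCache(capacityBytes: Long): Unit
}

// Hypothetical helper: create the cache only after the Velox backend (and thus
// the native libraries) has been loaded, and pass a large nominal capacity
// instead of int64_t max to avoid the std::bad_alloc mentioned above.
object NativeFileCacheInit {
  def initAfterBackendLoaded(conf: SparkConf, backend: VeloxBackendJni): Unit = {
    // Nominal capacity equal to the whole off-heap size; real admission is still
    // governed by Spark's storage memory accounting on the Java side.
    val nominalCapacity = conf.getSizeAsBytes("spark.memory.offHeap.size", "0")
    backend.setAsyncDataCache(nominalCapacity)
  }
}
```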

yma11 · Jun 07 '24 01:06

Some updates:

  • In the previous implementation we added a listener against MmapMemoryAllocator that triggered acquireStorageMemory/releaseStorageMemory by binding to freeNonContiguous/allocateNonContiguousWithoutRetry. That has a problem which can't be resolved: those two methods are not always called when memory usage changes in AsyncDataCache. For example, on a shrink(bytes) the released size is not always equal to the bytes returned by freeNonContiguous, as in this log: freed size by freeAllocations: 1753088 largeEvicted is: 4374528 tinyEvicted is: 0. Velox memory management has several regions (small data, pages data, etc.), so if we use the returned shrunk size to decrease the storage memory pool while using freeNonContiguous/allocateNonContiguousWithoutRetry to update memoryUsed, a mismatch eventually appears.
  • Instead, we now add a listener against the AsyncDataCache itself. When a shrink happens, we release the corresponding amount of storage memory; when a new entry is created, we acquire the corresponding size, possibly returning some of it afterwards because cache usage may not actually grow by that amount if a shrink happens during entry creation. This also has the advantage that no special change is needed on the Velox side, other than making the findOrCreate() method virtual. A sketch of this listener idea follows below.
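
A minimal sketch of the listener-on-AsyncDataCache idea, expressed on the JVM side; the trait and class names here are assumptions for illustration, not the actual Gluten/Velox API.

```scala
// Assumed bridge into Spark's storage memory pool (e.g. via a ReservationListener).
trait StorageMemoryBridge {
  /** Reserve `bytes` from Spark's storage memory pool. */
  def acquire(bytes: Long): Unit
  /** Return `bytes` to Spark's storage memory pool. */
  def release(bytes: Long): Unit
}

// Hypothetical listener mirroring native cache growth/shrink into Spark accounting.
class AsyncDataCacheListener(bridge: StorageMemoryBridge) {
  // Invoked around entry creation (e.g. from the now-virtual findOrCreate()).
  def onEntryCreated(requestedBytes: Long, actualGrowthBytes: Long): Unit = {
    bridge.acquire(requestedBytes)
    // If a concurrent shrink ran while the entry was being created, cache usage
    // grows by less than requested, so the surplus is handed back right away.
    if (actualGrowthBytes < requestedBytes) {
      bridge.release(requestedBytes - actualGrowthBytes)
    }
  }

  // Invoked when AsyncDataCache shrinks (evicts entries).
  def onShrink(freedBytes: Long): Unit = bridge.release(freedBytes)
}
```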

We have created a PR for upstream Spark. For Velox, there is only a one-line change, making findOrCreate() virtual (shown in the attached screenshot). The latest code change for Gluten is tracked by a PR. @zhli1142015 would you like to try this implementation in your workload?

yma11 · Jun 26 '24 13:06

Thank you @yma11, yes, let me try this. Just to confirm, I need to apply all the changes for Spark, Velox and Gluten, is that correct?

zhli1142015 · Jun 26 '24 13:06

Thank you @yma11, yes, let me try this. Just to confirm, I need to apply all the changes for Spark, Velox and Gluten, is that correct?

Yes. Note that I only changed the shim layer for Spark 3.3, so please try with the same version.

yma11 · Jun 27 '24 04:06