druid icon indicating copy to clipboard operation
druid copied to clipboard

experimental virtual storage fabric mode for historicals

Open clintropolis opened this issue 5 months ago • 1 comments

Description

todo: write real description

changes:

  • Added new CacheEntry abstraction to segment storage layer management, with methods to identify an entry, track byte size, as well as 'mount' and 'unmount' the cache entry when it is assigned to or removed from a storage location.
  • rewrote StorageLocation and SegmentLocalCacheManager to work with the new CacheEntry abstraction
  • StorageLocation can now store 'weak' references, which may be unmounted to reclaim space if new static or weak entries need to be added to the storage location. Eviction is handled using an algorithm based on SIEVE so that one hit wonders are less likely to evict popular entries
  • added new config druid.segmentCache.isVirtualStorageFabric to enable a 'virtual storage fabric' mode for historical servers. When ``druid.segmentCache.isVirtualStorageFabricis set to true,SegmentLocalCacheManagerperforms all segment loads as 'weak' loads. These segments are visible in the timeline, but are not downloaded until query processing.druid.server.maxSizecan be set to a value much greater than the total of the individual storage locations, making the segment cache behave as an actual cache, managed using the new 'weak' reference logic ofStorageLocation`.
  • added supporting configs druid.segmentCache.minVirtualStorageFabricLoadThreads, druid.segmentCache.maxVirtualStorageFabricLoadThreads, and druid.segmentCache.virtualStorageFabricLoadThreadKeepaliveMillis to control behavior of the on demand download pool used to retrieve weak reference segments from deep storage, mainly to help tune ideal default values for these settings.
  • Added new ensureLoaded to ReferenceCountedSegmentProvider, which returns a WeakSegmentReferenceProviderLoadAction to wrap a SegmentDescriptor and ListenableFuture<ReferenceCountedSegmentProvider> the latter of which represents a loaded segment. SegmentLocalCacheManager has a new WeakReferenceCountedSegmentProvider that is used when druid.segmentCache.isVirtualStorageFabric is true that implements this method to fetch the backing segment from deep storage and mount it in the cache as a weak reference. The existing ReferenceCountedSegmentProvider of this method is an immediate future that returns the same reference provider since the segment is statically stored in the cache.
  • ServerManager now calls the new ensureLoaded on all segments being processed as part of building the QueryRunner in getQueryRunnersForSegments. This allows all of the segments taking part in the query to be present in the cache prior to landing on the processing pool.
  • more stuff probably...

todo:

  • resolve remaining todos
  • dart support
  • more test

Release note

todo: some release


This PR has:
  • [ ] been self-reviewed.
    • [ ] using the concurrency checklist (Remove this item if the PR doesn't have any relation to concurrency.)
  • [ ] added documentation for new or modified features or behaviors.
  • [ ] a release note entry in the PR description.
  • [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • [ ] added or updated version, license, or notice information in licenses.yaml
  • [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • [ ] added integration tests.
  • [ ] been tested in a test Druid cluster.

clintropolis avatar Jun 26 '25 17:06 clintropolis

👍 Might be good to add a link to the original SIEVE paper: https://www.usenix.org/conference/nsdi24/presentation/zhang-yazhuo for those interested

jtuglu1 avatar Jun 27 '25 02:06 jtuglu1

@clintropolis , I haven't gone through the PR yet but how will this affect segment assignment/balancing on the Coordinator?

  • The Coordinator currently assigns segments to a Historical only if it has available disk space.
  • The available disk space is calculated in ImmutableDruidServer based on the configured maxSize and the sum of the DataSegment.getSize() of all the segments currently present on this historical (as reported by the HttpServerInventoryView)
  • I suppose the balancing algorithm can remain unchanged as it is already designed to optimize query performance rather than disk usage.

kfaraz avatar Jun 29 '25 02:06 kfaraz

👍 Might be good to add a link to the original SIEVE paper: https://www.usenix.org/conference/nsdi24/presentation/zhang-yazhuo for those interested

yea totally, I have it linked in javadocs for StorageLocation and was planning to add it to the PR description once I fill it in when this is closer to ready to review just haven't got to it yet 😅 (still changing quite a few things).

@clintropolis , I haven't gone through the PR yet but how will this affect segment assignment/balancing on the Coordinator?

This first PR has no changes needed to the coordinator logic. When a historical is set to this mode, the idea is that you set druid.server.maxSize to how much data you want it to be responsible for, but set the sizes in druid.segmentCache.locations to be the actual disk sizes, and during querying the cache manager will load and drop segments internally as appropriate to stay within the constraints of the druid.segmentCache.locations sizes. I'll elaborate more in the PR description once this branch gets closer to review ready.

I do have some ideas for follow-up work of adding a new type of "weak" load rule to allow historicals to use the same load logic it does for all segments when druid.segmentCache.isVirtualStorageFabric from this PR is set to true, but also still have regular segment loads. This would allow for finer grained control over how segments are loaded by allowing some segments to be sticky and always present in the disk cache (the cache manager supports this internally), while others would be weak references and load on demand but be eligible to be dropped if new strong or weak loads need the space. This likely does require some adjustments to coordinator balancing to distinguish 'weak' loads from regular loads to ensure the regular loads to exceed actual disk space, but I think the changes would be pretty minor. Was planning to address this too in the PR description (or maybe a linked design proposal issue, not sure, haven't decided yet).

clintropolis avatar Jul 01 '25 18:07 clintropolis

@clintropolis now that segments are being cached on-demand on historicals (not all pre-loaded), and intra-AZ requests are much faster than S3 reads (intra-AZ requests are typically XXXµs, whereas S3 is ~500 ms for single RTT not to mention the numerous RTT needed from downloading massive segments), is this moving towards the goal to make historicals more "stateless" and more of a distributed caching tier? i.e historical A can pull column C from segment D from historical B? Basically improve "hit" rate and spread query load by allowing historicals to pass segment data between each other?

jtuglu1 avatar Jul 01 '25 18:07 jtuglu1

@clintropolis now that segments are being cached on-demand on historicals (not all pre-loaded), and intra-AZ requests are much faster than S3 reads (intra-AZ requests are typically XXXµs, whereas S3 is ~500 ms for single RTT not to mention the numerous RTT needed from downloading massive segments), is this moving towards the goal to make historicals more "stateless" and more of a distributed caching tier? i.e historical A can pull column C from segment D from historical B? Basically improve "hit" rate and spread query load by allowing historicals to pass segment data between each other?

Initially our thinking was to focus on partial fetches from S3, so we only have to fetch the columns actually needed for a query. From my PoV (@clintropolis may have his own opinion) from there it would make sense to have the Druid servers able to fetch from each other. It would make it more feasible to burst up compute for a short period of time (~seconds even would make sense).

Btw, I would say Historicals are already "stateless" in that they are just a cache of immutable segments (backed by S3) + some compute. The initial work we're doing with virtual storage is enabling the cache to be populated on demand from S3 during a query, instead of needing to be populated before the query starts. This improves things in two main ways:

  • Historicals are immediately usable after being booted up; don't need to populate their cache before they usable for queries
  • you can have more data in S3 than available disk on Historicals, if you like

Later on, fetching data from each other would be a way to improve the latency of cache population, which will have the effect of making the servers more responsive immediately following a cold start.

gianm avatar Jul 03 '25 02:07 gianm

@clintropolis - how do segment metadata queries work that touch lots of segments and are system-generated? We wouldn't want to fetch all the segments to build sql schema on broker.

abhishekagarwal87 avatar Jul 07 '25 07:07 abhishekagarwal87

Curious how is SegmentDescriptor used in AcquireSegmentAction? I thought it's enough to just use DataSegment? Also I kinda feel many SegmentManager logic could be moved to DataSourceState class, which could encapsulate better, e.x. segments must be all joinable or non joinable.

cecemei avatar Jul 08 '25 02:07 cecemei

Initially our thinking was to focus on partial fetches from S3, so we only have to fetch the columns actually needed for a query. From my PoV (@clintropolis may have his own opinion) from there it would make sense to have the Druid servers able to fetch from each other. It would make it more feasible to burst up compute for a short period of time (~seconds even would make sense).

Thanks for chiming in on this @gianm, I agree the next big area I'm probably going to be looking at is partial downloads, which will build on top of some of the stuff here (with some modifications) and introduce some new machinery to do downloads at a finer granularity than the whole segment that this requires.

After that I agree it makes sense to allow pulling 'locally' from other servers if available instead of deep storage after we get all the foundations in place.

clintropolis avatar Jul 15 '25 23:07 clintropolis

@clintropolis - how do segment metadata queries work that touch lots of segments and are system-generated? We wouldn't want to fetch all the segments to build sql schema on broker.

Yea, there is a bit of ugly special handling for segment metadata queries right now https://github.com/apache/druid/pull/18176/files#diff-3d43c1b27316a889f58c28a3373b157f38f633d40d6ab4495d350dece7ce7a44R293 that doesn't allow it to download segments for this reason. This means that to use SQL you currently either need a non-virtual tier, or if only using a virtual storage mode tier then either the catalog or centralized schema stuff must be relied on to supply the SQL schema.

In the future, I have some segment format changes in mind to support partial downloads that would also allow us to have the schema available without downloading the whole segment, so I think this can be a temporary problem.

clintropolis avatar Jul 15 '25 23:07 clintropolis

Curious how is SegmentDescriptor used in AcquireSegmentAction? I thought it's enough to just use DataSegment?

SegmentDescriptor isn't strictly used by the AcquireSegmentAction, its more of a bit of tracking convenience for callers, which start out with a list of SegmentDescriptor, so that it is easy to associate the segment loading futures with the SegmentDescriptor originally requested.

Also I kinda feel many SegmentManager logic could be moved to DataSourceState class, which could encapsulate better, e.x. segments must be all joinable or non joinable.

I would agree that there are possibly some changes that could be done here, but I'm not too motivated to do them in this PR since enough other stuff is already changing. At least DataSourceState is basically internal to SegmentManager, so it should be pretty easy to change stuff around later if we want.

clintropolis avatar Jul 15 '25 23:07 clintropolis