Add an optional bandwidth cap to `TieredMergePolicy`?
Description
TL;DR: TieredMergePolicy can create massive snapshots if you configure it for aggressive deletesPctAllowed, which can hurt searchers (cause page fault storms) in a near-real-time replication world. Maybe we could add an optional (off by default) "rate limit" on how many amortized bytes/sec TMP is merging? This is just an idea / brainstorming / design discussion so far ... no PR.
Full context:
At Amazon (Product Search team) we use near-real-time segment replication to efficiently distribute index updates to all searchers/replicas.
Since we have many searchers per indexer shard, to scale to very high QPS, we intentionally tune TieredMergePolicy (TMP) to very aggressively reclaim deletions. Burning extra CPU / bandwidth during indexing to save even a little bit of CPU during searching is a good tradeoff for us (and in general, for Lucene users with high QPS requirements).
But we have a "fun" problem occasionally: sometimes we have an update storm (an upstream team reindexes large-ish portions of Amazon's catalog through the real-time indexing stream), and this leads to lots and lots of merging and many large (max-sized 5 GB) segments being replicated out to searchers in short order, sometimes over links (e.g. cross-region) that are not as crazy-fast as within-region networking fabric, and our searchers fall behind a bit.
Falling behind is not the end of the world: the searchers simply skip some point-in-time snapshots and jump to the latest one, effectively sub-sampling checkpoints as best they can given the bandwidth constraints. Index-to-search latency is hurt a bit, but recovers once the indexer catches up on the update storm.
The bigger problem for us is that we size our shards, roughly, so that the important parts of the index (the parts hit often by query traffic) are fully "hot", i.e. so the OS has enough RAM to hold the hot parts of the index. But when it takes too long to copy and light (switch over to the new segments for searching) a given snapshot, and we skip the next one or two snapshots, the follow-on snapshot that we finally do load may have a sizable part of the index rewritten, and the snapshot size may be a big percentage of the overall index, and copying/lighting it will stress the OS into a paging storm, hurting our long-pole latencies.
So one possible solution we thought of is to add an optional (off by default) setMaxBandwidth to TMP so that "on average" (amortized over some time window) TMP would not produce so many merges that it exceeds that bandwidth cap. With such a cap, during an update storm (war time), the index delete percentage would necessarily increase beyond what we ideally want / configured with setDeletesPctAllowed, but then during peace time, TMP could again catch up and push the deletes back below the target.
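To make the "amortized over some time window" idea concrete, here is a minimal sketch in plain Java with no Lucene dependency. `WindowedBandwidthBudget` and its methods are hypothetical names for illustration only; `setMaxBandwidth` does not exist on TMP today, and both the sliding window and the accounting by merged bytes are assumptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical helper, not part of Lucene: tracks merged bytes over a sliding time window. */
final class WindowedBandwidthBudget {
  private final long maxBytesPerSec;
  private final long windowMillis;
  // Each entry is {timeMillis, bytes}, oldest first.
  private final Deque<long[]> entries = new ArrayDeque<>();
  private long bytesInWindow;

  WindowedBandwidthBudget(long maxBytesPerSec, long windowMillis) {
    this.maxBytesPerSec = maxBytesPerSec;
    this.windowMillis = windowMillis;
  }

  /** Records that a merge of the given size was selected at the given time. */
  void record(long nowMillis, long bytes) {
    expire(nowMillis);
    entries.addLast(new long[] {nowMillis, bytes});
    bytesInWindow += bytes;
  }

  /** True if one more merge of this size keeps the windowed average at or under the cap. */
  boolean wouldFit(long nowMillis, long bytes) {
    expire(nowMillis);
    long capacity = maxBytesPerSec * windowMillis / 1000;
    return bytesInWindow + bytes <= capacity;
  }

  /** Drops entries that are a full window old or older. */
  private void expire(long nowMillis) {
    while (!entries.isEmpty() && entries.peekFirst()[0] <= nowMillis - windowMillis) {
      bytesInWindow -= entries.removeFirst()[1];
    }
  }
}
```

A caller would check `wouldFit` before selecting a merge and call `record` once it does. In this sketch a merge larger than the whole window's capacity never fits, so a real implementation would need an explicit rule for max-sized merges.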
I wonder if this is something that could be implemented in the merge scheduler rather than in the merge policy. Thinking out loud: the merge policy's responsibility is to compute efficient merges that meet its constraints (merge factor, deletes, etc.). But then the merge scheduler is free to throttle these merges, or even to ignore some of them if they don't meet its own constraints?
MergeScheduler can reject or throttle certain merges, but I wonder if it can somehow communicate its constraints to the MergePolicy, so that MP produces a plausible merge even if it is not the most efficient one from MP's POV.
I guess that's why (at least partly) @mikemccand suggested to change TieredMergePolicy
Doing this in MergeScheduler (MS) is indeed another option. It'd mean you could cap replication bandwidth independent of your MergePolicy (MP). MS could even fine-tune where it does its throttling, e.g. maybe it's fine to run many merges at once, if "bandwidth" to the local storage of your IndexWriter is not a problem, but then once the merge completes, don't commit the merge to the index (swap in the newly merged segment in place of the old ones) until enough time has passed (in fact, NRT replica warming of newly merged segments takes this same approach: delay merge-commit long enough so replicas can pre-copy newly merged segments, ensuring NRT latency (index to searchability delay) stays low even when copying massive merged segments). I.e. "throttle at start" or "throttle at end" are two sub-options to doing this in MS
I guess "throttle during" is also an option, just like the IO rate limiter can do today ... in fact, the IO rate limiter is already one way to cap bandwidth.
But the big problem with doing this throttling late in the game (in MS, not MP) is that MP can then make poor choices. I.e. maybe MP decides to "merge these 5 segments now". It submits the merge, but MS waits before starting the merge (to cap the bandwidth). Say it waits ten minutes... well, after those ten minutes, MP might now want to make a different/better merge choice, since the index looks different (after 10 minutes of indexing the update stream), yet because it already submitted the prior (now stale) merge, it is not allowed to pick any of those five segments since they are now held out as merging. This would effectively mean MP is forced to make choices based on stale/delayed information. It's sort of like the good old days of day-traders who only had access to the "20 minute delayed stock prices".
But maybe that handicap to TMP would be fine in practice? Maybe the choices it makes based on stale index geometry are not so different from what it would make with "live" stock prices? Though, if it was a max-sized merge, and enough updates/deletes arrive in those ten minutes, then MP would've been able to merge maybe a 6th segment into the same merge while staying under the max segment size. Not sure ...
Or maybe even some combination of the two approaches? MP could look at IW, see that there is backpressure (paused merges because MS has to stay under bandwidth cap), and delay picking new merges even if it wants to? Maybe that's a simple solution ... MS throttles, and MP detects that backpressure.
Intuitively, I had thought of the "throttle at start" approach, where we would also give MS the ability to filter out some merges from MP (so that they don't get registered to the IndexWriter) to avoid the problem you mentioned that the most efficient merge now may not be the most efficient merge 10 minutes later when more segments / deletes have accumulated.
I guess it's similar to the idea you mentioned in your last paragraph, though I like that MP would remain completely unaware of any bandwidth / back-pressure / throttling concerns.
Oh, that's a neat idea (MergeScheduler being able to "cancel" merge choices from MergePolicy).
I think we would have to register the merge, immediately, on getting it from MergePolicy. That's the only thing that prevents MergePolicy from e.g. simply picking that merge again.
But then, to keep under bandwidth budget, CMS could delay actually starting the merge. But, if enough time passes, maybe CMS decides later to then simply cancel (and unregister) the merge? This would kick back those to-be-merged segments into consideration again by the MergePolicy, and maybe it picks a different / better merge based on new information?
TieredMergePolicy does produce merges from "most to least importance", roughly, so if CMS allowed early merges to run, but then delayed/canceled later ones, it could work well -- TMP would reconsider the segments that were lower priority for merging.
It would be nice if MergePolicy could remain ignorant of bandwidth constraints.
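The register, delay, then cancel flow described above can be sketched roughly as follows. This is plain Java rather than Lucene code: `DeferringMergeQueue`, `PendingMerge`, and the `maxDeferMillis` timeout are hypothetical, and segments are represented by name strings purely for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch, not Lucene API: register merges at once, start or cancel them later. */
final class DeferringMergeQueue {
  static final class PendingMerge {
    final Set<String> segments;
    final long bytes;
    final long submittedMillis;

    PendingMerge(Set<String> segments, long bytes, long submittedMillis) {
      this.segments = new HashSet<>(segments);
      this.bytes = bytes;
      this.submittedMillis = submittedMillis;
    }
  }

  private final long maxDeferMillis;
  private final List<PendingMerge> deferred = new ArrayList<>(); // in priority (submission) order
  private final Set<String> heldSegments = new HashSet<>();

  DeferringMergeQueue(long maxDeferMillis) {
    this.maxDeferMillis = maxDeferMillis;
  }

  /** Registers the merge immediately so the policy cannot pick the same segments again. */
  void submit(Set<String> segments, long bytes, long nowMillis) {
    deferred.add(new PendingMerge(segments, bytes, nowMillis));
    heldSegments.addAll(segments);
  }

  /** True while the segment belongs to a registered merge that has not been canceled. */
  boolean isHeld(String segment) {
    return heldSegments.contains(segment);
  }

  /**
   * Returns the merges to start now. Higher-priority merges start while they fit in
   * availableBytes; once one does not fit, it and everything after it stays deferred,
   * and any of those that waited at least maxDeferMillis is canceled and unregistered.
   */
  List<PendingMerge> poll(long nowMillis, long availableBytes) {
    List<PendingMerge> toStart = new ArrayList<>();
    boolean blocked = false;
    Iterator<PendingMerge> it = deferred.iterator();
    while (it.hasNext()) {
      PendingMerge m = it.next();
      if (!blocked && m.bytes <= availableBytes) {
        availableBytes -= m.bytes;
        toStart.add(m);
        it.remove(); // started merges keep their segments held
      } else {
        blocked = true;
        if (nowMillis - m.submittedMillis >= maxDeferMillis) {
          heldSegments.removeAll(m.segments); // cancel: the policy may reconsider these
          it.remove();
        }
      }
    }
    return toStart;
  }
}
```

Stopping at the first merge that does not fit is a design choice in this sketch: it preserves the policy's rough priority order instead of letting smaller, lower-priority merges jump ahead.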
> That's the only thing that prevents MergePolicy from e.g. simply picking that merge again.
I wonder if we actually need to prevent it from picking the same merge again. Could we wait until it suggests a different merge (either instead of the initial merge that did not meet bandwidth constraints, or in addition to it, since MP can suggest multiple merges)?
Hello, I will be diving into this issue for my summer internship. Looking forward to discussing more in the near future and contributing to this project!
Hey all, wanted to throw out an idea and some possible approaches, would love to get some thoughts or any suggestions:
- Create a new scheduler (proposed by Mike McCandless): an example would be to create a class `BandwidthCappedMergePolicyScheduler` which is a `MergeScheduler` but has a `.getMergePolicy()` method to also get an internal `MergePolicy` that is aware of the shared bandwidth. So: adjust merge selection based on available bandwidth; shared state tracks bandwidth usage and pending merges; when bandwidth becomes available, the policy reconsiders all segments and selects optimal merges based on current bandwidth capacity.
- Handle bandwidth rate limiting in the new `MergeScheduler` by deferring merges based on bandwidth. `TMP` would stop suggesting merges since they are already registered. When bandwidth becomes available, check if the deferred merges are better and execute them if they are, or else just cancel them and merge the newly suggested ones.
- Have a leaky bucket algorithmic approach in the implementation, just like how AWS handles provisioned IOPS instances.
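For reference, a leaky bucket used as a meter can be sketched in a few lines of plain Java. `LeakyBucket` and its parameters are hypothetical names for illustration; this shows only the generic algorithm, not any particular service's implementation or an existing Lucene class.

```java
/** Hypothetical sketch of a leaky-bucket meter: bytes pour in, and drain at a fixed rate. */
final class LeakyBucket {
  private final double capacityBytes;   // burst allowance
  private final double leakBytesPerSec; // long-term average rate
  private double level;                 // bytes currently in the bucket
  private long lastMillis;

  LeakyBucket(double capacityBytes, double leakBytesPerSec, long nowMillis) {
    this.capacityBytes = capacityBytes;
    this.leakBytesPerSec = leakBytesPerSec;
    this.lastMillis = nowMillis;
  }

  /** Tries to admit a merge of this size; returns false and admits nothing on overflow. */
  boolean tryAdd(long bytes, long nowMillis) {
    drain(nowMillis);
    if (level + bytes > capacityBytes) {
      return false;
    }
    level += bytes;
    return true;
  }

  /** Lowers the level by whatever leaked out since the last call, never below zero. */
  private void drain(long nowMillis) {
    double leaked = (nowMillis - lastMillis) / 1000.0 * leakBytesPerSec;
    level = Math.max(0, level - leaked);
    lastMillis = nowMillis;
  }
}
```

The bucket capacity sets how large a burst (e.g. one big merge) is tolerated, while the leak rate sets the long-term average, so the two knobs would likely need to be tuned together.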
Thoughts? Ideas? Open to any comments or discussions
I don't know what would help or hurt here; I feel there is room for improvement and we want to take a fresh look, but further tweaks to TMP seem difficult since it is already heavily tuned with lots of interacting parameters that are hard to know the impact of, but we lack data about any other specific approach. In a circumstance like that I would tend to try small changes first, so maybe start with the scheduler that selects among merges proposed by TMP? And then later move on to influencing the actual merge proposals? I don't think that's a radical change, but I'm not sure, don't know this area well.
Yes, the idea is not to touch the TMP because of its complexity, but to start off with creating a new BandwidthCappedMergePolicyScheduler that extends MergeScheduler, which would select among merges proposed by TMP as you mentioned.
I think the "wrapped" (inner) MergePolicy that BandwidthCappedMergeScheduler.getMergePolicy() would provide would be quite a bit simpler than TMP is today because it would not trigger merges based on complex criteria but rather when bandwidth is available. So an entirely new inner MergePolicy might be doable/cleaner.
BandwidthCappedMergeScheduler will need much of what CMS already does today (run merges in bg threads, instrument the bytes written by each over time)? Maybe subclassing CMS, or maybe forking it temporarily (poaching its code) with a TODO to unfork?
But it's a good idea to baby step first -- maybe the inner MergePolicy just wraps an actual TieredMergePolicy but filters the merges that TMP wanted to do so that those merges are only returned through the wrapping to IndexWriter when bandwidth allows? That's maybe a good first step -- it enables us to effectively make TMP defer its merge decisions until "a good time" (bandwidth freed up).
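As a rough illustration of that first step, the sketch below wraps an inner proposer and only lets merges through while bandwidth allows. It is plain Java, not Lucene API: `MergeProposer` is a hypothetical stand-in for a merge policy that reduces each proposed merge to its size in bytes, and `BandwidthFilteringProposer` plus the `LongSupplier` budget are likewise assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for a merge policy: proposes merge sizes in priority order. */
interface MergeProposer {
  List<Long> proposeMergeSizes();
}

/** Hypothetical wrapper: passes through only the merges that fit the current budget. */
final class BandwidthFilteringProposer implements MergeProposer {
  private final MergeProposer inner;
  private final LongSupplier availableBytes;

  BandwidthFilteringProposer(MergeProposer inner, LongSupplier availableBytes) {
    this.inner = inner;
    this.availableBytes = availableBytes;
  }

  @Override
  public List<Long> proposeMergeSizes() {
    long budget = availableBytes.getAsLong();
    List<Long> allowed = new ArrayList<>();
    for (long size : inner.proposeMergeSizes()) {
      if (size > budget) {
        break; // withhold this merge and every lower-priority one
      }
      budget -= size;
      allowed.add(size);
    }
    return allowed;
  }
}
```

Because withheld merges are never returned to the caller, nothing is registered for them, and the inner proposer is free to make a different choice on the next call.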
For configuring the bandwidth, I think there needs to be some tolerance for not precisely hitting it. E.g. I ask for around 20 MiB/s long term average (the rate at which water comes out of faucet into bathtub), but the control algorithm cannot be perfect. E.g. up front it knows how large a merge is, but it doesn't know how long that merge will take -- that is so variable by application, but perhaps predictable over time as it sees how long merges "typically" take? So the docs for this class should make it clear it's kind of a rough target?
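One possible way to learn how long merges "typically" take is an exponential moving average of observed merge throughput, sketched below in plain Java. `MergeDurationEstimator` and the smoothing factor `alpha` are hypothetical; this is just one simple estimator, not something the proposal prescribes.

```java
/** Hypothetical sketch: predicts merge duration from an EMA of observed throughput. */
final class MergeDurationEstimator {
  private final double alpha;               // weight of the newest observation, in (0, 1]
  private double bytesPerSec = Double.NaN;  // NaN until the first merge is observed

  MergeDurationEstimator(double alpha) {
    this.alpha = alpha;
  }

  /** Folds one finished merge (its size and wall-clock duration) into the average. */
  void observe(long bytes, double seconds) {
    double rate = bytes / seconds;
    bytesPerSec = Double.isNaN(bytesPerSec) ? rate : alpha * rate + (1 - alpha) * bytesPerSec;
  }

  /** Estimated seconds for a merge of this size; NaN if nothing has been observed yet. */
  double estimateSeconds(long bytes) {
    return bytes / bytesPerSec;
  }
}
```

Since the estimate lags behind changes in workload, any cap built on it would only be hit approximately, which fits with documenting the setting as a rough target.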
Drafted a PR for adding a BandwidthCappedMergeScheduler, would love to get some discussion/comments on it.