TaskExecutor should not fork unnecessarily
When executing N tasks and waiting on the result of all of them, we should only fork N - 1 times and execute one task on the calling thread that is getting blocked anyway. This saves at least one context switch, removes the need for any reentrancy protection, and makes better use of available CPU resources.
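In sketch form (illustrative Java only, not the actual TaskExecutor change; class and method names are made up), the idea reads roughly like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Minimal sketch: fork N - 1 tasks and run the remaining one on the calling thread,
// which would otherwise just block waiting for results. Assumes at least one task.
class ForkAllButOne {
  static <T> List<T> invokeAll(ExecutorService executor, List<Callable<T>> tasks) throws Exception {
    int last = tasks.size() - 1;
    List<Future<T>> futures = new ArrayList<>();
    for (int i = 0; i < last; i++) {
      futures.add(executor.submit(tasks.get(i))); // fork N - 1 tasks
    }
    T lastResult = tasks.get(last).call(); // execute one task on the caller
    List<T> results = new ArrayList<>();
    for (Future<T> future : futures) {
      results.add(future.get()); // only now block for the forked tasks
    }
    results.add(lastResult);
    return results;
  }
}
```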
Your suggestion is how it used to work before: https://github.com/apache/lucene/pull/12499/files#diff-e744fc99cb74627f02e30c1cbda56dede66d2ecdfd57db2ce869b9a9a43fa41cR49-R64. The context switching isn't great indeed, but executing some tasks on the current thread makes it hard to correctly reason about and configure the number of threads that should perform work. The idea behind this other PR was that you would have a worker executor that would do almost all the work, and a coordination executor that would be mostly coordinating work in the worker threadpool. I'm not sure if we're at a point where this coordination executor could run on virtual threads, but at least conceptually this is how I'm thinking of it.
Something tangential that we touched on a couple of times but haven't implemented for now would consist of introducing an API on IndexSearcher that doesn't require waiting on futures to complete, e.g. something like public <C extends Collector, T> void search(Query query, CollectorManager<C, T> collectorManager, IOConsumer<T> resultConsumer).
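To make the shape a bit more concrete, here is a rough sketch of what that signature could look like (purely an illustration reconstructed from the comment above; the IOConsumer here is declared locally as a placeholder and nothing like this exists on IndexSearcher today):

```java
import java.io.IOException;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.Query;

// Hypothetical callback-style search API: the searcher would hand the reduced result
// to the consumer once it is ready, instead of the caller blocking on futures.
interface AsyncSearch {
  @FunctionalInterface
  interface IOConsumer<T> {
    void accept(T value) throws IOException;
  }

  <C extends Collector, T> void search(
      Query query, CollectorManager<C, T> collectorManager, IOConsumer<T> resultConsumer)
      throws IOException;
}
```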
Also related, we are replacing some of the forking with the new support we introduced for I/O concurrency: https://github.com/apache/lucene/pull/13359/files#diff-ad7c504406afec8940592f1fda0062d3e5321cdc5693c24ec6f5cfb02f8dd90dL100-R114.
The idea behind this other PR was that you would have a worker executor that would do almost all the work, and a coordination executor that would be mostly coordinating work in the worker threadpool.
But that's not what is happening practically right now? Whether I run N-1 tasks on the worker pool and one on the caller or N tasks on the worker and sleep on the caller thread, either way the coordinator thread is blocked and not coordinating anything in the meantime? I can see the argument of using the worker pool to limit task concurrency to x number of threads/cores. But I wonder since we are blocking the coordinating executors threads, leaving them idle, maybe there isn't that much coordinator work to do in the first place and both coordinator and worker pool can just be the same threads practically? If any of the tasks enqueued from the coordinating executor fans out again (and they do), that's how it works anyways.
Your suggestion is how it used to work before:
Not quite, I think: the difference is that in my approach the coordinator thread keeps pulling stuff off of the queue in a loop, instead of just doing the last task. This means that the coordinating thread will not be "wasted" as much if the worker pool takes time to execute the tasks and can't do them all in parallel. Also, it resolves the deadlock issue for single-threaded or otherwise saturated executors.
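A rough sketch of that pulling loop, assuming the tasks sit in a shared queue that both the pool workers and the caller poll from (names here are made up and result collection is left out; this is not the actual implementation):

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;

class PullingInvoker {
  static <T> void invokeAll(Executor executor, List<Callable<T>> tasks) {
    Queue<Callable<T>> pending = new ConcurrentLinkedQueue<>(tasks);
    for (int i = 0; i < tasks.size(); i++) {
      executor.execute(() -> runOne(pending)); // offer every task to the pool
    }
    // The calling thread keeps pulling work instead of just doing the last task:
    // if the pool is saturated it simply executes everything itself, which also
    // avoids deadlocks on single-threaded or otherwise saturated executors.
    Callable<T> task;
    while ((task = pending.poll()) != null) {
      run(task);
    }
    // waiting for tasks the pool already claimed, and collecting results, is omitted
  }

  private static <T> void runOne(Queue<Callable<T>> pending) {
    Callable<T> task = pending.poll(); // may find nothing left if the caller took it
    if (task != null) {
      run(task);
    }
  }

  private static <T> void run(Callable<T> task) {
    try {
      task.call();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
```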
I'm not sure if we're at a point where this coordination executor could run on virtual threads, but at least conceptually this is how I'm thinking of it.
That does make sense. But I wonder if this is something we should leave to the runtime/OS/... to figure out for us. It seems very desirable to limit the number of context switches when the API is synchronous so we can go through short tasks served from page cache with as little overhead as possible?
would consist of introducing an API on IndexSearcher that doesn't require waiting on futures to complete
++ that'd be really nice and save a lot of overhead here. That said we could optimize this code fairly easily to move from waiting on "futures" to waiting on a single future :) I haven't benchmarked this much, but if we see non-trivial overhead for the wait loop due to frequent wakeup-sleep cycles as we go through all of the futures, we could just have a ref-count around a single future couldn't we?
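For illustration, a minimal sketch of what "a ref-count around a single future" could look like (names are made up; this is not the PR's code):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Instead of the caller looping over one future per task, every task decrements a
// shared count and the last one to finish completes the single future the caller
// waits on. Assumes at least one task.
class RefCountedCompletion {
  static CompletableFuture<Void> runAll(Executor executor, List<Runnable> tasks) {
    CompletableFuture<Void> done = new CompletableFuture<>();
    AtomicInteger remaining = new AtomicInteger(tasks.size());
    for (Runnable task : tasks) {
      executor.execute(() -> {
        try {
          task.run();
        } finally {
          if (remaining.decrementAndGet() == 0) {
            done.complete(null); // single wake-up for the waiting caller
          }
        }
      });
    }
    return done;
  }
}
```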
we are replacing some of the forking with the new support we introduced for I/O concurrency
❤️
Thanks for explaining I had not read your implementation carefully. I agree that we are doing less blocking than in the previous implementation of this, though we could still be blocking at times it seems? E.g. if you have two tasks and the first one takes more time? I understand what you are saying about reducing the overhead of forking, but I'm not too happy that it makes sizing thread pools more complicated in exchange? I need to think more about the trade-off.
E.g. if you have two tasks and the first one takes more time?
Right, that's a possible scenario, but unless we move to some kind of async API like the one you mentioned above, we'll always have blocking on the calling thread if there's a comparatively long running task running on one of the forked threads.
but I'm not too happy that it makes sizing thread pools more complicated in exchange? I need to think more about the trade-off.
To me it seems like the opposite is true: this change makes reasoning about the sizing much easier. I find it very complicated working out the relative sizes of worker pool and coordinator pool. I effectively want the worker pool sized just right so that I get the CPU utilisation I desire without oversubscribing and adding scheduling overhead. Now I have to add a coordinator pool that enables that into the mix. That one I have to size in such a way that I always have another thread available as long as my worker pool isn't fully utilised. That's quite hard to get right?
With this change, I only have to size one pool and since blocking is rare I can probably ignore it in the calculation. So the size of the pool comes out to ~ desired_busy_cores / (1 - io_share_of_execution_time) doesn't it?
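(As a worked instance of that formula, under those assumptions: if I want to keep 8 cores busy and I/O accounts for roughly half of execution time, the pool would come out to about 8 / (1 - 0.5) = 16 threads.)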
I took some time to digest the suggested code changes and the discussions above. I get the sizing issues with using two thread pools (one executing IndexSearcher#search or whatever operation that uses the executor and the other one provided as executor to the searcher) if the heavy work can be performed on either of the two pools. That would revert previous changes around offloading single slices, based on the requirement that we wanted to split the load between the two pools.
If this change though makes us revisit the need for two pools, and allows users to provide the same executor that search already executes against, I think that would be a really good simplification. I need to think more about downsides, and expectations around sizing: we may need bigger queues, because a single search operation may create many more tasks than before?
we may need bigger queues, because a single search operation may create many more tasks than before?
Right, an alternative would be to count in-progress searches at the top level and just make the queue unbounded? That would keep the behavior the same it is today and makes reasoning about the correct queue size simpler? Seems that's more of an ES than a Lucene concern though, Lucene should just make full use of the provided executor and that's that shouldn't it?
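A rough sketch of that top-level counting, assuming a semaphore for admission control in front of a fixed pool whose internal queue is unbounded (names are placeholders, not an existing ES or Lucene API):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

class TopLevelSearchThrottle {
  private final Semaphore inFlight;
  // newFixedThreadPool uses an unbounded LinkedBlockingQueue internally.
  private final ExecutorService workers =
      Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

  TopLevelSearchThrottle(int maxConcurrentSearches) {
    this.inFlight = new Semaphore(maxConcurrentSearches);
  }

  void search(Consumer<ExecutorService> doSearch) throws InterruptedException {
    inFlight.acquire(); // count in-progress searches here, not via a bounded queue
    try {
      doSearch.accept(workers); // the search may enqueue many small tasks on the pool
    } finally {
      inFlight.release();
    }
  }
}
```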
Lucene should just make full use of the provided executor and that's that shouldn't it?
Yes, I think so, but perhaps Lucene needs to provide general guidelines to users around what executor is suited and how it should be configured, what factors to take into account etc.
Would it make sense to provide a reference implementation factory method that creates a properly-configured threadpool, maybe using all available cores with whatever appropriate policies?
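For illustration, such a factory method could look something like this (purely a sketch, not an existing Lucene API; the sizing and naming choices here are assumptions):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class SearchExecutors {
  // A fixed pool sized to the available cores, with daemon threads and a recognizable name.
  static ExecutorService newSearchExecutor() {
    int threads = Runtime.getRuntime().availableProcessors();
    ThreadFactory factory = new ThreadFactory() {
      private final AtomicInteger id = new AtomicInteger();

      @Override
      public Thread newThread(Runnable r) {
        Thread t = new Thread(r, "lucene-search-" + id.getAndIncrement());
        t.setDaemon(true);
        return t;
      }
    };
    return Executors.newFixedThreadPool(threads, factory);
  }
}
```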
Luceneutil benchmark results for this, running with one less thread for this branch vs main (credit to @jpountz and @javanna for the idea), to get an idea of the impact:
TaskQPS baseline StdDevQPS my_modified_version StdDev Pct diff p-value
Fuzzy1 105.06 (3.1%) 103.22 (3.6%) -1.7% ( -8% - 5%) 0.103
BrowseDayOfYearTaxoFacets 14.80 (1.0%) 14.55 (4.5%) -1.7% ( -7% - 3%) 0.096
OrHighMedDayTaxoFacets 6.60 (3.3%) 6.49 (2.1%) -1.6% ( -6% - 3%) 0.062
Respell 52.96 (2.2%) 52.56 (1.9%) -0.8% ( -4% - 3%) 0.243
BrowseDateTaxoFacets 14.91 (1.2%) 14.86 (3.9%) -0.4% ( -5% - 4%) 0.695
BrowseRandomLabelSSDVFacets 3.73 (0.5%) 3.73 (0.5%) 0.1% ( 0% - 1%) 0.714
BrowseMonthSSDVFacets 5.58 (2.0%) 5.59 (2.0%) 0.2% ( -3% - 4%) 0.763
BrowseDayOfYearSSDVFacets 7.61 (0.6%) 7.62 (0.6%) 0.2% ( 0% - 1%) 0.276
MedTermDayTaxoFacets 25.46 (0.7%) 25.52 (0.9%) 0.3% ( -1% - 1%) 0.328
AndHighHighDayTaxoFacets 15.24 (0.7%) 15.28 (0.5%) 0.3% ( -1% - 1%) 0.183
AndHighMedDayTaxoFacets 17.92 (0.7%) 17.99 (0.5%) 0.4% ( 0% - 1%) 0.023
BrowseRandomLabelTaxoFacets 11.95 (1.7%) 12.00 (1.2%) 0.4% ( -2% - 3%) 0.331
BrowseMonthTaxoFacets 12.37 (3.0%) 12.46 (1.7%) 0.7% ( -3% - 5%) 0.358
HighTermMonthSort 306.96 (16.4%) 309.25 (14.6%) 0.7% ( -26% - 38%) 0.879
BrowseDateSSDVFacets 1.45 (1.0%) 1.48 (2.4%) 1.7% ( -1% - 5%) 0.004
Prefix3 223.49 (31.2%) 228.83 (13.7%) 2.4% ( -32% - 68%) 0.754
Fuzzy2 55.36 (20.9%) 58.92 (14.4%) 6.4% ( -23% - 52%) 0.256
PKLookup 176.48 (18.1%) 194.13 (13.2%) 10.0% ( -17% - 50%) 0.045
OrNotHighLow 472.02 (2.4%) 567.48 (26.2%) 20.2% ( -8% - 50%) 0.001
HighSloppyPhrase 3.06 (3.6%) 3.69 (7.1%) 20.4% ( 9% - 32%) 0.000
AndHighLow 784.51 (24.4%) 959.85 (12.6%) 22.4% ( -11% - 78%) 0.000
Wildcard 124.97 (1.4%) 154.50 (2.5%) 23.6% ( 19% - 27%) 0.000
IntNRQ 70.70 (1.2%) 87.67 (4.0%) 24.0% ( 18% - 29%) 0.000
HighPhrase 94.06 (2.9%) 118.04 (5.3%) 25.5% ( 16% - 34%) 0.000
AndHighHigh 53.83 (1.5%) 67.85 (2.0%) 26.1% ( 22% - 30%) 0.000
LowSloppyPhrase 60.97 (2.4%) 77.49 (5.6%) 27.1% ( 18% - 35%) 0.000
LowPhrase 20.56 (1.2%) 26.27 (2.9%) 27.7% ( 23% - 32%) 0.000
MedPhrase 29.76 (1.7%) 39.75 (5.1%) 33.6% ( 26% - 40%) 0.000
LowIntervalsOrdered 15.55 (2.5%) 20.83 (4.1%) 33.9% ( 26% - 41%) 0.000
AndHighMed 99.55 (2.7%) 135.12 (2.1%) 35.7% ( 30% - 41%) 0.000
LowSpanNear 3.16 (1.8%) 4.30 (1.6%) 36.3% ( 32% - 40%) 0.000
OrHighMed 117.00 (3.8%) 164.78 (4.2%) 40.8% ( 31% - 50%) 0.000
OrHighNotHigh 89.87 (6.3%) 128.16 (36.4%) 42.6% ( 0% - 91%) 0.000
OrHighHigh 38.70 (1.8%) 55.41 (8.0%) 43.2% ( 32% - 53%) 0.000
MedSloppyPhrase 7.29 (3.5%) 10.68 (4.6%) 46.5% ( 37% - 56%) 0.000
HighSpanNear 2.54 (2.1%) 3.77 (3.2%) 48.6% ( 42% - 55%) 0.000
MedTerm 216.76 (15.6%) 324.89 (29.6%) 49.9% ( 4% - 112%) 0.000
HighTermTitleSort 13.92 (9.3%) 23.43 (8.9%) 68.3% ( 45% - 95%) 0.000
TermDTSort 68.68 (3.3%) 117.77 (12.2%) 71.5% ( 54% - 90%) 0.000
HighTerm 220.46 (5.7%) 396.67 (14.8%) 79.9% ( 56% - 106%) 0.000
OrHighLow 218.43 (26.1%) 400.99 (82.8%) 83.6% ( -20% - 260%) 0.000
HighTermTitleBDVSort 4.45 (2.1%) 8.32 (2.1%) 86.8% ( 80% - 92%) 0.000
MedSpanNear 22.62 (2.7%) 42.88 (5.8%) 89.6% ( 78% - 100%) 0.000
OrHighNotLow 329.64 (22.4%) 672.19 (30.0%) 103.9% ( 42% - 201%) 0.000
HighTermDayOfYearSort 57.50 (3.8%) 125.18 (9.8%) 117.7% ( 100% - 136%) 0.000
MedIntervalsOrdered 10.22 (4.1%) 22.48 (9.4%) 119.9% ( 102% - 139%) 0.000
HighIntervalsOrdered 2.41 (6.1%) 5.39 (10.2%) 123.3% ( 100% - 148%) 0.000
LowTerm 251.06 (10.8%) 634.45 (7.9%) 152.7% ( 120% - 192%) 0.000
OrNotHighMed 74.81 (5.4%) 221.54 (14.8%) 196.1% ( 166% - 228%) 0.000
OrNotHighHigh 95.65 (7.1%) 314.65 (21.1%) 228.9% ( 187% - 276%) 0.000
OrHighNotMed 59.11 (6.5%) 206.56 (15.0%) 249.4% ( 214% - 289%) 0.000
This is wikimediumall, 3 threads for main and 2 threads for this branch. Effectively no regressions but some considerable speedups.
The reason for this is the obvious reduction in context switching. We go from perf output for main:
Performance counter stats for process id '157418':
574,008,686,445 cycles
1,130,739,465,717 instructions # 1.97 insn per cycle
2,599,704,747 cache-misses
429,542 context-switches
49.053969801 seconds time elapsed
to this for this branch:
Performance counter stats for process id '157292':
526,556,069,563 cycles
1,122,410,787,297 instructions # 2.13 insn per cycle
2,420,210,310 cache-misses
385,991 context-switches
41.044785986 seconds time elapsed
-> pretty much the same number of instructions needs to be executed, but they run in fewer cycles and encounter fewer cache misses.
This is also seen in the profile of where the CPU time goes:
main looks like this:
17.21% 328981 org.apache.lucene.search.TopScoreDocCollector$SimpleTopScoreDocCollector$1#collect()
5.75% 109925 org.apache.lucene.facet.sortedset.SortedSetDocValuesFacetCounts#countOneSegmentNHLD()
5.24% 100195 org.apache.lucene.search.TopFieldCollector$TopFieldLeafCollector#countHit()
5.17% 98733 org.apache.lucene.util.packed.DirectMonotonicReader#get()
4.11% 78637 org.apache.lucene.codecs.lucene90.Lucene90DocValuesProducer$20#ordValue()
3.98% 76164 org.apache.lucene.facet.taxonomy.FastTaxonomyFacetCounts#countAll()
2.57% 49115 org.apache.lucene.codecs.lucene99.Lucene99PostingsReader$EverythingEnum#nextPosition()
1.82% 34823 org.apache.lucene.queries.spans.NearSpansOrdered#stretchToOrder()
1.73% 33136 jdk.internal.foreign.MemorySessionImpl#checkValidStateRaw()
1.63% 31172 java.util.concurrent.atomic.AtomicLong#incrementAndGet()
while this branch looks as follows:
10.79% 183254 org.apache.lucene.search.TopScoreDocCollector$SimpleTopScoreDocCollector$1#collect()
5.89% 100099 org.apache.lucene.facet.sortedset.SortedSetDocValuesFacetCounts#countOneSegmentNHLD()
5.62% 95387 org.apache.lucene.util.packed.DirectMonotonicReader#get()
4.59% 77917 org.apache.lucene.codecs.lucene90.Lucene90DocValuesProducer$20#ordValue()
4.48% 76145 org.apache.lucene.facet.taxonomy.FastTaxonomyFacetCounts#countAll()
3.20% 54407 org.apache.lucene.search.TopFieldCollector$TopFieldLeafCollector#countHit()
2.77% 47088 org.apache.lucene.codecs.lucene99.Lucene99PostingsReader$EverythingEnum#nextPosition()
2.06% 34965 org.apache.lucene.queries.spans.NearSpansOrdered#stretchToOrder()
1.91% 32484 jdk.internal.foreign.MemorySessionImpl#checkValidStateRaw()
1.81% 30763 org.apache.lucene.codecs.lucene99.Lucene99PostingsReader$BlockImpactsPostingsEnum#advance()
1.71% 28966 org.apache.lucene.util.packed.DirectReader$DirectPackedReader12#get()
1.66% 28206 org.apache.lucene.codecs.lucene99.Lucene99PostingsReader$EverythingEnum#advance()
-> a lot less time goes into collect which goes through contended counter increments.
The luceneutil results reported here are astounding. So astounding I'm not sure I believe them? I wonder if somehow we did not run with concurrency enabled on the main branch test ... or if there was some other testing artifact? Part of my thinking is that if this change was so impactful, then wouldn't we have seen a huge regression when moving from the prior situation (where we ran N-1 tasks in the threadpool and one task on the main thread) to the current situation? Hmm I do see your comment that this is different since the "main" thread is continuing to do more tasks. Still I'm really surprised at the impact. If I can get a moment I'll try to corroborate the test result.
I had a similar reaction as you @msokolov. When it comes to lucene benchmarks, we only very recently started running those with concurrency enabled, so I think that previous changes to the concurrent code-path were not covered? In fact lucene-util did not provide the executor to the searcher until last week or so?
Part of my thinking is that if this change was so impactful, then wouldn't we have seen a huge regression when moving from the prior situation (where we ran N-1 tasks in the threadpool and one task on the main thread) to the current situation?
My general thinking is that there's a gain in using search concurrency in many cases, and some situations where it adds overhead or very little gain. At least when it comes to what we have observed in Elasticsearch benchmarks, we've so far mostly compared no concurrency with some concurrency, and not accurately observed differences between different attempts at running tasks concurrently: the root issue was the overhead of forking, and running no tasks in the caller thread vs one probably did not make a whole lot of difference. I think that the proposed solution is very different and that is why we are observing very different benchmark results. This change seems to take the concurrent code-path to a different level.
With that, please do double check the bench results, more eyes on it can only help.
@shubhamvishu @reta @sohami @mikemccand @zhaih I am pinging you folks because you have previously been involved in some of the search concurrency discussions and this PR will affect how search concurrency is exposed to users.
Elasticsearch leverages inter-segment search concurrency in Lucene by providing an executor to the IndexSearcher. Such executor is so far separate from the one that calls IndexSearcher#search: there is a search thread pool as well as a search workers thread pool in Elasticsearch. This introduces some complexity for Lucene users, as they need to size a separate executor and provide it in order to benefit from search concurrency. We have seen that the caller thread is under-utilized and could happily help execute tasks too, while it currently blocks and waits for all search tasks to complete. @original-brownbear implemented a deadlock-free solution to this, which allows the caller thread to offload tasks to the executor and at the same time pull tasks while they have not yet been executed. This reduces forking and optimizes concurrent execution. Additionally, this makes it possible to execute all of the tasks on a single executor, removing the requirement that users size and provide a separate executor to the searcher. I think this is great both from the perspective of better resource utilization as well as how much easier it becomes to leverage concurrency in Lucene.
What do you all think?
@msokolov they are astounding but in the opposite direction, in fact it's concurrency that's the problem mostly.
This is main vs main, no concurrency vs 4 threads:
TaskQPS baseline StdDevQPS my_modified_version StdDev Pct diff p-value
BrowseDayOfYearTaxoFacets 14.81 (0.4%) 5.97 (0.4%) -59.7% ( -60% - -59%) 0.000
BrowseDateTaxoFacets 14.20 (9.0%) 5.85 (0.2%) -58.8% ( -62% - -54%) 0.000
IntNRQ 70.46 (1.3%) 30.29 (3.2%) -57.0% ( -60% - -53%) 0.000
BrowseRandomLabelTaxoFacets 11.61 (2.7%) 5.08 (0.3%) -56.3% ( -57% - -54%) 0.000
Fuzzy1 72.82 (5.7%) 44.58 (1.1%) -38.8% ( -43% - -33%) 0.000
BrowseDayOfYearSSDVFacets 7.66 (1.0%) 4.78 (0.6%) -37.6% ( -38% - -36%) 0.000
OrHighMed 74.56 (2.4%) 51.86 (3.2%) -30.4% ( -35% - -25%) 0.000
AndHighHigh 47.99 (2.7%) 34.33 (2.6%) -28.5% ( -32% - -23%) 0.000
AndHighMed 67.95 (1.5%) 52.17 (2.4%) -23.2% ( -26% - -19%) 0.000
LowSloppyPhrase 45.51 (1.7%) 37.92 (2.4%) -16.7% ( -20% - -12%) 0.000
MedPhrase 11.68 (5.0%) 9.74 (0.3%) -16.6% ( -20% - -11%) 0.000
BrowseMonthTaxoFacets 12.23 (2.6%) 10.73 (27.7%) -12.2% ( -41% - 18%) 0.378
OrHighHigh 45.32 (2.7%) 39.79 (4.2%) -12.2% ( -18% - -5%) 0.000
BrowseMonthSSDVFacets 5.49 (4.2%) 4.85 (1.1%) -11.7% ( -16% - -6%) 0.000
HighSloppyPhrase 2.01 (2.2%) 1.81 (7.4%) -10.2% ( -19% - 0%) 0.008
Wildcard 123.17 (2.5%) 115.43 (0.9%) -6.3% ( -9% - -2%) 0.000
OrNotHighLow 908.00 (2.2%) 865.22 (1.4%) -4.7% ( -8% - -1%) 0.000
LowIntervalsOrdered 57.32 (3.2%) 54.78 (3.9%) -4.4% ( -11% - 2%) 0.077
MedTermDayTaxoFacets 22.22 (0.6%) 21.57 (2.9%) -2.9% ( -6% - 0%) 0.049
BrowseDateSSDVFacets 1.46 (2.0%) 1.45 (2.1%) -0.5% ( -4% - 3%) 0.743
BrowseRandomLabelSSDVFacets 3.75 (0.6%) 3.74 (0.2%) -0.2% ( -1% - 0%) 0.551
OrHighMedDayTaxoFacets 1.20 (1.1%) 1.21 (4.4%) 0.9% ( -4% - 6%) 0.678
Respell 52.55 (1.4%) 53.25 (2.9%) 1.3% ( -2% - 5%) 0.407
AndHighMedDayTaxoFacets 11.46 (0.8%) 11.76 (2.7%) 2.6% ( 0% - 6%) 0.067
AndHighHighDayTaxoFacets 12.74 (1.3%) 13.23 (2.1%) 3.8% ( 0% - 7%) 0.002
MedSpanNear 8.28 (2.4%) 9.50 (5.0%) 14.7% ( 7% - 22%) 0.000
AndHighLow 624.28 (22.4%) 726.83 (3.6%) 16.4% ( -7% - 54%) 0.147
Fuzzy2 51.95 (23.2%) 60.73 (2.7%) 16.9% ( -7% - 55%) 0.147
MedSloppyPhrase 12.94 (4.1%) 15.57 (10.9%) 20.3% ( 5% - 36%) 0.001
Prefix3 158.65 (23.1%) 213.31 (3.8%) 34.5% ( 6% - 79%) 0.003
PKLookup 175.73 (6.3%) 247.50 (0.6%) 40.8% ( 31% - 50%) 0.000
HighPhrase 24.79 (6.3%) 37.67 (1.4%) 52.0% ( 41% - 63%) 0.000
LowPhrase 153.31 (1.4%) 244.54 (1.6%) 59.5% ( 55% - 63%) 0.000
OrHighLow 232.73 (23.8%) 371.84 (4.1%) 59.8% ( 25% - 115%) 0.000
HighSpanNear 2.93 (3.5%) 4.82 (13.3%) 64.7% ( 46% - 84%) 0.000
LowSpanNear 51.65 (6.0%) 98.03 (9.8%) 89.8% ( 69% - 112%) 0.000
HighTermTitleBDVSort 4.37 (4.4%) 8.65 (1.6%) 98.0% ( 88% - 108%) 0.000
MedIntervalsOrdered 9.46 (7.3%) 19.51 (13.2%) 106.1% ( 79% - 136%) 0.000
HighIntervalsOrdered 4.26 (6.5%) 8.81 (13.6%) 106.9% ( 81% - 135%) 0.000
LowTerm 232.68 (3.8%) 485.59 (7.5%) 108.7% ( 93% - 124%) 0.000
MedTerm 202.48 (26.4%) 535.61 (18.8%) 164.5% ( 94% - 285%) 0.000
OrHighNotLow 172.52 (3.4%) 516.56 (7.3%) 199.4% ( 182% - 217%) 0.000
OrNotHighHigh 69.11 (4.1%) 224.30 (11.7%) 224.6% ( 200% - 250%) 0.000
OrHighNotHigh 77.32 (2.7%) 271.59 (12.7%) 251.3% ( 229% - 274%) 0.000
TermDTSort 62.88 (4.5%) 224.63 (5.5%) 257.2% ( 236% - 279%) 0.000
HighTerm 106.32 (3.1%) 385.12 (25.1%) 262.2% ( 227% - 299%) 0.000
OrNotHighMed 64.14 (10.1%) 247.41 (19.2%) 285.7% ( 232% - 350%) 0.000
OrHighNotMed 78.41 (5.3%) 306.67 (10.6%) 291.1% ( 261% - 324%) 0.000
HighTermMonthSort 395.36 (38.7%) 2712.69 (16.2%) 586.1% ( 382% - 1046%) 0.000
HighTermDayOfYearSort 67.77 (4.8%) 524.03 (18.3%) 673.3% ( 620% - 731%) 0.000
HighTermTitleSort 15.06 (3.6%) 131.50 (5.9%) 773.4% ( 737% - 811%) 0.000
A large number of these items are actually showing extreme regressions from forking. Even this branch is like 50% behind no concurrency on some points. This is in fact how I got to opening this PR.
When profiling ES benchmark runs I saw a bunch of sections where the overhead of forking for a given task was higher than the cost of just executing that same task right away. It's a little hard to quantitatively show this in a flame graph but the qualitative problem is here: This is the profiling with vanilla Lucene:
And this is the same situation with my changes in Lucene:
For weight creation, the forking overhead is still overwhelming, but at least we save the future.get overhead from putting the calling thread to sleep and waking it up again. Only for longer-running search tasks is the forking overhead "ok" I think. As I tried to show with the perf output, the cache effects of context switching often outweigh any benefits of parallelization of IO. I could even see a point where the IO parallelization causes harm, not from the IO itself but from the fact that page faulting isn't super scalable in Linux, so even if you make an NVMe drive run faster, the contention on the page fault handling might actually destroy any benefit from pushing the disk (assuming a fast disk that is) harder.
Additionally, this makes it possible to execute all of the tasks on a single executor, removing the requirements that users size and provide a separate executor to the searcher. I think this is great both from the perspective of better resource utilization as well as how easier it gets to leverage concurrency in Lucene.
Thanks @jpountz, this change is quite clever, and I agree with @original-brownbear that it leads to better CPU utilization since the caller thread "joins" the executor pool in the effort (also confirmed by benchmarks). Maybe I am missing something, but the implementation basically introduces "double queuing": the task executor has one queue and very likely the supplied executor has one too, and both are competing over taskId (directly or indirectly) to get something done.
On a general note, it resembles a lot the way ForkJoinPool is implemented (which also utilizes the calling thread to schedule some work); maybe we could explore this route as well? I mean have a specialized task executor that accepts a ForkJoinPool, not a general executor. I think it could simplify things quite a bit, just an idea.
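For what it's worth, a toy sketch of how per-slice work could be phrased as ForkJoin tasks, so that a joining thread helps run queued subtasks instead of sleeping (Slice and searchSlice are placeholders; this is only an illustration of the ForkJoinPool route, not a proposal for the actual TaskExecutor code):

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SliceSearchTask extends RecursiveTask<Long> {
  private final List<Slice> slices;

  SliceSearchTask(List<Slice> slices) {
    this.slices = slices;
  }

  @Override
  protected Long compute() {
    if (slices.size() <= 1) {
      return slices.isEmpty() ? 0L : searchSlice(slices.get(0)); // leaf: do the actual work
    }
    int mid = slices.size() / 2;
    SliceSearchTask left = new SliceSearchTask(slices.subList(0, mid));
    SliceSearchTask right = new SliceSearchTask(slices.subList(mid, slices.size()));
    // invokeAll runs one half in the current thread and forks the other;
    // a joining worker steals queued subtasks rather than going to sleep.
    invokeAll(left, right);
    return left.join() + right.join();
  }

  private long searchSlice(Slice slice) {
    return 1; // stand-in for collecting hits in this slice
  }

  record Slice(int id) {}
}
```

A caller would then run it via something like new ForkJoinPool(parallelism).invoke(new SliceSearchTask(slices)); whether that composes with IndexSearcher accepting a plain Executor is exactly the open question.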
May be I am missing something, but the implementation basically introduces "double queuing": task executor has one and very likely the supplied executor would have one, both are competing over taskId (directly or indirectly) to get something done.
You're right this is one of the remaining areas of contention that could be fixed for even better performance. Using a ForkJoinPool also has some potential for reducing the impact of memory barriers.
I wonder if this is the right area to optimize though? Looking at the results of concurrency vs. no-concurrency in https://github.com/apache/lucene/pull/13472#issuecomment-2173609575 I'm inclined to think this is not the code we should optimize further. Even with this change we're at times 2x-3x (vs. 8-9x without my changes) slower than main without concurrency for some parts of the benchmark.
I don't think we can eliminate all the overhead of requiring some memory barriers for synchronising tasks. So maybe the problem eventually just is with tasks that are too small?
I don't think we can eliminate all the overhead of requiring some memory barriers for synchronising tasks. So maybe the problem eventually just is with tasks that are too small?
This change is definitely an improvement (over the existing implementation)
I don't think we can eliminate all the overhead of requiring some memory barriers for synchronising tasks. So maybe the problem eventually just is with tasks that are too small?
Eliminate - probably not, but reduce further - likely, though that has yet to be proven (as you rightly pointed out, maybe the problem eventually just is with tasks that are too small, so optimizing further won't help).
We need to be careful interpreting the QPS results from luceneutil:
These are not actual red-line (capacity) QPS numbers (CPU is not normally saturated during these runs), but rather "effective QPS". The benchy measures "t = median elapsed wall-clock time to run the query (after discarding 10% slowest outliers and warmup), averaged across all JVMs for each task" and computes and reports the effective QPS (1.0 / t). E.g. this means that if a run is using more concurrent threads it may finish (wall clock elapsed time) faster and appear to have more QPS, but that is false since this is net/net a zero-sum-game (still burning more/same-ish total CPU, just spread across more threads).
Except, if Lucene is actually more efficient (lower total CPU) by running more threads at once because e.g. collecting the best hits across more segments concurrently means we can stop searching the non-competitive docs across segments sooner, then that is an actual QPS (red-line / capacity) win, not just a reporting artifact. With time/improvements this should in fact be a big contributor to more efficient (total CPU) search... I think much innovation remains on this part of Lucene :) E.g. maybe some segments tend to contribute strongly to most queries, and if Lucene could instrument/record this, it should kick off those segments first so they quickly find the competitive docs and quickly cause the other segments to stop early. Intra-segment concurrency is another innovation we have yet to successfully tackle ... explicit early termination (not just BMW) with all this concurrency is more exploration ...
But, in your first run, since you ran main with three threads and branch with two threads, I think the concurrency is the same (three threads actually searching slices?) so this artifact of luceneutil QPS reporting can't explain the gains in that run ... but it's hard to believe context switching is so costly? Hmm, though your profiling seems to show far fewer calls to collect, which might mean cross-segment concurrent efficiency is somehow kicking in? Or, if context switching really explains it all, and collect is much faster, it would be sampled less ... can't read too much into the sample count from profiler output.
This is main vs main, no concurrency vs 4 threads:
So for these results, I would expect to see ~4X QPS gain (ish) simply because wall-clock elapsed time for the query got ~4X faster (assuming perfect concurrency which won't happen in practice ... it depends of course on relative size of segments, whether there is long-pole outlier task limiting the actual concurrency, etc.). So, it's interesting some tasks are > 400% faster -- maybe the added cross-segment concurrent efficiency is contributing here?
It's also spooky that some tasks got ~2X slower. Can this really be due to context switching overhead? Nightly benchmarks did indeed switch to "concurrent" search but with only 1 worker thread, just to exercise that code path without actually using concurrency. Yet we didn't see similar slowdowns to the BrowseXXX facet tasks as above, e.g. BrowseDateTaxoFacets. So the slowdown doesn't happen just because we are exercising the concurrent path. Actually, I don't think the BrowseXXX tasks even use concurrency at all: if you look at SearchTask.java it's just calling FastTaxonomyFacetCounts on the whole index which runs sequentially segment by segment I think? So now I don't understand why this main vs main (4 threads) is showing any slowdown for these BrowseXXX tasks... confused.
[I'll open a spinoff issue here that Lucene's facet counting should also maybe tap into this executor for concurrent counting. It's likely tricky though...].
@original-brownbear -- what does your Lucene index look like? Can you run CheckIndex and share the output on your runs? I'm curious about the actual segment geometry... also make sure you are sharing the same single index across main and branch.
I'll open a spinoff issue here that Lucene's facet counting should also maybe tap into this executor for concurrent counting
#12474 sounds related.
So for these results, I would expect to see ~4X QPS gain (ish) simply because wall-clock elapsed time for the query got ~4X faster
That's why I added the perf numbers :) We already lose about 10% in instructions per cycle and need to deal with context switches that take a rather unpredictable amount of time.
These are not actual red-line (capacity) QPS numbers (CPU is not normally saturated during these runs), but rather "effective QPS".
Yup, that's my point in all of this to some extent :) we're not saturated, but the perf and wall-clock time numbers taken together show we often burn more CPU for no speedup whatsoever. The number of executed CPU instructions is pretty much the same across runs; it just takes much longer to get through them because of the stalls from context switching. It's also not just context switching we're paying for here: we're also waking up threads in some cases, which is mostly what shows as costly in profiling.
Or, if context switching really explains it all, and collect is much faster, it would be sampled less
Yes, the CAS operations in collect get much, much faster as far as I can tell. Note that it's not even a blind CAS here: we do a read -> CAS and then later another read on the same number in many cases. That's a huge cost.
CheckIndex says this:
0.00% total deletions; 33332620 documents; 0 deletions
Segments file=segments_2 numSegments=15 version=10.0.0 id=adt5pnf860s3oddr3g83e6jp3 userData={userData=multi}
1 of 15: name=_h2 maxDoc=56399
version=10.0.0
id=adt5pnf860s3oddr3g83e6jnh
codec=Lucene99
compound=false
numFiles=17
size (MB)=34.555
diagnostics = {source=flush, lucene.version=10.0.0, os.version=5.15.0-112-generic, os.arch=amd64, java.vendor=Ubuntu, os=Linux, java.runtime.version=21.0.3+9-Ubuntu-1ubuntu122.04.1, timestamp=1718716766205}
no deletions
test: open reader.........OK [took 0.045 sec]
test: check integrity.....OK [took 0.027 sec]
2 of 15: name=_h1 maxDoc=60066
version=10.0.0
id=adt5pnf860s3oddr3g83e6jlw
codec=Lucene99
compound=false
numFiles=17
size (MB)=37.469
diagnostics = {source=flush, lucene.version=10.0.0, os.version=5.15.0-112-generic, os.arch=amd64, java.vendor=Ubuntu, os=Linux, java.runtime.version=21.0.3+9-Ubuntu-1ubuntu122.04.1, timestamp=1718716761312}
no deletions
test: open reader.........OK [took 0.043 sec]
test: check integrity.....OK [took 0.033 sec]
3 of 15: name=_h0 maxDoc=60065
version=10.0.0
id=adt5pnf860s3oddr3g83e6jkb
codec=Lucene99
compound=false
numFiles=17
size (MB)=38.403
diagnostics = {source=flush, lucene.version=10.0.0, os.version=5.15.0-112-generic, os.arch=amd64, java.vendor=Ubuntu, os=Linux, java.runtime.version=21.0.3+9-Ubuntu-1ubuntu122.04.1, timestamp=1718716755990}
no deletions
test: open reader.........OK [took 0.043 sec]
test: check integrity.....OK [took 0.033 sec]
4 of 15: name=_gz maxDoc=60065
version=10.0.0
id=adt5pnf860s3oddr3g83e6jit
codec=Lucene99
compound=false
numFiles=17
size (MB)=44.224
diagnostics = {source=flush, lucene.version=10.0.0, os.version=5.15.0-112-generic, os.arch=amd64, java.vendor=Ubuntu, os=Linux, java.runtime.version=21.0.3+9-Ubuntu-1ubuntu122.04.1, timestamp=1718716750529}
no deletions
test: open reader.........OK [took 0.046 sec]
test: check integrity.....OK [took 0.040 sec]
5 of 15: name=_gy maxDoc=60066
version=10.0.0
id=adt5pnf860s3oddr3g83e6jhb
codec=Lucene99
compound=false
numFiles=17
size (MB)=50.305
diagnostics = {source=flush, lucene.version=10.0.0, os.version=5.15.0-112-generic, os.arch=amd64, java.vendor=Ubuntu, os=Linux, java.runtime.version=21.0.3+9-Ubuntu-1ubuntu122.04.1, timestamp=1718716744376}
no deletions
test: open reader.........OK [took 0.044 sec]
test: check integrity.....OK [took 0.041 sec]
6 of 15: name=_gm maxDoc=600653
version=10.0.0
id=adt5pnf860s3oddr3g83e6j15
codec=Lucene99
compound=false
numFiles=17
size (MB)=349.349
diagnostics = {java.runtime.version=21.0.3+9-Ubuntu-1ubuntu122.04.1, mergeFactor=10, java.vendor=Ubuntu, os=Linux, os.version=5.15.0-112-generic, timestamp=1718716660478, mergeMaxNumSegments=-1, lucene.version=10.0.0, source=merge, os.arch=amd64}
no deletions
test: open reader.........OK [took 0.052 sec]
test: check integrity.....OK [took 0.174 sec]
7 of 15: name=_gb maxDoc=600654
version=10.0.0
id=adt5pnf860s3oddr3g83e6ikk
codec=Lucene99
compound=false
numFiles=17
size (MB)=356.852
diagnostics = {java.runtime.version=21.0.3+9-Ubuntu-1ubuntu122.04.1, mergeFactor=10, java.vendor=Ubuntu, os=Linux, os.version=5.15.0-112-generic, timestamp=1718716598071, mergeMaxNumSegments=-1, lucene.version=10.0.0, source=merge, os.arch=amd64}
no deletions
test: open reader.........OK [took 0.044 sec]
test: check integrity.....OK [took 0.180 sec]
8 of 15: name=_g0 maxDoc=600654
version=10.0.0
id=adt5pnf860s3oddr3g83e6i3z
codec=Lucene99
compound=false
numFiles=17
size (MB)=375.264
diagnostics = {java.runtime.version=21.0.3+9-Ubuntu-1ubuntu122.04.1, mergeFactor=10, java.vendor=Ubuntu, os=Linux, os.version=5.15.0-112-generic, timestamp=1718716534641, mergeMaxNumSegments=-1, lucene.version=10.0.0, source=merge, os.arch=amd64}
no deletions
test: open reader.........OK [took 0.044 sec]
test: check integrity.....OK [took 0.164 sec]
9 of 15: name=_fp maxDoc=600654
version=10.0.0
id=adt5pnf860s3oddr3g83e6hmz
codec=Lucene99
compound=false
numFiles=17
size (MB)=387.276
diagnostics = {java.runtime.version=21.0.3+9-Ubuntu-1ubuntu122.04.1, mergeFactor=10, java.vendor=Ubuntu, os=Linux, os.version=5.15.0-112-generic, timestamp=1718716464813, mergeMaxNumSegments=-1, lucene.version=10.0.0, source=merge, os.arch=amd64}
no deletions
test: open reader.........OK [took 0.067 sec]
test: check integrity.....OK [took 0.182 sec]
10 of 15: name=_gx maxDoc=600654
version=10.0.0
id=adt5pnf860s3oddr3g83e6jh8
codec=Lucene99
compound=false
numFiles=17
size (MB)=388.355
diagnostics = {java.runtime.version=21.0.3+9-Ubuntu-1ubuntu122.04.1, mergeFactor=10, java.vendor=Ubuntu, os=Linux, os.version=5.15.0-112-generic, timestamp=1718716728124, mergeMaxNumSegments=-1, lucene.version=10.0.0, source=merge, os.arch=amd64}
no deletions
test: open reader.........OK [took 0.044 sec]
test: check integrity.....OK [took 0.154 sec]
11 of 15: name=_cb maxDoc=6006538
version=10.0.0
id=adt5pnf860s3oddr3g83e6bpn
codec=Lucene99
compound=false
numFiles=17
size (MB)=3,224.495
diagnostics = {java.runtime.version=21.0.3+9-Ubuntu-1ubuntu122.04.1, mergeFactor=10, java.vendor=Ubuntu, os=Linux, os.version=5.15.0-112-generic, timestamp=1718715629725, mergeMaxNumSegments=-1, lucene.version=10.0.0, source=merge, os.arch=amd64}
no deletions
test: open reader.........OK [took 0.056 sec]
test: check integrity.....OK [took 0.940 sec]
12 of 15: name=_fe maxDoc=6006538
version=10.0.0
id=adt5pnf860s3oddr3g83e6h4h
codec=Lucene99
compound=false
numFiles=17
size (MB)=3,245.9
diagnostics = {java.runtime.version=21.0.3+9-Ubuntu-1ubuntu122.04.1, mergeFactor=10, java.vendor=Ubuntu, os=Linux, os.version=5.15.0-112-generic, timestamp=1718716334512, mergeMaxNumSegments=-1, lucene.version=10.0.0, source=merge, os.arch=amd64}
no deletions
test: open reader.........OK [took 0.044 sec]
test: check integrity.....OK [took 0.790 sec]
13 of 15: name=_98 maxDoc=6006538
version=10.0.0
id=adt5pnf860s3oddr3g83e663z
codec=Lucene99
compound=false
numFiles=17
size (MB)=3,248.36
diagnostics = {java.runtime.version=21.0.3+9-Ubuntu-1ubuntu122.04.1, mergeFactor=10, java.vendor=Ubuntu, os=Linux, os.version=5.15.0-112-generic, timestamp=1718714905816, mergeMaxNumSegments=-1, lucene.version=10.0.0, source=merge, os.arch=amd64}
no deletions
test: open reader.........OK [took 0.046 sec]
test: check integrity.....OK [took 0.805 sec]
14 of 15: name=_65 maxDoc=6006538
version=10.0.0
id=adt5pnf860s3oddr3g83e60a5
codec=Lucene99
compound=false
numFiles=17
size (MB)=3,312.925
diagnostics = {java.runtime.version=21.0.3+9-Ubuntu-1ubuntu122.04.1, mergeFactor=10, java.vendor=Ubuntu, os=Linux, os.version=5.15.0-112-generic, timestamp=1718714177660, mergeMaxNumSegments=-1, lucene.version=10.0.0, source=merge, os.arch=amd64}
no deletions
test: open reader.........OK [took 0.046 sec]
test: check integrity.....OK [took 1.095 sec]
15 of 15: name=_32 maxDoc=6006538
version=10.0.0
id=adt5pnf860s3oddr3g83e5u7k
codec=Lucene99
compound=false
numFiles=17
size (MB)=3,385.773
diagnostics = {java.runtime.version=21.0.3+9-Ubuntu-1ubuntu122.04.1, mergeFactor=10, java.vendor=Ubuntu, os=Linux, os.version=5.15.0-112-generic, timestamp=1718713437198, mergeMaxNumSegments=-1, lucene.version=10.0.0, source=merge, os.arch=amd64}
no deletions
test: open reader.........OK [took 0.044 sec]
test: check integrity.....OK [took 0.955 sec]
I reran the benchmarks of no concurrency vs 4 threads and constrained the page cache a lot by setting -Xmx to almost all of the machine's memory (page cache size goes to about 1.5G). As somewhat expected, in this scenario concurrency helps a lot.
TaskQPS baseline StdDevQPS my_modified_version StdDev Pct diff p-value
PKLookup 85.93 (34.3%) 3.99 (1.4%) -95.4% ( -97% - -90%) 0.000
Fuzzy1 101.17 (2.9%) 8.18 (8.3%) -91.9% (-100% - -83%) 0.000
AndHighLow 1044.32 (2.5%) 85.54 (0.2%) -91.8% ( -92% - -91%) 0.000
Fuzzy2 47.18 (16.5%) 4.91 (0.9%) -89.6% ( -91% - -86%) 0.000
LowSloppyPhrase 45.24 (3.3%) 4.85 (0.3%) -89.3% ( -89% - -88%) 0.000
MedPhrase 40.91 (1.6%) 5.12 (2.2%) -87.5% ( -89% - -85%) 0.000
HighPhrase 87.72 (1.4%) 11.00 (0.1%) -87.5% ( -87% - -87%) 0.000
HighTermTitleBDVSort 13.88 (1.6%) 1.78 (0.1%) -87.2% ( -87% - -86%) 0.000
OrNotHighLow 1688.98 (6.8%) 221.97 (15.1%) -86.9% (-101% - -69%) 0.000
Prefix3 557.04 (17.4%) 73.35 (2.6%) -86.8% ( -90% - -80%) 0.000
Wildcard 30.67 (7.3%) 4.55 (3.8%) -85.2% ( -89% - -79%) 0.000
Respell 32.29 (40.9%) 4.90 (14.7%) -84.8% ( -99% - -49%) 0.000
LowSpanNear 25.42 (0.8%) 3.98 (2.7%) -84.4% ( -87% - -81%) 0.000
MedSloppyPhrase 57.73 (2.6%) 9.15 (1.3%) -84.2% ( -85% - -82%) 0.000
LowPhrase 169.09 (0.7%) 28.85 (13.2%) -82.9% ( -96% - -69%) 0.000
LowIntervalsOrdered 14.27 (0.8%) 2.44 (3.6%) -82.9% ( -86% - -79%) 0.000
IntNRQ 97.33 (2.6%) 18.62 (0.9%) -80.9% ( -82% - -79%) 0.000
MedSpanNear 27.71 (0.8%) 5.96 (4.1%) -78.5% ( -82% - -74%) 0.000
MedIntervalsOrdered 5.63 (0.9%) 1.49 (5.1%) -73.6% ( -78% - -68%) 0.000
HighIntervalsOrdered 4.11 (1.0%) 1.09 (0.9%) -73.6% ( -74% - -72%) 0.000
HighSloppyPhrase 16.75 (1.8%) 4.68 (2.3%) -72.1% ( -74% - -69%) 0.000
HighSpanNear 7.44 (1.3%) 2.13 (2.8%) -71.3% ( -74% - -68%) 0.000
OrNotHighMed 278.73 (2.2%) 85.43 (2.8%) -69.4% ( -72% - -65%) 0.000
BrowseDateTaxoFacets 15.30 (2.3%) 4.80 (1.7%) -68.6% ( -71% - -66%) 0.000
BrowseDayOfYearTaxoFacets 14.91 (11.4%) 5.52 (2.4%) -62.9% ( -68% - -55%) 0.000
BrowseRandomLabelTaxoFacets 11.38 (0.5%) 4.28 (1.5%) -62.4% ( -64% - -60%) 0.000
OrHighMed 129.46 (4.2%) 48.97 (8.7%) -62.2% ( -72% - -51%) 0.000
AndHighHigh 65.35 (1.0%) 25.98 (6.2%) -60.2% ( -66% - -53%) 0.000
AndHighMed 76.79 (1.3%) 31.63 (9.5%) -58.8% ( -68% - -48%) 0.000
OrHighHigh 42.48 (14.7%) 18.32 (1.5%) -56.9% ( -63% - -47%) 0.000
OrHighNotLow 144.92 (1.7%) 64.99 (0.8%) -55.2% ( -56% - -53%) 0.000
MedTerm 284.66 (25.9%) 152.10 (3.4%) -46.6% ( -60% - -23%) 0.000
AndHighMedDayTaxoFacets 108.26 (2.1%) 63.19 (2.1%) -41.6% ( -44% - -38%) 0.000
BrowseDayOfYearSSDVFacets 7.91 (5.5%) 4.67 (1.5%) -40.9% ( -45% - -35%) 0.000
OrNotHighHigh 155.25 (9.7%) 93.97 (2.8%) -39.5% ( -47% - -29%) 0.000
OrHighLow 176.36 (55.0%) 123.29 (59.2%) -30.1% ( -93% - 186%) 0.405
BrowseDateSSDVFacets 1.53 (2.5%) 1.27 (8.8%) -17.3% ( -27% - -6%) 0.000
MedTermDayTaxoFacets 21.73 (1.6%) 18.02 (2.8%) -17.0% ( -21% - -12%) 0.000
LowTerm 279.50 (24.2%) 234.75 (14.9%) -16.0% ( -44% - 30%) 0.208
OrHighNotHigh 70.74 (25.6%) 60.08 (54.8%) -15.1% ( -76% - 87%) 0.578
HighTermTitleSort 23.23 (1.5%) 19.88 (16.3%) -14.4% ( -31% - 3%) 0.048
AndHighHighDayTaxoFacets 7.27 (1.3%) 6.80 (4.5%) -6.5% ( -12% - 0%) 0.002
BrowseMonthTaxoFacets 12.85 (24.3%) 12.34 (28.6%) -4.0% ( -45% - 64%) 0.813
BrowseMonthSSDVFacets 5.17 (1.7%) 4.99 (2.2%) -3.4% ( -7% - 0%) 0.007
BrowseRandomLabelSSDVFacets 3.72 (0.5%) 3.81 (14.4%) 2.5% ( -12% - 17%) 0.696
OrHighMedDayTaxoFacets 4.74 (1.7%) 4.89 (4.6%) 3.3% ( -2% - 9%) 0.134
HighTermMonthSort 279.05 (33.6%) 307.51 (140.4%) 10.2% (-122% - 277%) 0.874
OrHighNotMed 120.83 (1.2%) 154.43 (84.5%) 27.8% ( -57% - 114%) 0.462
TermDTSort 59.06 (4.9%) 81.90 (63.8%) 38.7% ( -28% - 113%) 0.177
HighTermDayOfYearSort 55.42 (41.8%) 77.40 (19.2%) 39.7% ( -15% - 173%) 0.054
HighTerm 124.75 (7.0%) 496.22 (33.9%) 297.8% ( 239% - 364%) 0.000
(left is 4 threads, right is no concurrency)
Seems to me CPU-bound scenarios don't parallelise well: cache misses just introduce bottlenecks that more than outweigh the benefits of speeding up the parallel sections of a search. But as soon as significant disk IO comes into play, the situation reverses as one would expect. Who cares about stalling on RAM reads while stalling on disk frequently I guess? :)
I'm just catching up with the comments and progress here. This is indeed exciting and able to better utilize the CPU by reducing the overhead. I also like the fact that it handles the deadlock issue by default. +1 to the idea of using ForkJoinPool with IndexSearcher, which has also popped up in the past (ref comment), so that may be something we would like to have for the cases where concurrency is actually helping (happy to spend some time looking into that front), but this is a good step in improving the concurrency. Though I'm not super sure about the results posted here, which show crazy massive gains, but it seems like there are some cases where concurrency is just not helping and adding a lot of overhead, if I understand correctly @original-brownbear?
Though I'm not super sure about the results posted here, which show crazy massive gains but it seems like there some cases where concurrency is just not helping and adding a lot of overhead if I understand correctly
@shubhamvishu
Yes, there are cases where concurrency helps, but it adds overhead in all cases as well. In a nutshell:
- If you're IO bound, concurrency helps up to fairly high levels of concurrency, and thread count can be almost proportional to QPS. SSDs + Linux together are pretty good at scheduling IO + compute. Put differently, we are parallelising slow tasks, so the overhead of parallelising matters less.
- But if we're not IO bound (e.g. everything is in the page cache already), we're mostly parallelising short-running tasks, so the overhead of parallelising matters more.
Just catching up with the thread. I have a few thoughts on this:
but I'm not too happy that it makes sizing thread pools more complicated in exchange? I need to think more about the trade-off.
To me it seem like the opposite is true, this changes makes reasoning about the sizing much easier. I find it very complicated working out the relative sizes of worker pool and coordinator pool. I effectively want the worker pool just sized right so that I get the CPU utilisation I desire without oversubscribing and adding scheduling overhead.
From a sizing perspective, the CPU utilization depends on the number of active threads working on the search request. In the current approach, all of those threads come from the worker pool, while the coordinator waits for completion and later performs the reduce operation. With this change, the active threads for a request come from both the coordinator (or caller thread) and the worker pool. So we should ideally get the same utilization if we size the worker pool as 1 + coordinator pool (not considering the scheduling overhead). In systems like ES or OS, it is probably easier to configure these 2 pools that way, because the coordinator pool controls how many searches are being processed concurrently and the worker pool determines how each of those searches is parallelized based on the slice count and worker pool size. For example: if I want only 2 searches to be processed at a time and each of those searches to limit its parallelization to 2 slices, then I can configure my coordinator pool with 2 threads and the worker pool with 4 threads to keep the utilization as needed.
I am curious about the first test run, which compares 3 threads in main vs 2 threads in this branch. If I understand correctly, it looks like the main benefit is coming from reduced context-switching overhead. Is this overhead because of waking the coordinator thread for each worker-thread completion? For example, if we have 12 tasks to process, main will probably signal the coordinator thread 12 times (in the worst case, if the tasks complete one after another), whereas in this branch the coordinator thread may get signaled only 2 times. If that is the case, then if we had some sort of grouping of futures and invoked the coordinator only when all of those complete, we should be the same or better off than this change?
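For reference, the kind of grouping I mean could look roughly like this, using CompletableFuture.allOf as one possible mechanism (this is only an illustration, not what main does today):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class GroupedWait {
  static void runAllThenWait(Executor executor, List<Runnable> tasks) {
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    for (Runnable task : tasks) {
      futures.add(CompletableFuture.runAsync(task, executor));
    }
    // The coordinator blocks once on the combined future instead of being woken per task.
    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
  }
}
```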
Is this overhead because of waking the coordinator thread for each of the worker thread completion ?
It's both the waking of the coordinator thread and potentially the waking of the worker thread, with thread waking being notoriously slow and unpredictable (in Java? in Linux? idk, actually, but the profiling and the docs for ForkJoinPool say so).
In systems like ES or OS, probably it is easier to configure these 2 pools in that way because coordinator pool will control how many searches are concurrently being processed and worker pool will determine how each of these searches are parallelized based on the slice count and worker pool size.
But now the coordinator pool and the worker pool both intermittently do compute heavy work, potentially at the same time. Sizing this relative to CPUs in an efficient way is fairly complicated and even if it works and you get 100% CPU usage, you still have the problem of constantly waking up threads which is slow. Also see the point at the bottom about fanning out recursively.
So for example if we have 12 tasks to process, in main, it will probably signal the coordinator thread 12 times (in worst case, if each of tasks completes one after another) vs in this branch the coordinator thread may get signaled only 2 times.
It depends on the task distribution obviously, but empirically it seems the coordinator/calling-thread will often outpace the other threads as far as grabbing a task goes and never get blocked on the future(s) in any way. Assuming you have any queuing on the pool already it's pretty much a given that this happens. The beauty here is that if the pool is already running at full capacity the calling thread will just work through all the tasks on its own without the threads ever sleeping.
If that is the case, that means if we have some sort of grouping of futures and invoke the coordinator only when all of those completes, we should be same or better off than this change ?
It would be better than what we have today but you'll still have the unnecessary context switches. Likely you could get almost the same if not exact same improvement by taking today's code and just iterating the futures in reverse order since the last future will be last to complete? Also, the blocking nature of this solution still means that you can't fan out multiple times as we can do here.
I tried to reproduce the results here, and I do see that luceneutil reports big speedups when searchConcurrency is set to 4 in baseline and 3 in candidate (this change). However I wonder if this is really a fair comparison -- are both versions really using only 4 threads? So instead I set the concurrency higher (16 in both) because my box has 16 cores. With this setting I see no real difference; see:
TaskQPS baseline StdDevQPS my_modified_version StdDev Pct diff p-value
PKLookup 174.59 (2.7%) 175.43 (2.8%) 0.5% ( -4% - 6%) 0.582
HighTerm 65.45 (4.2%) 65.92 (2.8%) 0.7% ( -6% - 8%) 0.524
so it makes me wonder if we are simply observing a measurement bias here
So instead I set the concurrency higher (16 in both) because my box has 16 cores. With this setting I see no real difference;
This is expected I'd say, I had the same result for non-IO-bound work. 4 vs 3 makes a big difference because you save 25% of the forking work, which is often more expensive than just doing the task. At concurrency 16 you save 1/16th, and not even that. This is then more of an issue of maybe slicing the tasks better :) This change does not help with that scenario, and I think the problem here would be the 16 slices being too small, not how they are scheduled?
The beauty here is that if the pool is already running at full capacity the calling thread will just work through all the tasks on its own without the threads ever sleeping.
This will be the case if the task execution time is less than the time to make any worker thread active. That's what is being optimized here for such fast-running tasks. In cases where tasks are time consuming, this solution will move towards the current one. So it seems to be the best of both worlds.
If that is the case, that means if we have some sort of grouping of futures and invoke the coordinator only when all of those completes, we should be same or better off than this change ?
It would be better than what we have today but you'll still have the unnecessary context switches. Likely you could get almost the same if not exact same improvement by taking today's code and just iterating the futures in reverse order since the last future will be last to complete? Also, the blocking nature of this solution still means that you can't fan out multiple times as we can do here.
Agreed on the fanout part. In cases where the other workers are already busy, the newly fanned-out tasks will be executed by the same producer thread, which is the current behavior, and when we have available workers it will help to execute them concurrently.
Sorry for the long rambling series of experiments below ... the TL/DR is: I think I can explain why the concurrent QPS looks worse than sequential (main vs main) in our testing so far. It's a false alarm / testing artifact. If you run multiple (N=2 is the default for luceneutil) queries at once, each of them spawning multiple work units (M = number of slices in the index) concurrently for only P worker threads to work on ... each query will see wall-clock time of sequentialQueryTime * N / P on average. I.e. you will not see a P QPS speedup, but rather a P/N QPS speedup, because all of these concurrent work units are interleaved together into the executor queue.
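(As a concrete instance of that formula: with N = 6 queries in flight and P = 6 worker threads, each query sees roughly sequentialQueryTime * 6 / 6, i.e. about the same wall-clock time as running sequentially, so the reported QPS barely moves even though each query is internally executing concurrently.)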
Net/net, properly testing the speedup from concurrent search in luceneutil is trappy. I hope we can improve luceneutil to make this less trappy! Maybe if searchConcurrency>1 is specified we should force only a single query in flight at once (numThreads=1). Patches welcome!
Details:
I ran just main (no concurrency) vs main (searchConcurrency=1) and got the below results. I wanted to simply test the overhead of the "concurrent" code path.
Note that I am not testing the PR here yet ... just existing Lucene 10.0 snapshot with vs without concurrency.
Note that I had to pass verifyCounts=False to the Competition since BMW changes its behavior in concurrent and non-concurrent search (right?):
RuntimeError: errors occurred: ([], ['query=body:film filter=None sort=None groupField=None hitCount=46471+: wrong hitCount: 46471+ vs 48775+', 'query=body:world filter=None sort=None groupField=None hitCount=57614+: wrong hitCount: 57614+ vs 80204+', \
'query=body:9 filter=None sort=None groupField=None hitCount=38235+: wrong hitCount: 38235+ vs 47067+', 'query=body:16 filter=None sort=None groupField=None hitCount=28485+: wrong hitCount: 28485+ vs 42534+', 'query=body:2003 filter=None sort=None grou\
pField=None hitCount=51054+: wrong hitCount: 51054+ vs 57326+', 'query=body:19 filter=None sort=None groupField=None hitCount=35412+: wrong hitCount: 35412+ vs 47975+', 'query=body:c filter=None sort=None groupField=None hitCount=50167+: wrong hitCount\
: 50167+ vs 51977+', 'query=body:people filter=None sort=None groupField=None hitCount=39428+: wrong hitCount: 39428+ vs 48528+', 'query=body:18 filter=None sort=None groupField=None hitCount=35183+: wrong hitCount: 35183+ vs 50542+', 'query=body:so fi\
lter=None sort=None groupField=None hitCount=65528+: wrong hitCount: 65528+ vs 86984+', 'query=body:city filter=None sort=None groupField=None hitCount=55383+: wrong hitCount: 55383+ vs 59901+', 'query=body:both filter=None sort=None groupField=None hi\
tCount=54659+: wrong hitCount: 54659+ vs 62908+', 'query=body:during filter=None sort=None groupField=None hitCount=53258+: wrong hitCount: 53258+ vs 65804+', 'query=body:web filter=None sort=None groupField=None hitCount=40708+: wrong hitCount: 40708+\
vs 46340+', 'query=body:right filter=None sort=None groupField=None hitCount=26596+: wrong hitCount: 26596+ vs 31803+', 'query=body:short filter=None sort=None groupField=None hitCount=42472+: wrong hitCount: 42472+ vs 57921+', 'query=body:number filt\
er=None sort=None groupField=None hitCount=31920+: wrong hitCount: 31920+ vs 32176+', 'query=body:he filter=None sort=None groupField=None hitCount=60931+: wrong hitCount: 60931+ vs 72660+', 'query=body:november filter=None sort=None groupField=None hi\
tCount=31570+: wrong hitCount: 31570+ vs 42792+', 'query=body:would filter=None sort=None groupField=None hitCount=57945+: wrong hitCount: 57945+ vs 83558+'], 1.0)
I also turned off PK, and enabled just three task patterns, but increased iterations and number of tasks per category. Here's my perf.py:
import sys
sys.path.insert(0, '/l/util/src/python')

import competition

if __name__ == '__main__':
    sourceData = competition.sourceData('wikimediumall')
    comp = competition.Competition(taskRepeatCount=200, taskCountPerCat=20, verifyCounts=False)

    comp.addTaskPattern('HighTerm$')
    comp.addTaskPattern('BrowseDayOfYearTaxoFacets')
    comp.addTaskPattern('OrHighHigh')

    checkout = 'trunk'

    index = comp.newIndex(checkout, sourceData, numThreads=12, addDVFields=True,
                          grouping=False, useCMS=True,
                          analyzer = 'StandardAnalyzerNoStopWords',
                          facets = (('taxonomy:Date', 'Date'),
                                    ('taxonomy:Month', 'Month'),
                                    ('taxonomy:DayOfYear', 'DayOfYear'),
                                    ('taxonomy:RandomLabel.taxonomy', 'RandomLabel'),
                                    ('sortedset:Month', 'Month'),
                                    ('sortedset:DayOfYear', 'DayOfYear'),
                                    ('sortedset:RandomLabel.sortedset', 'RandomLabel')))

    comp.competitor('seq', checkout, index=index, pk=False)
    comp.competitor('conc', checkout, index=index, searchConcurrency=1, pk=False)

    comp.benchmark('sequential-vs-concurrent')
And I just run this with python3 -u perf.py.
Results after four iterations (but they are not fair -- see below):
Task QPS seq StdDev QPS conc StdDev Pct diff p-value
HighTerm 717.31 (0.5%) 84.75 (0.3%) -88.2% ( -88% - -87%) 0.000
OrHighHigh 46.81 (0.7%) 7.51 (0.3%) -84.0% ( -84% - -83%) 0.000
BrowseDayOfYearTaxoFacets 18.30 (0.2%) 14.60 (0.1%) -20.2% ( -20% - -20%) 0.000
OK the big problem with this test is the sequential one is (confusingly) using six threads, while the concurrent one is using only 1 thread. This is because my luceneutil config (localconstants.py) runs 6 queries in flight at once, and in main as it is today the coordinator thread (one of these 6 threads) does NOT do any real work. I could also see via top that seq (sequential) was mostly saturating 600% (6 cores) CPU, while conc (concurrent) was mostly only saturating 100% CPU (1 core).
So then I ran again, this time using searchConcurrency=6 on the concurrent path. I could see in top that both seq and conc are seeing ~600% CPU saturation, good. Results:
Task QPS seq StdDev QPS conc StdDev Pct diff p-value
HighTerm 690.47 (0.6%) 45.04 (0.3%) -93.5% ( -93% - -93%) 0.000
OrHighHigh 44.44 (1.1%) 32.77 (0.4%) -26.3% ( -27% - -25%) 0.000
BrowseDayOfYearTaxoFacets 18.27 (0.1%) 14.60 (0.1%) -20.1% ( -20% - -19%) 0.000
Indeed, even with this more-fair comparison (6 actual threads doing real work for both seq and conc), concurrent search is wall-clock time much slower than sequential search.
I also pulled some jstacks during execution, and we can see clearly that concurrency for a single query shows up at least in two places:
- Looking up the TermStates (terms dictionary) across all slices
- Executing the Scorer iteration to retrieve all hits (across all slices)
(So, this is two chances for the context switching overhead to strike!)
Next I tested a single query in flight at once (add -numThreads=1 to conc, and -numThreads=6 to seq). This way, for the concurrent case, there is a single query in flight at once, using up to six actual worker threads. For the sequential case, we run 6 queries at a time, but each query executes sequentially. This forces the 6 worker threads to work entirely on a single query at once for conc. The results are interesting ... OrHighHigh, a fairly slow query, is indeed faster when running concurrently:
Task QPS seq StdDev QPS conc StdDev Pct diff p-value
HighTerm 673.63 (1.3%) 219.81 (1.6%) -67.4% ( -69% - -65%) 0.000
BrowseDayOfYearTaxoFacets 18.29 (0.1%) 14.84 (0.3%) -18.9% ( -19% - -18%) 0.000
OrHighHigh 44.51 (1.9%) 121.13 (11.2%) 172.1% ( 156% - 188%) 0.000
Finally, I was curious how much the cross-segment BMW skipping optimizations were altering these results, so I disabled BMW with this change:
diff --git a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
index 8c0da9539eb..d1d2dfce9b2 100644
--- a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
+++ b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
@@ -96,7 +96,8 @@ public class IndexSearcher {
* By default, we count hits accurately up to 1000. This makes sure that we don't spend most time
* on computing hit counts
*/
- private static final int TOTAL_HITS_THRESHOLD = 1000;
+ // nocommit
+ private static final int TOTAL_HITS_THRESHOLD = Integer.MAX_VALUE;
/**
* Thresholds for index slice allocation logic. To change the default, extend <code> IndexSearcher
And re-ran, report after 4 iterations (like all reports above):
Task QPS seq StdDev QPS conc StdDev Pct diff p-value
BrowseDayOfYearTaxoFacets 18.27 (0.2%) 14.87 (0.4%) -18.6% ( -19% - -18%) 0.000
OrHighHigh 15.99 (6.1%) 62.58 (4.1%) 291.3% ( 264% - 321%) 0.000
HighTerm 58.83 (4.7%) 237.07 (16.4%) 303.0% ( 269% - 340%) 0.000
And finally I think these results are somewhat explainable:
- BrowseDayOfYearTaxoFacets is always sequential (it does not even run the *:* query in luceneutil). I would expect to see no gain, but also no loss, so I'm not sure why we see ~18.5% QPS loss
- The two other queries are quite costly if you force Lucene to visit all hits (by disabling BMW), and we indeed see good QPS gains for the concurrent case over the sequential case
- When testing the concurrent case, you really must allow only one query in flight at once. If you allow more than that (the luceneutil default is 6), you are flooding the N worker threads with 6 * numSlices jobs for every query, and they will all interleave well, and a given query will only get 1/Nth the CPU working on its slices, thus drawing out the wall clock time
- BMW somewhat negates concurrency gains -- not exactly sure why yet
From the learnings above (always test numThreads=1 in luceneutil to test intra-query concurrency), I tested main (numThreads=1 and searchConcurrency=7) vs branch (this PR, numThreads=1 and searchConcurrency=6). I think this is as fair a test as I can create now: both main and branch are using 7 actual worker threads since with the branch the coordinator thread also does work. Also, BMW is disabled.
TaskQPS main-conc StdDevQPS branch-conc StdDev Pct diff p-value
BrowseDayOfYearTaxoFacets 14.88 (0.4%) 14.91 (0.4%) 0.2% ( 0% - 1%) 0.409
OrHighHigh 53.80 (2.6%) 54.04 (2.0%) 0.4% ( -4% - 5%) 0.760
HighTerm 183.96 (0.3%) 185.67 (4.3%) 0.9% ( -3% - 5%) 0.626
This is after 5 JVM iterations. So, maybe some small speedups, but the confidence isn't high.
Here's my perf.py:
import sys
sys.path.insert(0, '/l/util/src/python')

import competition

if __name__ == '__main__':
    sourceData = competition.sourceData('wikimediumall')
    comp = competition.Competition(taskRepeatCount=200, taskCountPerCat=20, verifyCounts=False)

    comp.addTaskPattern('HighTerm$')
    comp.addTaskPattern('BrowseDayOfYearTaxoFacets')
    comp.addTaskPattern('OrHighHigh')

    checkout = 'trunk'

    index = comp.newIndex(checkout, sourceData, numThreads=12, addDVFields=True,
                          grouping=False, useCMS=True,
                          analyzer = 'StandardAnalyzerNoStopWords',
                          facets = (('taxonomy:Date', 'Date'),
                                    ('taxonomy:Month', 'Month'),
                                    ('taxonomy:DayOfYear', 'DayOfYear'),
                                    ('taxonomy:RandomLabel.taxonomy', 'RandomLabel'),
                                    ('sortedset:Month', 'Month'),
                                    ('sortedset:DayOfYear', 'DayOfYear'),
                                    ('sortedset:RandomLabel.sortedset', 'RandomLabel')))

    comp.competitor('main-conc', checkout, index=index, pk=False, searchConcurrency=7, numThreads=1)
    comp.competitor('branch-conc', '/l/13472', index=index, searchConcurrency=6, pk=False, numThreads=1)

    comp.benchmark('main-vs-branch-concurrent')
Next I'll test on the full set of luceneutil tasks ...