lucene icon indicating copy to clipboard operation
lucene copied to clipboard

This commit adds a new test CMS that always provides intra-merge parallelism

Open benwtrent opened this issue 1 year ago • 11 comments

@iverase has uncovered a potential issue with intra-merge CMS parallelism.

This commit helps expose this problem by forcing tests to use intra-merge parallelism instead of always (well, usually) delegating to a SameThreadExecutorService.

When intra-merge parallelism is used, norms, doc_values, stored_values, etc. are all merged in a separate thread than the thread that was used to construct their merge optimized instances.

This trips assertions numerous assertions in AssertingCodec.assertThread where we assume that the thread that called getMergeInstance() is also the thread getting the values to merge.

I am not 100% sure this is a real problem. Segment merging per field type is still only done in a single thread, we just fork the merging action to a separate thread from the constructed one.

Opening as draft for discussion.

@jpountz you may be interested in this.

benwtrent avatar Jun 10 '24 13:06 benwtrent

Here is an example of an assertion tripping:

java.lang.AssertionError: DocValuesProducer are only supposed to be consumed in the thread in which they have been acquired. But was acquired in Thread[#12828,Lucene Merge Thread #6109,5,TGRP-TestAssertingDocValuesFormat] and consumed in Thread[#12830,pool-1-thread-2,5,TGRP-TestAssertingDocValuesFormat].
	at org.apache.lucene.tests.codecs.asserting.AssertingCodec.assertThread(AssertingCodec.java:44)
	at org.apache.lucene.tests.codecs.asserting.AssertingDocValuesFormat$AssertingDocValuesProducer.getNumeric(AssertingDocValuesFormat.java:232)
	at org.apache.lucene.codecs.perfield.PerFieldDocValuesFormat$FieldsReader.getNumeric(PerFieldDocValuesFormat.java:323)
	at org.apache.lucene.codecs.DocValuesConsumer$1.getNumeric(DocValuesConsumer.java:201)
	at org.apache.lucene.tests.codecs.asserting.AssertingDocValuesFormat$AssertingDocValuesConsumer.addNumericField(AssertingDocValuesFormat.java:76)
	at org.apache.lucene.codecs.DocValuesConsumer.mergeNumericField(DocValuesConsumer.java:183)
	at org.apache.lucene.codecs.DocValuesConsumer.merge(DocValuesConsumer.java:142)
	at org.apache.lucene.codecs.perfield.PerFieldDocValuesFormat$FieldsWriter.merge(PerFieldDocValuesFormat.java:152)
	at org.apache.lucene.index.SegmentMerger.mergeDocValues(SegmentMerger.java:205)
	at org.apache.lucene.index.SegmentMerger.mergeWithLogging(SegmentMerger.java:325)
	at org.apache.lucene.index.SegmentMerger.lambda$merge$1(SegmentMerger.java:155)
	at org.apache.lucene.search.TaskExecutor$TaskGroup.lambda$createTask$0(TaskExecutor.java:117)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at org.apache.lucene.index.ConcurrentMergeScheduler$CachedExecutor.lambda$execute$0(ConcurrentMergeScheduler.java:982)

It seems to me that maybe we want to adjust the assertion? Maybe set currentThread to the first viewed thread when the values are read and assert that its always the same?

benwtrent avatar Jun 10 '24 13:06 benwtrent

After adjusting the assertion, some other assertions are now being triggered.

    java.lang.AssertionError: [body] norms must not be cached twice
        at org.apache.lucene.index.SortingCodecReader.assertCreatedOnlyOnce(SortingCodecReader.java:737)
        at org.apache.lucene.index.SortingCodecReader.getOrCreate(SortingCodecReader.java:721)
        at org.apache.lucene.index.SortingCodecReader.getOrCreateNorms(SortingCodecReader.java:714)
        at org.apache.lucene.index.SortingCodecReader$6.getNorms(SortingCodecReader.java:549)
        at org.apache.lucene.codecs.NormsConsumer$1.getNorms(NormsConsumer.java:123)
        at org.apache.lucene.codecs.lucene90.Lucene90NormsConsumer.addNormsField(Lucene90NormsConsumer.java:135)
        at org.apache.lucene.codecs.NormsConsumer.mergeNormsField(NormsConsumer.java:106)
        at org.apache.lucene.codecs.NormsConsumer.merge(NormsConsumer.java:74)
        at org.apache.lucene.index.SegmentMerger.mergeNorms(SegmentMerger.java:219)
        at org.apache.lucene.index.SegmentMerger.mergeWithLogging(SegmentMerger.java:325)
        at org.apache.lucene.index.SegmentMerger.lambda$merge$0(SegmentMerger.java:143)
        at org.apache.lucene.search.TaskExecutor$TaskGroup.lambda$createTask$0(TaskExecutor.java:117)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
        at org.apache.lucene.index.ConcurrentMergeScheduler$CachedExecutor.lambda$execute$0(ConcurrentMergeScheduler.java:982)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583)

And at least one NPE:

org.apache.lucene.index.TestCodecHoldsOpenFiles > test suite's output saved to /Users/benjamintrent/Projects/lucene/lucene/core/build/test-results/test/outputs/OUTPUT-org.apache.lucene.index.TestCodecHoldsOpenFiles.txt, copied below:
   >     java.lang.IllegalStateException: this writer hit an unrecoverable error; cannot complete forceMerge
   >         at __randomizedtesting.SeedInfo.seed([AA3C2FCDA0773874:226810170E8B558C]:0)
   >         at org.apache.lucene.index.IndexWriter.forceMerge(IndexWriter.java:2161)
   >         at org.apache.lucene.index.IndexWriter.forceMerge(IndexWriter.java:2101)
   >         at org.apache.lucene.tests.index.RandomIndexWriter.doRandomForceMerge(RandomIndexWriter.java:466)
   >         at org.apache.lucene.tests.index.RandomIndexWriter.getReader(RandomIndexWriter.java:485)
   >         at org.apache.lucene.tests.index.RandomIndexWriter.getReader(RandomIndexWriter.java:427)
   >         at org.apache.lucene.index.TestCodecHoldsOpenFiles.test(TestCodecHoldsOpenFiles.java:44)
   >         at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
   >         at java.base/java.lang.reflect.Method.invoke(Method.java:580)
   >         at com.carrotsearch.randomizedtesting.RandomizedRunner.invoke(RandomizedRunner.java:1758)
   >         at com.carrotsearch.randomizedtesting.RandomizedRunner$8.evaluate(RandomizedRunner.java:946)
   >         at com.carrotsearch.randomizedtesting.RandomizedRunner$9.evaluate(RandomizedRunner.java:982)
   >         at com.carrotsearch.randomizedtesting.RandomizedRunner$10.evaluate(RandomizedRunner.java:996)
   >         at org.apache.lucene.tests.util.TestRuleSetupTeardownChained$1.evaluate(TestRuleSetupTeardownChained.java:48)
   >         at org.apache.lucene.tests.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:43)
   >         at org.apache.lucene.tests.util.TestRuleThreadAndTestName$1.evaluate(TestRuleThreadAndTestName.java:45)
   >         at org.apache.lucene.tests.util.TestRuleIgnoreAfterMaxFailures$1.evaluate(TestRuleIgnoreAfterMaxFailures.java:60)
   >         at org.apache.lucene.tests.util.TestRuleMarkFailure$1.evaluate(TestRuleMarkFailure.java:44)
   >         at org.junit.rules.RunRules.evaluate(RunRules.java:20)
   >         at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
   >         at com.carrotsearch.randomizedtesting.ThreadLeakControl$StatementRunner.run(ThreadLeakControl.java:390)
   >         at com.carrotsearch.randomizedtesting.ThreadLeakControl.forkTimeoutingTask(ThreadLeakControl.java:843)
   >         at com.carrotsearch.randomizedtesting.ThreadLeakControl$3.evaluate(ThreadLeakControl.java:490)
   >         at com.carrotsearch.randomizedtesting.RandomizedRunner.runSingleTest(RandomizedRunner.java:955)
   >         at com.carrotsearch.randomizedtesting.RandomizedRunner$5.evaluate(RandomizedRunner.java:840)
   >         at com.carrotsearch.randomizedtesting.RandomizedRunner$6.evaluate(RandomizedRunner.java:891)
   >         at com.carrotsearch.randomizedtesting.RandomizedRunner$7.evaluate(RandomizedRunner.java:902)
   >         at org.apache.lucene.tests.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:43)
   >         at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
   >         at org.apache.lucene.tests.util.TestRuleStoreClassName$1.evaluate(TestRuleStoreClassName.java:38)
   >         at com.carrotsearch.randomizedtesting.rules.NoShadowingOrOverridesOnMethodsRule$1.evaluate(NoShadowingOrOverridesOnMethodsRule.java:40)
   >         at com.carrotsearch.randomizedtesting.rules.NoShadowingOrOverridesOnMethodsRule$1.evaluate(NoShadowingOrOverridesOnMethodsRule.java:40)
   >         at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
   >         at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
   >         at org.apache.lucene.tests.util.TestRuleAssertionsRequired$1.evaluate(TestRuleAssertionsRequired.java:53)
   >         at org.apache.lucene.tests.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:43)
   >         at org.apache.lucene.tests.util.TestRuleMarkFailure$1.evaluate(TestRuleMarkFailure.java:44)
   >         at org.apache.lucene.tests.util.TestRuleIgnoreAfterMaxFailures$1.evaluate(TestRuleIgnoreAfterMaxFailures.java:60)
   >         at org.apache.lucene.tests.util.TestRuleIgnoreTestSuites$1.evaluate(TestRuleIgnoreTestSuites.java:47)
   >         at org.junit.rules.RunRules.evaluate(RunRules.java:20)
   >         at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
   >         at com.carrotsearch.randomizedtesting.ThreadLeakControl$StatementRunner.run(ThreadLeakControl.java:390)
   >         at com.carrotsearch.randomizedtesting.ThreadLeakControl.lambda$forkTimeoutingTask$0(ThreadLeakControl.java:850)
   >         at java.base/java.lang.Thread.run(Thread.java:1583)
   >
   >         Caused by:
   >         java.lang.NullPointerException: Cannot invoke "org.apache.lucene.search.Sort.getSort()" because the return value of "org.apache.lucene.index.LeafMetaData.getSort()" is null
   >             at org.apache.lucene.index.SortingCodecReader.assertCreatedOnlyOnce(SortingCodecReader.java:739)
   >             at org.apache.lucene.index.SortingCodecReader.getOrCreate(SortingCodecReader.java:721)
   >             at org.apache.lucene.index.SortingCodecReader.getOrCreateDV(SortingCodecReader.java:759)
   >             at org.apache.lucene.index.SortingCodecReader$7.getNumeric(SortingCodecReader.java:571)
   >             at org.apache.lucene.codecs.DocValuesConsumer$1.getNumeric(DocValuesConsumer.java:201)
   >             at org.apache.lucene.codecs.lucene90.Lucene90DocValuesConsumer$1.getSortedNumeric(Lucene90DocValuesConsumer.java:138)
   >             at org.apache.lucene.codecs.lucene90.Lucene90DocValuesConsumer.writeValues(Lucene90DocValuesConsumer.java:188)
   >             at org.apache.lucene.codecs.lucene90.Lucene90DocValuesConsumer.addNumericField(Lucene90DocValuesConsumer.java:133)
   >             at org.apache.lucene.tests.codecs.asserting.AssertingDocValuesFormat$AssertingDocValuesConsumer.addNumericField(AssertingDocValuesFormat.java:87)
   >             at org.apache.lucene.codecs.DocValuesConsumer.mergeNumericField(DocValuesConsumer.java:183)
   >             at org.apache.lucene.codecs.DocValuesConsumer.merge(DocValuesConsumer.java:142)
   >             at org.apache.lucene.codecs.perfield.PerFieldDocValuesFormat$FieldsWriter.merge(PerFieldDocValuesFormat.java:151)
   >             at org.apache.lucene.index.SegmentMerger.mergeDocValues(SegmentMerger.java:205)
   >             at org.apache.lucene.index.SegmentMerger.mergeWithLogging(SegmentMerger.java:325)
   >             at org.apache.lucene.index.SegmentMerger.lambda$merge$1(SegmentMerger.java:155)
   >             at org.apache.lucene.search.TaskExecutor$TaskGroup.lambda$createTask$0(TaskExecutor.java:117)
   >             at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
   >             at org.apache.lucene.index.ConcurrentMergeScheduler$CachedExecutor.lambda$execute$0(ConcurrentMergeScheduler.java:982)
   >             at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
   >             at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
   >             ... 1 more

benwtrent avatar Jun 10 '24 13:06 benwtrent

Making norms & terms merge synchronously (not forking to the intra-merge pool), makes the assertions go away, but then (surprise surprise), another test fails 🤦

EDIT: I think this is because the test assumes terms and doc values are merged in parallel. However, I adjusted the code to NOT do that.

gradlew :lucene:core:test --tests "org.apache.lucene.index.TestIndexWriterForceMerge.testMergePerField" -Ptests.jvms=5 "-Ptests.jvmargs=-XX:TieredStopAtLevel=1 -XX:+UseParallelGC -XX:ActiveProcessorCount=1" -Ptests.seed=FF93459708A1FFD4 -Ptests.gui=false -Ptests.file.encoding=US-ASCII -Ptests.vectorsize=128 -Ptests.forceintegervectors=true
TestIndexWriterForceMerge > testMergePerField FAILED
    java.lang.AssertionError: broken barrier
        at org.apache.lucene.index.TestIndexWriterForceMerge$BlockingOnMergePostingsFormat$1.merge(TestIndexWriterForceMerge.java:401)
        at org.apache.lucene.codecs.perfield.PerFieldPostingsFormat$FieldsWriter.merge(PerFieldPostingsFormat.java:204)
        at org.apache.lucene.index.SegmentMerger.mergeTerms(SegmentMerger.java:230)
        at org.apache.lucene.index.SegmentMerger.mergeWithLogging(SegmentMerger.java:319)
        at org.apache.lucene.index.SegmentMerger.merge(SegmentMerger.java:144)
        at org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:5284)
        at org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:4752)
        at org.apache.lucene.index.IndexWriter$IndexWriterMergeSource.merge(IndexWriter.java:6573)
        at org.apache.lucene.index.ConcurrentMergeScheduler.doMerge(ConcurrentMergeScheduler.java:668)
        at org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:729)

        Caused by:
        java.util.concurrent.TimeoutException
            at java.base/java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:259)
            at java.base/java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:437)
            at org.apache.lucene.index.TestIndexWriterForceMerge$BlockingOnMergePostingsFormat$1.merge(TestIndexWriterForceMerge.java:399)
            ... 9 more

    java.lang.AssertionError: broken barrier
        at org.apache.lucene.index.TestIndexWriterForceMerge$BlockingOnMergePostingsFormat$1.merge(TestIndexWriterForceMerge.java:401)
        at org.apache.lucene.codecs.perfield.PerFieldPostingsFormat$FieldsWriter.merge(PerFieldPostingsFormat.java:204)
        at org.apache.lucene.index.SegmentMerger.mergeTerms(SegmentMerger.java:230)
        at org.apache.lucene.index.SegmentMerger.mergeWithLogging(SegmentMerger.java:319)
        at org.apache.lucene.index.SegmentMerger.merge(SegmentMerger.java:144)
        at org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:5284)
        at org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:4752)
        at org.apache.lucene.index.IndexWriter$IndexWriterMergeSource.merge(IndexWriter.java:6573)
        at org.apache.lucene.index.ConcurrentMergeScheduler.doMerge(ConcurrentMergeScheduler.java:668)
        at org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:729)

        Caused by:
        java.util.concurrent.TimeoutException
            at java.base/java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:259)
            at java.base/java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:437)
            at org.apache.lucene.index.TestIndexWriterForceMerge$BlockingOnMergePostingsFormat$1.merge(TestIndexWriterForceMerge.java:399)
            ... 9 more

benwtrent avatar Jun 10 '24 13:06 benwtrent

I ran some more tests. My logic for "move assertion to when we first read the index segment values" still doesn't work.

Its possible with index sorting that we read doc values once from the merge thread, and then later kick off doc value merging in a separate thread. While this isn't executed at the same time, it can trip assertions.

I am not sure how to avoid this or better protect this code.

benwtrent avatar Jun 10 '24 20:06 benwtrent

Thanks for looking into this. I agree that it may not be an issue in practice, but I still like that we enforce thread confinment for merging. It seems that the root cause is that we pull readers (DocValuesProducer, etc.) when constructing the MergeState before potentially forking to a different thread. Would it address the problem if we moved pulling these readers to the thread that actually does the merging?

jpountz avatar Jun 17 '24 10:06 jpountz

Would it address the problem if we moved pulling these readers to the thread that actually does the merging?

@jpountz I am not sure, the idea seems OK to me, but I have two concerns.

If we are sorted by a doc_values field, we need to read the values to build the sort values. So, I am not sure how we can ever do that in the same thread the doc_values are merged by because all other threads need to know the sorting.

When merging norms & doc values, they cause weird thrashing in SortingCodecReader#getOrCreate explained further in: https://github.com/apache/lucene/issues/13478#issuecomment-2160815181.

benwtrent avatar Jun 17 '24 11:06 benwtrent

It does not address the SortingCodecReader problem, but here's a commit that does what I had in mind FWIW: https://github.com/apache/lucene/compare/main...jpountz:lucene:test/improve-cms-concurrency-testing?expand=1.

jpountz avatar Jun 17 '24 14:06 jpountz

@jpountz I pushed your suggestion. This way we respect the threaded uniqueness of the readers.

What do you think of having this in 9.11.1? Seems like we should have this bug fix there & all the testing that brought this to light.

Then in 9.12 we can worry about the SortingCodecReader, etc.

benwtrent avatar Jun 17 '24 16:06 benwtrent

It feels too big for 9.11, maybe it's better to only keep knn vector merging parallelism in 9.11 and have this change only in 9.12?

jpountz avatar Jun 18 '24 06:06 jpountz

@jpountz sounds good

benwtrent avatar Jun 18 '24 10:06 benwtrent

This PR has not had activity in the past 2 weeks, labeling it as stale. If the PR is waiting for review, notify the [email protected] list. Thank you for your contribution!

github-actions[bot] avatar Jul 03 '24 00:07 github-actions[bot]

@jpountz I would really like to get this in before we start using intra-merge parallelism any further.

We reverted all parallelism in 9.11. This adjusts the parallelism as I am pretty sure main & 9x are still broken in how they handle intra merge concurrency (not handling index sorting, etc.)

I think we should merge this and backport to 9.12. If we don't want to back port to 9.12, we should revert the intra merge parallelism there like was done in 9.11.

benwtrent avatar Sep 11 '24 11:09 benwtrent