fdb-record-layer

testOrQuery5WithLimits intermittently fails with a NoSuchElementException

Open alecgrieser opened this issue 5 years ago • 3 comments

The testOrQuery5WithLimits test has intermittently failed with a NoSuchElementException while trying to pop from the queue:

10:36:59 com.apple.foundationdb.record.provider.foundationdb.query.FDBOrQueryToUnionTest > testOrQuery5WithLimits(int)[1] FAILED
10:36:59     java.util.NoSuchElementException
10:36:59         at java.util.ArrayDeque.removeFirst(ArrayDeque.java:285)
10:36:59         at java.util.ArrayDeque.remove(ArrayDeque.java:452)
10:36:59         at com.apple.foundationdb.record.cursors.MapPipelinedCursor.close(MapPipelinedCursor.java:148)
10:36:59         at com.apple.foundationdb.record.cursors.MapCursor.close(MapCursor.java:113)
10:36:59         at com.apple.foundationdb.record.provider.foundationdb.cursors.MergeCursorState.close(MergeCursorState.java:97)
10:36:59         at java.util.ArrayList.forEach(ArrayList.java:1257)
10:36:59         at com.apple.foundationdb.record.provider.foundationdb.cursors.MergeCursor.close(MergeCursor.java:349)
10:36:59         at com.apple.foundationdb.record.provider.foundationdb.cursors.UnorderedUnionCursor.close(UnorderedUnionCursor.java:58)
10:36:59         at com.apple.foundationdb.record.cursors.FilterCursor.close(FilterCursor.java:122)
10:36:59         at com.apple.foundationdb.record.cursors.RowLimitedCursor.close(RowLimitedCursor.java:126)
10:36:59         at com.apple.foundationdb.record.RecordCursorIterator.close(RecordCursorIterator.java:149)
10:36:59         at com.apple.foundationdb.record.provider.foundationdb.query.FDBOrQueryToUnionTest.testOrQuery5WithLimits(FDBOrQueryToUnionTest.java:416)

The trace points at this part of the code:

https://github.com/FoundationDB/fdb-record-layer/blob/cb1fe996d703ab0c661844c6957f5e55e5e0a7ca/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/cursors/MapPipelinedCursor.java#L141-L155

In particular, it's the remove call that is causing problems. As can be seen, the remove call is guarded by an isEmpty check, which suggests that the problem involves concurrent access to this data structure: something else can empty the queue between the check and the remove (which is probably at least partially why the test is intermittent). The underlying data structure, an ArrayDeque, is not thread safe: https://docs.oracle.com/javase/8/docs/api/java/util/ArrayDeque.html So, presumably, we need some kind of synchronization mechanism here?
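To make the suspected check-then-act problem concrete, here is a minimal sketch that replays one unlucky interleaving on a single thread, so the outcome is deterministic. It is not the MapPipelinedCursor code; the class name, the "pending-future" element, and the labels "thread A" and "thread B" are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.NoSuchElementException;

class CheckThenActDemo {
    // Replays, step by step, an interleaving that two threads could produce.
    // Returns true if the guarded remove still throws NoSuchElementException.
    static boolean replayRace() {
        Deque<String> pipeline = new ArrayDeque<>();
        pipeline.add("pending-future");

        // "Thread A" (think: close) runs its guard and sees a non-empty queue.
        boolean guardPassed = !pipeline.isEmpty();

        // "Thread B" (think: a completing pipelined task) removes the element
        // after the guard but before thread A acts on it.
        pipeline.remove();

        // "Thread A" now acts on its stale check.
        try {
            if (guardPassed) {
                pipeline.remove();
            }
            return false;
        } catch (NoSuchElementException e) {
            return true; // the isEmpty guard did not protect the remove
        }
    }

    public static void main(String[] args) {
        System.out.println("stale guard led to NoSuchElementException: " + replayRace());
    }
}
```

If this is indeed the failure mode, swapping in a concurrent queue would not be enough on its own, since the check and the remove would still be two separate steps; the pair would have to be made atomic, or the queue confined to one thread.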

alecgrieser, Apr 18 '19 18:04

This is really strange. Why would we be trying to close a cursor from multiple places concurrently?

nschiefer, Apr 18 '19 18:04

It could also be that, like, some pipelined task is accessing the queue at the same time as the close method.

alecgrieser, Apr 18 '19 18:04

I did some experimenting on 7401456d, which should also be the 2.7.75.0 tag.

I believe what's happening is that some of the cursors in the cursor pipeline are starting asynchronous work, and then that sometimes completes at the same time as closing the cursor. Here's a sample test:

@Test
public void mapPipelineCloseWhileCancelling() {
    Map<Class<? extends Throwable>, Integer> exceptionCount = new HashMap<>();
    for (int i = 0; i < 1000; i++) {
        try {
            LOGGER.info(KeyValueLogMessage.of("running map pipeline close test", "iteration", i));
            CompletableFuture<Void> signal = new CompletableFuture<>();
            RecordCursor<Integer> cursor = RecordCursor.fromList(Arrays.asList(1066, 1415, 800, 1861))
                    .mapPipelined(val -> signal.thenApplyAsync(ignore -> val + 349), 10);
            CompletableFuture<RecordCursorResult<Integer>> resultFuture = cursor.onNext();

            signal.complete(null);
            cursor.close();
            resultFuture.join();
        } catch (Exception e) {
            Throwable errToCount;
            if (e instanceof CompletionException && e.getCause() != null) {
                errToCount = e.getCause();
                errToCount.addSuppressed(e);
            } else {
                errToCount = e;
            }
            LOGGER.error(KeyValueLogMessage.of("error during test"), errToCount);
            exceptionCount.compute(errToCount.getClass(), (k, v) -> v == null ? 1 : v + 1);
        }
    }
    KeyValueLogMessage msg = KeyValueLogMessage.build("exception counts");
    msg.addKeysAndValues(exceptionCount);
    LOGGER.info(msg.toString());
}

The important thing here is that the futures are all queued up in the pipeline and then wait. Then the signal is fired, and they all start completing, but concurrent with that, the cursor is closed. Closing the cursor then loops through the outstanding futures and cancels them, but the cursor completing and returning a result also attempts to take the first element from the head of the pipeline.

Running this produced the following distribution of errors (though it varies from run to run):

2019-07-20 10:16:30,085 [INFO] c.a.f.r.RecordCursorTest - exception counts class java.lang.NullPointerException="93" class java.util.NoSuchElementException="31" class java.util.concurrent.CancellationException="59"

The stack trace here is a little bit different from the one we have earlier in the issue, even for the NoSuchElementExceptions. Those mostly seem to happen in onNext when the code there assumes that the completed and filled pipeline signifies that there should be an element ready to go, whereas the error we're seeing above is in close when it tries to extract something from the pipeline. I think those are two sides of the same coin, though, so that might not be so bad. Here's a stack for that:

java.util.NoSuchElementException
	at java.util.ArrayDeque.removeFirst(ArrayDeque.java:285)
	at java.util.ArrayDeque.remove(ArrayDeque.java:452)
	at com.apple.foundationdb.record.cursors.MapPipelinedCursor.lambda$onNext$1(MapPipelinedCursor.java:94)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
	at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:902)
	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:956)
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at com.apple.foundationdb.async.AsyncUtil$LoopPartial.apply(AsyncUtil.java:350)
	at com.apple.foundationdb.async.AsyncUtil$LoopPartial.apply(AsyncUtil.java:332)
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
	at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
	Suppressed: java.util.concurrent.CompletionException: java.util.NoSuchElementException
		at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
		at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
		at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:604)
		... 17 more
	[CIRCULAR REFERENCE:java.util.NoSuchElementException]

The NPEs and the cancellation exceptions are somewhat more mysterious. I expected an NPE in one of the places where we do if (x != null) do_thing_with(x); which would then play poorly with something else using x. What I think is actually happening is somewhat more subtle. In particular, the code within the pipeline loop fills up the pipeline, then signals to the while (true) loop that it's done, and then completes and tries to grab the first item from the pipeline. However, in the meantime, the pipeline has been cleared out by close(), so when it calls pipeline.peek(), it gets back a null future, and then it tries to join on that, and that causes the NPE. Here's a stack for that:
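The mechanism described above can be reproduced in isolation. Queue.peek() returns null on an empty queue, and a thenCompose callback that returns null makes the resulting future complete exceptionally with a NullPointerException, which would be consistent with the uniCompose frame at the top of the stack below. This is a sketch with made-up names (loopDone, pipeline), not the cursor code itself.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class PeekNullComposeDemo {
    // Returns the cause of the failure seen when composing onto the head of
    // an empty pipeline, or null if nothing failed.
    static Throwable composeOnEmptyPipeline() {
        // Stand-in for a pipeline that close() has already emptied.
        Queue<CompletableFuture<Integer>> pipeline = new ArrayDeque<>();

        // Stand-in for the "loop is done" future.
        CompletableFuture<Void> loopDone = CompletableFuture.completedFuture(null);

        // peek() returns null here, so the function handed to thenCompose
        // returns null instead of a future.
        CompletableFuture<Integer> next = loopDone.thenCompose(ignore -> pipeline.peek());

        try {
            next.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(composeOnEmptyPipeline());
    }
}
```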

java.lang.NullPointerException
	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at com.apple.foundationdb.async.AsyncUtil$LoopPartial.apply(AsyncUtil.java:350)
	at com.apple.foundationdb.async.AsyncUtil$LoopPartial.apply(AsyncUtil.java:332)
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
	at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
	Suppressed: java.util.concurrent.CompletionException: java.lang.NullPointerException
		at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
		at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
		at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:961)
		... 12 more
	[CIRCULAR REFERENCE:java.lang.NullPointerException]

The final error is the cancellation error. The interesting thing here is that it is actually the caller of cancel that is getting the error rather than something waiting on a canceled future. In particular, when the close() logic goes through and cancels all of the things, it will occasionally cancel a future that, I think, begins executing callbacks that then results in it joining on a canceled future, so (boom) the canceler is themselves canceled. Here's a stack for that:
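One thing that may help when reading this stack: CompletableFuture.cancel creates the CancellationException itself and then runs any dependent callbacks synchronously on the calling thread. The trace on the exception therefore records the cancel call site, even when the exception is later observed by someone joining on a dependent future. The sketch below shows only that JDK behavior; cancelFromClose is a hypothetical stand-in for the cancel call in close(), and nothing here is taken from MapPipelinedCursor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class CancelCallbackDemo {
    // Thread on which the dependent callback ran.
    static Thread callbackThread;

    // Hypothetical stand-in for the cancel call made from close().
    static void cancelFromClose(CompletableFuture<?> future) {
        future.cancel(false);
    }

    // Cancels a future with dependents and returns the cause that a joiner
    // on the dependent future observes.
    static Throwable run() {
        CompletableFuture<Integer> source = new CompletableFuture<>();
        source.whenComplete((value, err) -> callbackThread = Thread.currentThread());
        CompletableFuture<Integer> dependent = source.thenApply(value -> value + 349);

        cancelFromClose(source);

        try {
            dependent.join();
            return null;
        } catch (CompletionException e) {
            // The dependent wraps the original CancellationException.
            return e.getCause();
        }
    }

    // True if the exception's own stack trace passes through cancelFromClose.
    static boolean createdInCancelFromClose(Throwable t) {
        for (StackTraceElement frame : t.getStackTrace()) {
            if (frame.getMethodName().equals("cancelFromClose")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable cause = run();
        System.out.println(cause + " created under cancelFromClose: " + createdInCancelFromClose(cause));
    }
}
```

If the same thing is happening in the test, the exception counted there would be the one surfaced by resultFuture.join(), with a trace that merely points back at close(). That is a guess based on the JDK behavior, not something verified against the cursor code.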

java.util.concurrent.CancellationException
	at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2263)
	at com.apple.foundationdb.record.cursors.MapPipelinedCursor.close(MapPipelinedCursor.java:148)
	at com.apple.foundationdb.record.RecordCursorTest.mapPipelineCloseWhileCancelling(RecordCursorTest.java:822)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:436)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:170)
	at org.junit.jupiter.engine.execution.ThrowableCollector.execute(ThrowableCollector.java:40)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:166)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:113)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:58)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.lambda$executeRecursively$3(HierarchicalTestExecutor.java:112)
	at org.junit.platform.engine.support.hierarchical.SingleTestExecutor.executeSafely(SingleTestExecutor.java:66)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.executeRecursively(HierarchicalTestExecutor.java:108)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.execute(HierarchicalTestExecutor.java:79)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.lambda$executeRecursively$2(HierarchicalTestExecutor.java:120)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.Iterator.forEachRemaining(Iterator.java:116)
	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.lambda$executeRecursively$3(HierarchicalTestExecutor.java:120)
	at org.junit.platform.engine.support.hierarchical.SingleTestExecutor.executeSafely(SingleTestExecutor.java:66)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.executeRecursively(HierarchicalTestExecutor.java:108)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.execute(HierarchicalTestExecutor.java:79)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.lambda$executeRecursively$2(HierarchicalTestExecutor.java:120)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.Iterator.forEachRemaining(Iterator.java:116)
	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.lambda$executeRecursively$3(HierarchicalTestExecutor.java:120)
	at org.junit.platform.engine.support.hierarchical.SingleTestExecutor.executeSafely(SingleTestExecutor.java:66)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.executeRecursively(HierarchicalTestExecutor.java:108)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.execute(HierarchicalTestExecutor.java:79)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:55)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:43)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
	at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
	at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:102)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:82)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:78)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
	at com.sun.proxy.$Proxy2.stop(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.stop(TestWorker.java:132)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)
	at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
	at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
	at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
	at java.lang.Thread.run(Thread.java:748)
	Suppressed: java.util.concurrent.CompletionException: java.util.concurrent.CancellationException
		at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
		at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
		at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
		at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
		at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
		at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2265)
		... 83 more
	[CIRCULAR REFERENCE:java.util.concurrent.CancellationException]

It might be worth me seeing if I can get less truncated stacks there to see if there are things I'm missing here.

There's one more, somewhat more serious problem here, which is that this can sometimes result in different values being returned from the cursor. In particular, if the futures complete in just the right way, the first element in the pipeline can be removed, and then onNext gets called and (oops) the second overall element is now at the head of the queue and is returned to the user. So the first item is lost entirely.

I think .close() probably shouldn't clear out elements from the pipeline, though it might make sense for it to cancel outstanding work (assuming we can fix the thing that causes the cancellation exception to be thrown back in its own face).
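A rough sketch of what "cancel but don't remove" could look like, under the assumption that the pipeline is a queue of CompletableFutures guarded by a lock. The class and method names are hypothetical and this is not a proposed patch to MapPipelinedCursor; it only illustrates the idea that close() leaves queue membership alone, so an element that has already completed is not silently dropped.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

class CancelOnlyCloseSketch {
    private final Queue<CompletableFuture<Integer>> pipeline = new ArrayDeque<>();
    private boolean closed;

    synchronized void enqueue(CompletableFuture<Integer> future) {
        pipeline.add(future);
    }

    // Cancels outstanding work without removing anything from the queue.
    void close() {
        List<CompletableFuture<Integer>> snapshot;
        synchronized (this) {
            closed = true;
            snapshot = new ArrayList<>(pipeline);
        }
        // Cancel outside the lock: cancel() runs dependent callbacks
        // synchronously, and those callbacks might need the lock themselves.
        for (CompletableFuture<Integer> future : snapshot) {
            future.cancel(false); // no effect on futures that already completed
        }
    }

    synchronized boolean isClosed() {
        return closed;
    }

    synchronized CompletableFuture<Integer> head() {
        return pipeline.peek();
    }

    synchronized int size() {
        return pipeline.size();
    }
}
```

With a completed future at the head and a pending one behind it, close() leaves both in place: the head still yields its value and only the pending future ends up cancelled. Whether onNext should keep returning results after close() is a separate question that this sketch does not settle.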

alecgrieser, Jul 20 '19 17:07