
UndeliverableException for StreamNotFoundException

Open dosomder opened this issue 2 years ago • 5 comments

On rare occasions we observe the following exception on our servers, which crashes the application:

From the stack trace it looks like the StreamNotFoundException (or at least onError) is emitted twice, which is not legal for a Publisher: once a terminal signal has been sent, no further signals may follow. The first emission may be here: https://github.com/EventStore/EventStoreDB-Client-Java/blob/trunk/db-client-java/src/main/java/com/eventstore/dbclient/AbstractRead.java#L108 and the second here: https://github.com/EventStore/EventStoreDB-Client-Java/blob/trunk/db-client-java/src/main/java/com/eventstore/dbclient/AbstractRead.java#L113 (this line is in the stack trace).

However, ReadSubscription tries to protect against this with an AtomicBoolean, so I don't know how this exception can still happen. There were some exception handling changes here, but I am not sure whether they are related: https://github.com/EventStore/EventStoreDB-Client-Java/commit/3fb47014ce22d4cfae2ad209ccde08d393f64126
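
For readers unfamiliar with the pattern, here is a minimal sketch of what an AtomicBoolean guard against duplicate terminal signals can look like. It is built on the JDK's `java.util.concurrent.Flow` interfaces only and is not the client's actual `ReadSubscription` code; `TerminalGuard` and `CountingSubscriber` are hypothetical names introduced for illustration.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

public class TerminalGuardDemo {

    /** Hypothetical wrapper: forwards at most one terminal signal downstream. */
    static final class TerminalGuard<T> implements Flow.Subscriber<T> {
        private final Flow.Subscriber<? super T> downstream;
        private final AtomicBoolean terminated = new AtomicBoolean(false);

        TerminalGuard(Flow.Subscriber<? super T> downstream) {
            this.downstream = downstream;
        }

        @Override public void onSubscribe(Flow.Subscription subscription) {
            downstream.onSubscribe(subscription);
        }

        @Override public void onNext(T item) {
            // Drop items that arrive after a terminal signal.
            if (!terminated.get()) {
                downstream.onNext(item);
            }
        }

        @Override public void onError(Throwable error) {
            // Only the first caller to flip the flag may signal downstream.
            if (terminated.compareAndSet(false, true)) {
                downstream.onError(error);
            }
        }

        @Override public void onComplete() {
            if (terminated.compareAndSet(false, true)) {
                downstream.onComplete();
            }
        }
    }

    /** Hypothetical test subscriber that just counts the signals it receives. */
    static final class CountingSubscriber implements Flow.Subscriber<String> {
        int items;
        int errors;
        int completions;

        @Override public void onSubscribe(Flow.Subscription subscription) { }
        @Override public void onNext(String item) { items++; }
        @Override public void onError(Throwable error) { errors++; }
        @Override public void onComplete() { completions++; }
    }

    public static void main(String[] args) {
        CountingSubscriber counter = new CountingSubscriber();
        TerminalGuard<String> guard = new TerminalGuard<>(counter);

        // Simulate two code paths reporting the same failure, then a late completion.
        RuntimeException failure = new IllegalStateException("stream not found (simulated)");
        guard.onError(failure);
        guard.onError(failure);
        guard.onComplete();

        // prints "errors=1 completions=0"
        System.out.println("errors=" + counter.errors + " completions=" + counter.completions);
    }
}
```

A guard like this only covers signals that pass through the same guard instance. It cannot help if the consumer has already cancelled or disposed the flow by the time the error arrives, which is the other situation the UndeliverableException message below mentions.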

Forcing JVM exit because Thread[xxxx-xxxx-xxxxx-16,5,main] threw an uncaught exception
io.reactivex.rxjava3.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | com.eventstore.dbclient.StreamNotFoundException
	at io.reactivex.rxjava3.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:372)
	at io.reactivex.rxjava3.internal.observers.BasicFuseableObserver.onError(BasicFuseableObserver.java:96)
	at io.reactivex.rxjava3.internal.operators.observable.ObservableFromPublisher$PublisherSubscriber.onError(ObservableFromPublisher.java:52)
	at com.eventstore.dbclient.ReadSubscription.onError(ReadSubscription.java:39)
	at com.eventstore.dbclient.AbstractRead.lambda$subscribe$1(AbstractRead.java:113)
	at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:990)
	at java.base/java.util.concurrent.CompletableFuture.uniExceptionallyStage(CompletableFuture.java:1008)
	at java.base/java.util.concurrent.CompletableFuture.exceptionally(CompletableFuture.java:2364)
	at com.eventstore.dbclient.AbstractRead.subscribe(AbstractRead.java:112)
	at io.reactivex.rxjava3.internal.operators.observable.ObservableFromPublisher.subscribeActual(ObservableFromPublisher.java:32)
	at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
	at io.reactivex.rxjava3.internal.operators.observable.ObservableFilter.subscribeActual(ObservableFilter.java:30)
	at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
	at io.reactivex.rxjava3.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
	at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
	at io.reactivex.rxjava3.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
	at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
	at io.reactivex.rxjava3.internal.jdk8.ObservableMapOptional.subscribeActual(ObservableMapOptional.java:42)
	at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
	at io.reactivex.rxjava3.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
	at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
	at io.reactivex.rxjava3.internal.operators.observable.ObservableOnErrorComplete.subscribeActual(ObservableOnErrorComplete.java:41)
	at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
	at io.reactivex.rxjava3.internal.operators.observable.ObservableElementAtMaybe.subscribeActual(ObservableElementAtMaybe.java:32)
	at io.reactivex.rxjava3.core.Maybe.subscribe(Maybe.java:5377)
	at io.reactivex.rxjava3.internal.operators.maybe.MaybeMap.subscribeActual(MaybeMap.java:41)
	at io.reactivex.rxjava3.core.Maybe.subscribe(Maybe.java:5377)
	at io.reactivex.rxjava3.internal.operators.maybe.MaybeToSingle.subscribeActual(MaybeToSingle.java:46)
	at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4855)
	at io.reactivex.rxjava3.internal.operators.single.SingleFlatMap.subscribeActual(SingleFlatMap.java:37)
	at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4855)
	at io.reactivex.rxjava3.internal.operators.single.SingleFlatMapMaybe.subscribeActual(SingleFlatMapMaybe.java:38)
	at io.reactivex.rxjava3.core.Maybe.subscribe(Maybe.java:5377)
	at io.reactivex.rxjava3.internal.operators.maybe.MaybeToSingle.subscribeActual(MaybeToSingle.java:46)
	at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4855)
	at io.reactivex.rxjava3.internal.operators.single.SingleResumeNext.subscribeActual(SingleResumeNext.java:39)
	at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4855)
	at io.reactivex.rxjava3.core.Single.blockingGet(Single.java:3644)
	at com.xxx.xxx.XXXXXX.lambda$xxxxxxxxxx$17(XXXXXXXXX.java:260)
	at io.reactivex.rxjava3.internal.operators.single.SingleFromCallable.subscribeActual(SingleFromCallable.java:43)
	at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4855)
	at io.reactivex.rxjava3.internal.operators.single.SingleSubscribeOn$SubscribeOnObserver.run(SingleSubscribeOn.java:89)
	at io.reactivex.rxjava3.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
	at io.reactivex.rxjava3.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:25)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: com.eventstore.dbclient.StreamNotFoundException: null
	at com.eventstore.dbclient.AbstractRead$1.onNext(AbstractRead.java:62)
	at com.eventstore.dbclient.AbstractRead$1.onNext(AbstractRead.java:48)
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:478)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:660)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:647)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	... 3 common frames omitted

Client: 4.3.0

dosomder avatar Oct 25 '23 13:10 dosomder

Hey @dosomder,

Can you share some info on your runtime workload when it happens? Is there a situation where you delete a stream while another part of your code is reading from it?

YoEight avatar Oct 25 '23 14:10 YoEight

We don't explicitly delete any streams, but we do set maxAge and maxCount.

When the exception happens, we are trying to read the last event from a stream and expect the stream to be empty. This is how we read the stream. We handle the StreamNotFoundException in the RxJava chain.

Observable.fromPublisher(
        this.client.readStreamReactive(
            this.getStreamName(),
            ReadStreamOptions.get().fromEnd().backwards().maxCount(1)))
        // Only interested in the actual event
        .filter(ReadMessage::hasEvent)
        .map(ReadMessage::getEvent)
        // Decode event
        .mapOptional(this.translator::decodeEvent)
        // A non-existent stream is treated as if it contains no events
        .onErrorComplete(StreamNotFoundException.class::isInstance);

dosomder avatar Oct 25 '23 14:10 dosomder

@dosomder Sorry for taking so long to respond. I think you have diagnosed the issue properly. I need to run some tests to find out the order of events: when the exception is raised in the future, has it already been processed by the subscription's observer, or is it the other way around?
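
One JDK behaviour is relevant to that ordering question: if a `CompletableFuture` has already completed exceptionally when `exceptionally(...)` is called on it, the callback runs immediately on the calling thread. This matches the stack trace above, where `AbstractRead.lambda$subscribe$1` sits directly under `CompletableFuture.exceptionally` on the subscribing thread. The sketch below shows only that JDK behaviour; `ExceptionallyTimingDemo` and `callbackThreadFor` are hypothetical names, and nothing here is taken from the client's code.

```java
import java.util.concurrent.CompletableFuture;

public class ExceptionallyTimingDemo {

    /**
     * Registers an exceptionally callback and returns the name of the thread
     * that ran it, or null if it has not run by the time registration returns.
     */
    static String callbackThreadFor(CompletableFuture<String> future) {
        String[] ranOn = new String[1];
        future.exceptionally(error -> {
            ranOn[0] = Thread.currentThread().getName();
            return null;
        });
        return ranOn[0];
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();

        // Case 1: the future failed before the callback was attached.
        CompletableFuture<String> alreadyFailed = new CompletableFuture<>();
        alreadyFailed.completeExceptionally(new IllegalStateException("simulated failure"));
        // The callback runs synchronously, inside the exceptionally(...) call.
        System.out.println(caller.equals(callbackThreadFor(alreadyFailed))); // prints "true"

        // Case 2: the future is still pending, so the callback is only registered.
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(callbackThreadFor(pending)); // prints "null"
    }
}
```

So when the failure is already recorded in the future at subscribe time, the error handler fires during `subscribe` itself. If the same failure has also been delivered through another path, the observer would see it twice.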

YoEight avatar Nov 09 '23 18:11 YoEight

@YoEight We have since moved to client 5.x, and in the 6 weeks since then we have not seen this exception in our environments.

dosomder avatar Dec 13 '23 13:12 dosomder

Unfortunately, I can assure you that the issue is still there, even if it's rare. I have never managed to reproduce it locally, but per the Rx documentation, the way we use the library in this context is wrong.

YoEight avatar Dec 13 '23 14:12 YoEight