EventStoreDB-Client-Java
EventStoreDB-Client-Java copied to clipboard
UndeliverableException for StreamNotFoundException
We rarely observe the following exception on our servers which crashes the application:
From the stacktrace it looks like the StreamNotFoundException (or at least onError) is emitted twice which is not legal for a Publisher. The first time may be here: https://github.com/EventStore/EventStoreDB-Client-Java/blob/trunk/db-client-java/src/main/java/com/eventstore/dbclient/AbstractRead.java#L108 and the second time here: https://github.com/EventStore/EventStoreDB-Client-Java/blob/trunk/db-client-java/src/main/java/com/eventstore/dbclient/AbstractRead.java#L113 (this line is in the stacktrace).
Although the ReadSubscription tries to protect against this with an AtomicBoolean so I don't know how this exception can still happen. There was some exception handling changes here, not sure if they are related: https://github.com/EventStore/EventStoreDB-Client-Java/commit/3fb47014ce22d4cfae2ad209ccde08d393f64126
Forcing JVM exit because Thread[xxxx-xxxx-xxxxx-16,5,main] threw an uncaught exception
io.reactivex.rxjava3.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | com.eventstore.dbclient.StreamNotFoundException
at io.reactivex.rxjava3.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:372)
at io.reactivex.rxjava3.internal.observers.BasicFuseableObserver.onError(BasicFuseableObserver.java:96)
at io.reactivex.rxjava3.internal.operators.observable.ObservableFromPublisher$PublisherSubscriber.onError(ObservableFromPublisher.java:52)
at com.eventstore.dbclient.ReadSubscription.onError(ReadSubscription.java:39)
at com.eventstore.dbclient.AbstractRead.lambda$subscribe$1(AbstractRead.java:113)
at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:990)
at java.base/java.util.concurrent.CompletableFuture.uniExceptionallyStage(CompletableFuture.java:1008)
at java.base/java.util.concurrent.CompletableFuture.exceptionally(CompletableFuture.java:2364)
at com.eventstore.dbclient.AbstractRead.subscribe(AbstractRead.java:112)
at io.reactivex.rxjava3.internal.operators.observable.ObservableFromPublisher.subscribeActual(ObservableFromPublisher.java:32)
at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
at io.reactivex.rxjava3.internal.operators.observable.ObservableFilter.subscribeActual(ObservableFilter.java:30)
at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
at io.reactivex.rxjava3.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
at io.reactivex.rxjava3.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
at io.reactivex.rxjava3.internal.jdk8.ObservableMapOptional.subscribeActual(ObservableMapOptional.java:42)
at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
at io.reactivex.rxjava3.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
at io.reactivex.rxjava3.internal.operators.observable.ObservableOnErrorComplete.subscribeActual(ObservableOnErrorComplete.java:41)
at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
at io.reactivex.rxjava3.internal.operators.observable.ObservableElementAtMaybe.subscribeActual(ObservableElementAtMaybe.java:32)
at io.reactivex.rxjava3.core.Maybe.subscribe(Maybe.java:5377)
at io.reactivex.rxjava3.internal.operators.maybe.MaybeMap.subscribeActual(MaybeMap.java:41)
at io.reactivex.rxjava3.core.Maybe.subscribe(Maybe.java:5377)
at io.reactivex.rxjava3.internal.operators.maybe.MaybeToSingle.subscribeActual(MaybeToSingle.java:46)
at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4855)
at io.reactivex.rxjava3.internal.operators.single.SingleFlatMap.subscribeActual(SingleFlatMap.java:37)
at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4855)
at io.reactivex.rxjava3.internal.operators.single.SingleFlatMapMaybe.subscribeActual(SingleFlatMapMaybe.java:38)
at io.reactivex.rxjava3.core.Maybe.subscribe(Maybe.java:5377)
at io.reactivex.rxjava3.internal.operators.maybe.MaybeToSingle.subscribeActual(MaybeToSingle.java:46)
at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4855)
at io.reactivex.rxjava3.internal.operators.single.SingleResumeNext.subscribeActual(SingleResumeNext.java:39)
at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4855)
at io.reactivex.rxjava3.core.Single.blockingGet(Single.java:3644)
at com.xxx.xxx.XXXXXX.lambda$xxxxxxxxxx$17(XXXXXXXXX.java:260)
at io.reactivex.rxjava3.internal.operators.single.SingleFromCallable.subscribeActual(SingleFromCallable.java:43)
at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4855)
at io.reactivex.rxjava3.internal.operators.single.SingleSubscribeOn$SubscribeOnObserver.run(SingleSubscribeOn.java:89)
at io.reactivex.rxjava3.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
at io.reactivex.rxjava3.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:25)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: com.eventstore.dbclient.StreamNotFoundException: null
at com.eventstore.dbclient.AbstractRead$1.onNext(AbstractRead.java:62)
at com.eventstore.dbclient.AbstractRead$1.onNext(AbstractRead.java:48)
at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:478)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:660)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:647)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
... 3 common frames omitted
Client: 4.3.0
Hey @dosomder,
Can you share some info on your runtime workload when it happens? Is there a situation where you delete a stream while another part of your code is reading from it?
We don't delete a stream in particular but we set maxAge and maxCount.
In this case when the exception happens, we try to read the last event from a stream and expect it to be empty. This is how we read the stream. We handle the StreamNotFoundException in the rxjava chain.
Observable.fromPublisher(
this.client.readStreamReactive(
this.getStreamName(),
ReadStreamOptions.get().fromEnd().backwards().maxCount(1)))
// Only interested in the actual event
.filter(ReadMessage::hasEvent)
.map(ReadMessage::getEvent)
// Decode event
.mapOptional(this.translator::decodeEvent)
// A none existing stream is to be treated as if no events exist
.onErrorComplete(StreamNotFoundException.class::isInstance);
@dosomder Sorry for taking time to respond. I think you have diagnosed the issue properly. I need to conduct some tests to see when an exception has been raised in a future, was it already processed by the observer of the subscription or if it's the other way around.
@YoEight Meanwhile we are on client 5.x and since then (6 weeks now) we have not seen this exception on our environments.
Unfortunately, I can assure you that the issue is still there, even if it's scarce. I never managed to reproduce it locally but per RX documentation, the way we just the library in this context is wrong.