StompProtocolAndroid icon indicating copy to clipboard operation
StompProtocolAndroid copied to clipboard

OnErrorNotImplementedException in StompClient.java

Open joshuajwitter opened this issue 5 years ago • 19 comments

First off, thank you to everyone who works on this awesome project it really works well and is extremely useful.

I may have found a bug. Hopefully this has not been posted about and answered already, I tried to read all of the existing Issues and didn't see this exact issue.

First, some background: I have seen the OnErrorNotImplementedException like other users:

io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.util.NoSuchElementException

...and after reading through the other issues posted and Stack Overflow I have fixed it by adding an error handler to my code that calls subscribe(), like so:

        // begin topic subscription for user data
        stompClient.topic(TOPIC_NAME_HERE)
            .doOnError(throwable -> {
                // log the error and tell the service to resubscribe
            })
            .subscribeOn(Schedulers.io())
            .subscribe(new Subscriber<StompMessage>() {
                @Override
                public void onSubscribe(Subscription subscription) {
                }

                @Override
                public void onNext(StompMessage topicMessage) {
                    // handle the incoming message
                }

                @Override
                public void onError(Throwable error) {
                    Log.e(TAG, "Error on user data topic", error);
                    resubscribe = true;
                }

                @Override
                public void onComplete() {
                }
            });

This made the crashes extremely rare. With the help of the debugger I found that crashes would still happen occasionally when using the StompClient to use heartbeat messages inside the StompClient's connect method. In that method the subscribe call is made but no error handler is specified. When I initially connect the StompClient and setup heartbeating (via stompClient.withClientHeartbeat(1000).withServerHeartbeat(1000);) I see the error occur after I call stompClient.disconnect(). I think this is because there is an error that is not handled.

I might have totally missed something.. If this is a legit issue would be happy to add the error handlers to the StompClient connect() method code and create a PR if it helps.

joshuajwitter avatar Aug 02 '19 18:08 joshuajwitter

I have seen this issue recently too. Have you fixed it on your end, if so, how?

codedentwickler avatar Sep 11 '19 09:09 codedentwickler

@codedentwickler I am still seeing a large number of crashes even after trying to fix this error myself. This is what the stack traces for the exceptions look like:

java.util.Scanner.skip Scanner.java:1749
java.util.Scanner.skip Scanner.java:1766
ua.naiksoftware.stomp.dto.StompMessage.from SourceFile:89
ua.naiksoftware.stomp.-$$Lambda$S9OD0HNfof0SoPkv-GiHdisPzBw.apply
io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext SourceFile:57
io.reactivex.internal.operators.observable.ObservableConcatMap$ConcatMapDelayErrorObserver$DelayErrorInnerObserver.onNext SourceFile:506
io.reactivex.subjects.PublishSubject$PublishDisposable.onNext SourceFile:308
io.reactivex.subjects.PublishSubject.onNext SourceFile:228
ua.naiksoftware.stomp.provider.AbstractConnectionProvider.emitMessage SourceFile:117
ua.naiksoftware.stomp.provider.OkHttpConnectionProvider$1.[onOpen | onMessage | onMessage | onFailure | onClosing] SourceFile:67
okhttp3.internal.ws.RealWebSocket.onReadMessage SourceFile:323
okhttp3.internal.ws.WebSocketReader.readMessageFrame SourceFile:219
okhttp3.internal.ws.WebSocketReader.processNextFrame SourceFile:105
okhttp3.internal.ws.RealWebSocket.loopReader SourceFile:274
okhttp3.internal.ws.RealWebSocket$2.onResponse SourceFile:214
okhttp3.RealCall$AsyncCall.execute SourceFile:206
okhttp3.internal.NamedRunnable.run SourceFile:32
java.util.concurrent.ThreadPoolExecutor.runWorker ThreadPoolExecutor.java:1167
java.util.concurrent.ThreadPoolExecutor$Worker.run ThreadPoolExecutor.java:641
java.lang.Thread.run Thread.java:764
io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept SourceFile:704
io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept SourceFile:701
io.reactivex.internal.observers.LambdaObserver.onError SourceFile:77
io.reactivex.internal.observers.BasicFuseableObserver.onError SourceFile:100
io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError SourceFile:117
io.reactivex.internal.observers.BasicFuseableObserver.onError SourceFile:100
io.reactivex.internal.observers.BasicFuseableObserver.onError SourceFile:100
io.reactivex.internal.observers.BasicFuseableObserver.fail SourceFile:110
io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext SourceFile:59
io.reactivex.internal.operators.observable.ObservableConcatMap$ConcatMapDelayErrorObserver$DelayErrorInnerObserver.onNext SourceFile:506
io.reactivex.subjects.PublishSubject$PublishDisposable.onNext SourceFile:308
io.reactivex.subjects.PublishSubject.onNext SourceFile:228
ua.naiksoftware.stomp.provider.AbstractConnectionProvider.emitMessage SourceFile:117
ua.naiksoftware.stomp.provider.OkHttpConnectionProvider$1.[onOpen | onMessage | onMessage | onFailure | onClosing] SourceFile:67
okhttp3.internal.ws.RealWebSocket.onReadMessage SourceFile:323
okhttp3.internal.ws.WebSocketReader.readMessageFrame SourceFile:219
okhttp3.internal.ws.WebSocketReader.processNextFrame SourceFile:105
okhttp3.internal.ws.RealWebSocket.loopReader SourceFile:274
okhttp3.internal.ws.RealWebSocket$2.onResponse SourceFile:214
okhttp3.RealCall$AsyncCall.execute SourceFile:206
okhttp3.internal.NamedRunnable.run SourceFile:32
java.util.concurrent.ThreadPoolExecutor.runWorker ThreadPoolExecutor.java:1167
java.util.concurrent.ThreadPoolExecutor$Worker.run ThreadPoolExecutor.java:641
java.lang.Thread.run Thread.java:764

joshuajwitter avatar Sep 17 '19 12:09 joshuajwitter

I am still scratching my head, I wish there was a working code example for using the subscribe() method. Reading the other issues on this project leads me to posts stating that you need to include an error handler, but in my code you can see that I not only call .doOnError() but I also override the callback method onError in the subscribe call. It's at the point where I am just trying to catch the crash in a debugger and work backwards to see what is missing.

joshuajwitter avatar Sep 17 '19 12:09 joshuajwitter

A bit more information from https://github.com/ReactiveX/RxJava/wiki/Error-Handling:

OnErrorNotImplementedException This indicates that an Observable tried to call its observer’s onError() method, but that no such method existed. You can eliminate this by either fixing the Observable so that it no longer reaches an error condition, by implementing an onError handler in the observer, or by intercepting the onError notification before it reaches the observer by using one of the operators described elsewhere on this page.

joshuajwitter avatar Sep 17 '19 13:09 joshuajwitter

Alright, I think I may have discovered what was going on. I believe I was focusing my efforts on the wrong code. I was looking into where my client was subscribing to a topic. I totally overlooked where my client was actually sending data via the websocket connection.

This is what my sending code looked like:

Disposable disposable = mStompClient.send(destination, data).subscribe();

I have changed it to:

Disposable disposable = mStompClient
    .send(destination, data)
    .subscribe(() -> Log.d(TAG, "Data sent successfully through websocket"), throwable -> {
        Log.e(TAG, "Error sending data using websocket", throwable);
    });

I should know in a few days if this fixes the error, it seems like it very well might. I will report back with my findings ASAP.

joshuajwitter avatar Sep 17 '19 13:09 joshuajwitter

If you're starting a new project, don't use this legacy library. Use tinder/scarlet.

forresthopkinsa avatar Sep 19 '19 08:09 forresthopkinsa

Yeah, it looks like my changes didn't fix the error. I am still seeing the same crashes.

joshuajwitter avatar Sep 24 '19 13:09 joshuajwitter

It works after I added this line.

    stompClient.setPathMatcher(ActivePathMatcher());

I had to make some changes to the library locally

codedentwickler avatar Sep 24 '19 13:09 codedentwickler

@codedentwickler could you elaborate?

It works after I added this line.

    stompClient.setPathMatcher(ActivePathMatcher());

I had to make some changes to the library locally

joshuajwitter avatar Sep 24 '19 18:09 joshuajwitter

It works after I added this line.

    stompClient.setPathMatcher(ActivePathMatcher());

I had to make some changes to the library locally

:trollface:

joshuajwitter avatar Sep 25 '19 15:09 joshuajwitter

我解决了这个OnErrorNotImplementedException的问题,直接把源码拉下来,把StompClient这个类中所有的subscribe方法中,都加入onError回调,结果正常输出日志,看到错误的源头在StompMessage类中的reader.skip("\n\n");这一行,我在这加上了try catch,整个stomp通信就正常了 try { reader.skip("\n\n"); }catch (NoSuchElementException e){ Log.e(TAG, "---果然是这里错了==="); }

liu-v avatar Oct 28 '19 10:10 liu-v

I have the same problem with it. Does anyone found the solution? ....

marain87 avatar Nov 04 '19 08:11 marain87

First, make sure that all your subscribers have an onError function supplied to the subscriber, or you're dead in the water.

Please, everyone, refrain from posting in this thread unless you can show us your code. If you want help, post your websocket code that's causing the error.

forresthopkinsa avatar Nov 05 '19 23:11 forresthopkinsa

I've solved the problem by added onBackpressureBuffer() method to the code where I`m subscribing to the lifecycle. Look at the code:

mStompClient?.lifecycle()
            ?.onBackpressureBuffer()
            ?.subscribeOn(Schedulers.io())
            ?.observeOn(AndroidSchedulers.mainThread())

Seems like you can use it in your topic listener. By the way, you can try to use another library for your sockets(I'm using it): implementation "com.github.forresthopkinsa:StompProtocolAndroid:$version"

morozione avatar Nov 26 '19 18:11 morozione

Keep in mind that my fork is not actively maintained anymore. While it should still work, I recommend moving away from this project entirely and migrating to the far-more-actively-maintained (and funded) websocket library Tinder/Scarlet

forresthopkinsa avatar Dec 05 '19 22:12 forresthopkinsa

I fix this, added all lib to my proyect and change the method called 'from' in StompMessage class with this

public static StompMessage from(@Nullable String data) {
      if (data == null || data.trim().isEmpty()) {
          return new StompMessage(StompCommand.UNKNOWN, null, data);
      }
      Scanner reader = new Scanner(new StringReader(data));
      reader.useDelimiter("\\n");
      String command = reader.next();
      List<StompHeader> headers = new ArrayList<>();

      while (reader.hasNext(PATTERN_HEADER)) {
          Matcher matcher = PATTERN_HEADER.matcher(reader.next());
          matcher.find();
          headers.add(new StompHeader(matcher.group(1), matcher.group(2)));
      }

      reader.skip("\\s");

      reader.useDelimiter(TERMINATE_MESSAGE_SYMBOL);
      String payload = reader.hasNext() ? reader.next() : null;

      return new StompMessage(command, headers, payload);
  }

javier4mar avatar Jan 17 '20 17:01 javier4mar

I am using the latest Spring boot 2 & RabbitMQ. @liu-v & @javier4mar Answer helped me.

Seem that the return of Stomp Message without the "legacyWhitespace" ("\n\n"), so remove or add try catch for the line reader.skip("\n\n"); in StompMessage can solve this issue.

marain87 avatar Feb 09 '20 12:02 marain87

Is anyone working on this? I am experiencing this with my app--tons of crashes.

veeva-mark avatar Jul 31 '20 17:07 veeva-mark

Off-topic. @forresthopkinsa you have mentioned in many places to use Scarlet. I have tried to configure it, the connection is upgraded and in the network profiler, I can see heartbeat being sent every n second. While this library works out of the box, Scarlet does not have any working example.

ltmilan avatar Sep 16 '20 05:09 ltmilan