StompProtocolAndroid
OnErrorNotImplementedException in StompClient.java
First off, thank you to everyone who works on this awesome project. It works really well and is extremely useful.
I may have found a bug. Hopefully this has not already been posted and answered; I tried to read all of the existing issues and didn't see this exact one.
First, some background: like other users, I have seen the OnErrorNotImplementedException:
io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.util.NoSuchElementException
...and after reading through the other issues posted here and on Stack Overflow, I fixed it by adding an error handler to my code that calls subscribe(), like so:
// begin topic subscription for user data
stompClient.topic(TOPIC_NAME_HERE)
        .doOnError(throwable -> {
            // log the error and tell the service to resubscribe
        })
        .subscribeOn(Schedulers.io())
        .subscribe(new Subscriber<StompMessage>() {
            @Override
            public void onSubscribe(Subscription subscription) {
            }
            @Override
            public void onNext(StompMessage topicMessage) {
                // handle the incoming message
            }
            @Override
            public void onError(Throwable error) {
                Log.e(TAG, "Error on user data topic", error);
                resubscribe = true;
            }
            @Override
            public void onComplete() {
            }
        });
This made the crashes extremely rare. With the help of the debugger, I found that crashes still happen occasionally when the StompClient is configured to send heartbeat messages. Inside the StompClient's connect method, subscribe is called but no error handler is specified. When I connect the StompClient and set up heartbeating (via stompClient.withClientHeartbeat(1000).withServerHeartbeat(1000);), I see the error occur after I call stompClient.disconnect(). I think this is because an error is raised there that is not handled.
I might have totally missed something. If this is a legitimate issue, I would be happy to add the error handlers to the StompClient connect() method and create a PR if it helps.
I have seen this issue recently too. Have you fixed it on your end? If so, how?
@codedentwickler I am still seeing a large number of crashes even after trying to fix this error myself. This is what the stack traces for the exceptions look like:
java.util.Scanner.skip Scanner.java:1749
java.util.Scanner.skip Scanner.java:1766
ua.naiksoftware.stomp.dto.StompMessage.from SourceFile:89
ua.naiksoftware.stomp.-$$Lambda$S9OD0HNfof0SoPkv-GiHdisPzBw.apply
io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext SourceFile:57
io.reactivex.internal.operators.observable.ObservableConcatMap$ConcatMapDelayErrorObserver$DelayErrorInnerObserver.onNext SourceFile:506
io.reactivex.subjects.PublishSubject$PublishDisposable.onNext SourceFile:308
io.reactivex.subjects.PublishSubject.onNext SourceFile:228
ua.naiksoftware.stomp.provider.AbstractConnectionProvider.emitMessage SourceFile:117
ua.naiksoftware.stomp.provider.OkHttpConnectionProvider$1.[onOpen | onMessage | onMessage | onFailure | onClosing] SourceFile:67
okhttp3.internal.ws.RealWebSocket.onReadMessage SourceFile:323
okhttp3.internal.ws.WebSocketReader.readMessageFrame SourceFile:219
okhttp3.internal.ws.WebSocketReader.processNextFrame SourceFile:105
okhttp3.internal.ws.RealWebSocket.loopReader SourceFile:274
okhttp3.internal.ws.RealWebSocket$2.onResponse SourceFile:214
okhttp3.RealCall$AsyncCall.execute SourceFile:206
okhttp3.internal.NamedRunnable.run SourceFile:32
java.util.concurrent.ThreadPoolExecutor.runWorker ThreadPoolExecutor.java:1167
java.util.concurrent.ThreadPoolExecutor$Worker.run ThreadPoolExecutor.java:641
java.lang.Thread.run Thread.java:764
io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept SourceFile:704
io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept SourceFile:701
io.reactivex.internal.observers.LambdaObserver.onError SourceFile:77
io.reactivex.internal.observers.BasicFuseableObserver.onError SourceFile:100
io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError SourceFile:117
io.reactivex.internal.observers.BasicFuseableObserver.onError SourceFile:100
io.reactivex.internal.observers.BasicFuseableObserver.onError SourceFile:100
io.reactivex.internal.observers.BasicFuseableObserver.fail SourceFile:110
io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext SourceFile:59
io.reactivex.internal.operators.observable.ObservableConcatMap$ConcatMapDelayErrorObserver$DelayErrorInnerObserver.onNext SourceFile:506
io.reactivex.subjects.PublishSubject$PublishDisposable.onNext SourceFile:308
io.reactivex.subjects.PublishSubject.onNext SourceFile:228
ua.naiksoftware.stomp.provider.AbstractConnectionProvider.emitMessage SourceFile:117
ua.naiksoftware.stomp.provider.OkHttpConnectionProvider$1.[onOpen | onMessage | onMessage | onFailure | onClosing] SourceFile:67
okhttp3.internal.ws.RealWebSocket.onReadMessage SourceFile:323
okhttp3.internal.ws.WebSocketReader.readMessageFrame SourceFile:219
okhttp3.internal.ws.WebSocketReader.processNextFrame SourceFile:105
okhttp3.internal.ws.RealWebSocket.loopReader SourceFile:274
okhttp3.internal.ws.RealWebSocket$2.onResponse SourceFile:214
okhttp3.RealCall$AsyncCall.execute SourceFile:206
okhttp3.internal.NamedRunnable.run SourceFile:32
java.util.concurrent.ThreadPoolExecutor.runWorker ThreadPoolExecutor.java:1167
java.util.concurrent.ThreadPoolExecutor$Worker.run ThreadPoolExecutor.java:641
java.lang.Thread.run Thread.java:764
I am still scratching my head. I wish there was a working code example for using the subscribe() method. Reading the other issues on this project leads me to posts stating that you need to include an error handler, but in my code you can see that I not only call .doOnError() but also override the onError callback in the subscribe call. I'm at the point where I am just trying to catch the crash in a debugger and work backwards to see what is missing.
A bit more information from https://github.com/ReactiveX/RxJava/wiki/Error-Handling:
OnErrorNotImplementedException This indicates that an Observable tried to call its observer’s onError() method, but that no such method existed. You can eliminate this by either fixing the Observable so that it no longer reaches an error condition, by implementing an onError handler in the observer, or by intercepting the onError notification before it reaches the observer by using one of the operators described elsewhere on this page.
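To make the quoted explanation concrete, here is a toy model in plain Java of a subscribe call with and without an error handler. It is not RxJava code: `MissingOnErrorDemo`, `deliver`, `parse`, and `UnhandledErrorException` are all hypothetical names made up for this sketch, and the real library reports the error through its own plugin mechanism rather than a simple rethrow. The point it tries to illustrate is only that a call without an error handler fails even when other callers supply one, which would apply equally to a subscribe call made inside a library.

```java
import java.util.function.Consumer;

/** Toy model of a one-argument vs two-argument subscribe; this is NOT RxJava code. */
final class MissingOnErrorDemo {

    /** Hypothetical stand-in for OnErrorNotImplementedException. */
    static final class UnhandledErrorException extends RuntimeException {
        UnhandledErrorException(Throwable cause) {
            super("missing onError handler", cause);
        }
    }

    /** Delivers one item; a failure inside onNext is routed to onError. */
    static void deliver(String item, Consumer<String> onNext, Consumer<Throwable> onError) {
        try {
            onNext.accept(item);
        } catch (RuntimeException e) {
            onError.accept(e);
        }
    }

    /** One-argument form: falls back to a default handler that rethrows. */
    static void deliver(String item, Consumer<String> onNext) {
        deliver(item, onNext, error -> {
            throw new UnhandledErrorException(error);
        });
    }

    /** Hypothetical parser that rejects empty input. */
    static void parse(String raw) {
        if (raw.isEmpty()) {
            throw new IllegalArgumentException("empty frame");
        }
    }

    public static void main(String[] args) {
        // With a handler, the failure is reported and the caller keeps running.
        deliver("", MissingOnErrorDemo::parse,
                e -> System.out.println("handled: " + e.getMessage()));

        // Without one, the default handler turns the same failure into a crash.
        try {
            deliver("", MissingOnErrorDemo::parse);
        } catch (UnhandledErrorException e) {
            System.out.println("crash caused by: " + e.getCause().getMessage());
        }
    }
}
```

In this model, adding a handler at one call site does nothing for another call site that omits it.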
Alright, I think I may have discovered what was going on. I believe I was focusing my efforts on the wrong code. I was looking into where my client was subscribing to a topic. I totally overlooked where my client was actually sending data via the websocket connection.
This is what my sending code looked like:
Disposable disposable = mStompClient.send(destination, data).subscribe();
I have changed it to:
Disposable disposable = mStompClient
        .send(destination, data)
        .subscribe(
                () -> Log.d(TAG, "Data sent successfully through websocket"),
                throwable -> Log.e(TAG, "Error sending data using websocket", throwable));
I should know in a few days whether this fixes the error; it seems like it very well might. I will report back with my findings ASAP.
If you're starting a new project, don't use this legacy library. Use tinder/scarlet.
Yeah, it looks like my changes didn't fix the error. I am still seeing the same crashes.
It works after I added this line.
stompClient.setPathMatcher(ActivePathMatcher());
I had to make some changes to the library locally
@codedentwickler could you elaborate?
It works after I added this line.
stompClient.setPathMatcher(ActivePathMatcher());
I had to make some changes to the library locally
:trollface:
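I solved this OnErrorNotImplementedException problem. I pulled down the source code and added an onError callback to every subscribe call in the StompClient class. The logs then printed normally and showed that the error originates at the line reader.skip("\n\n"); in the StompMessage class. I wrapped that line in a try/catch, and STOMP communication went back to normal: try { reader.skip("\n\n"); } catch (NoSuchElementException e) { Log.e(TAG, "---sure enough, this is where it goes wrong==="); }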
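A small stand-alone sketch may help show why that line can throw. `java.util.Scanner.skip` throws `NoSuchElementException` when the pattern does not match at the current position, so a frame that lacks the blank line after its headers would fail there. The class below is a simplified stand-in for the header/body split, not the library's code: the names `SkipFailureDemo` and `bodyStrict`, the header regex, and both sample frames are assumptions made for illustration.

```java
import java.io.StringReader;
import java.util.NoSuchElementException;
import java.util.Scanner;
import java.util.regex.Pattern;

/** Simplified stand-in for the header/body split in StompMessage; not the library code. */
final class SkipFailureDemo {

    // Assumed shape of a "name:value" header line (hypothetical regex for this sketch).
    private static final Pattern HEADER = Pattern.compile("([^:\\s]+)\\s*:\\s*([^:\\s]+)");

    /** Returns the body of a frame, using the strict skip("\n\n") discussed in the thread. */
    static String bodyStrict(String frame) {
        Scanner reader = new Scanner(new StringReader(frame));
        reader.useDelimiter("\\n");
        reader.next();                   // command line, e.g. MESSAGE
        while (reader.hasNext(HEADER)) {
            reader.next();               // consume one header line
        }
        reader.skip("\n\n");             // throws unless a blank line follows the headers
        reader.useDelimiter("\u0000");   // a STOMP frame ends with a NUL character
        return reader.hasNext() ? reader.next() : null;
    }

    public static void main(String[] args) {
        // Well-formed frame: headers, blank line, body.
        System.out.println(bodyStrict("MESSAGE\ndestination:/topic/a\n\nhello\u0000"));

        // Hypothetical frame with only a single newline before the terminator.
        try {
            bodyStrict("MESSAGE\ndestination:/topic/a\n\u0000");
        } catch (NoSuchElementException e) {
            System.out.println("skip failed: " + e);
        }
    }
}
```

The same failure would also occur if the header loop stopped early on a line the header pattern does not accept, since the scanner would then not be positioned at the blank line.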
I have the same problem. Has anyone found a solution?
First, make sure that all your subscribers have an onError function supplied to the subscriber, or you're dead in the water.
Please, everyone, refrain from posting in this thread unless you can show us your code. If you want help, post your websocket code that's causing the error.
I've solved the problem by adding the onBackpressureBuffer() method to the code where I'm subscribing to the lifecycle. Look at the code:
mStompClient?.lifecycle()
?.onBackpressureBuffer()
?.subscribeOn(Schedulers.io())
?.observeOn(AndroidSchedulers.mainThread())
It seems you can use it in your topic listener as well. By the way, you can try another library for your sockets (the one I'm using): implementation "com.github.forresthopkinsa:StompProtocolAndroid:$version"
Keep in mind that my fork is not actively maintained anymore. While it should still work, I recommend moving away from this project entirely and migrating to the far-more-actively-maintained (and funded) websocket library Tinder/Scarlet
I fixed this by adding the whole library to my project and changing the method called 'from' in the StompMessage class to this:
public static StompMessage from(@Nullable String data) {
    if (data == null || data.trim().isEmpty()) {
        return new StompMessage(StompCommand.UNKNOWN, null, data);
    }
    Scanner reader = new Scanner(new StringReader(data));
    reader.useDelimiter("\\n");
    String command = reader.next();
    List<StompHeader> headers = new ArrayList<>();
    while (reader.hasNext(PATTERN_HEADER)) {
        Matcher matcher = PATTERN_HEADER.matcher(reader.next());
        matcher.find();
        headers.add(new StompHeader(matcher.group(1), matcher.group(2)));
    }
    reader.skip("\\s");
    reader.useDelimiter(TERMINATE_MESSAGE_SYMBOL);
    String payload = reader.hasNext() ? reader.next() : null;
    return new StompMessage(command, headers, payload);
}
I am using the latest Spring Boot 2 and RabbitMQ. The answers from @liu-v and @javier4mar helped me.
It seems that the STOMP message is returned without the "legacyWhitespace" ("\n\n"), so removing the line reader.skip("\n\n"); in StompMessage, or wrapping it in a try/catch, can solve this issue.
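As an alternative to the try/catch, the `Scanner.skip` Javadoc notes that a pattern which can match nothing never throws. The sketch below uses `\n{0,2}` for that purpose. This is a swapped-in technique, not what the replies above did, and it is untested against a real server; the names `TolerantSkipDemo` and `bodyTolerant`, the header regex, and the sample frames are hypothetical.

```java
import java.io.StringReader;
import java.util.Scanner;
import java.util.regex.Pattern;

/** Simplified stand-in for the header/body split in StompMessage; not the library code. */
final class TolerantSkipDemo {

    // Assumed shape of a "name:value" header line (hypothetical regex for this sketch).
    private static final Pattern HEADER = Pattern.compile("([^:\\s]+)\\s*:\\s*([^:\\s]+)");

    /** Returns the body of a frame, or null if there is none, without throwing on a missing blank line. */
    static String bodyTolerant(String frame) {
        Scanner reader = new Scanner(new StringReader(frame));
        reader.useDelimiter("\\n");
        reader.next();                   // command line, e.g. MESSAGE
        while (reader.hasNext(HEADER)) {
            reader.next();               // consume one header line
        }
        // Zero to two newlines: the pattern can match nothing, so skip() cannot throw here.
        reader.skip("\\n{0,2}");
        reader.useDelimiter("\u0000");   // a STOMP frame ends with a NUL character
        return reader.hasNext() ? reader.next() : null;
    }

    public static void main(String[] args) {
        // Well-formed frame with a blank line before the body.
        System.out.println(bodyTolerant("MESSAGE\ndestination:/topic/a\n\nhello\u0000"));
        // Hypothetical frame with only one newline before the body.
        System.out.println(bodyTolerant("MESSAGE\ndestination:/topic/a\nhello\u0000"));
        // Hypothetical frame with no body at all.
        System.out.println(bodyTolerant("MESSAGE\ndestination:/topic/a\n\u0000"));
    }
}
```

One trade-off of this choice: if the header loop stops early on a header line it cannot parse, the remaining header lines end up in the payload instead of raising an error, so logging such frames may still be worthwhile.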
Is anyone working on this? I am experiencing this with my app--tons of crashes.
Off-topic. @forresthopkinsa you have mentioned in many places to use Scarlet. I have tried to configure it: the connection is upgraded, and in the network profiler I can see a heartbeat being sent every n seconds. But while this library works out of the box, Scarlet does not have any working example.