StompProtocolAndroid icon indicating copy to clipboard operation
StompProtocolAndroid copied to clipboard

Not connected yet

Open bauer-bao opened this issue 7 years ago • 11 comments

io.reactivex.exceptions.OnErrorNotImplementedException: Not connected yet at io.reactivex.internal.observers.EmptyCompletableObserver.onError(EmptyCompletableObserver.java:51) at io.reactivex.internal.operators.completable.CompletableFromCallable.subscribeActual(CompletableFromCallable.java:40) at io.reactivex.Completable.subscribe(Completable.java:2171) at io.reactivex.Completable.subscribe(Completable.java:2159) at ua.naiksoftware.stomp.client.StompClient.lambda$connect$0$StompClient(StompClient.java:121) at ua.naiksoftware.stomp.client.StompClient$$Lambda$0.accept(Unknown Source) at io.reactivex.internal.observers.LambdaObserver.onNext(LambdaObserver.java:63) at io.reactivex.subjects.PublishSubject$PublishDisposable.onNext(PublishSubject.java:308) at io.reactivex.subjects.PublishSubject.onNext(PublishSubject.java:228) at ua.naiksoftware.stomp.AbstractConnectionProvider.emitLifecycleEvent(AbstractConnectionProvider.java:110) at ua.naiksoftware.stomp.OkHttpConnectionProvider$1.onOpen(OkHttpConnectionProvider.java:61) at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:209) at okhttp3.RealCall$AsyncCall.execute(RealCall.java:153) at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607) at java.lang.Thread.run(Thread.java:776) Caused by: java.lang.IllegalStateException: Not connected yet at ua.naiksoftware.stomp.AbstractConnectionProvider.lambda$send$0$AbstractConnectionProvider(AbstractConnectionProvider.java:75) at ua.naiksoftware.stomp.AbstractConnectionProvider$$Lambda$2.call(Unknown Source) at io.reactivex.internal.operators.completable.CompletableFromCallable.subscribeActual(CompletableFromCallable.java:36) at io.reactivex.Completable.subscribe(Completable.java:2171)  at io.reactivex.Completable.subscribe(Completable.java:2159)  at ua.naiksoftware.stomp.client.StompClient.lambda$connect$0$StompClient(StompClient.java:121)  at ua.naiksoftware.stomp.client.StompClient$$Lambda$0.accept(Unknown Source)  at io.reactivex.internal.observers.LambdaObserver.onNext(LambdaObserver.java:63)  at io.reactivex.subjects.PublishSubject$PublishDisposable.onNext(PublishSubject.java:308)  at io.reactivex.subjects.PublishSubject.onNext(PublishSubject.java:228)  at ua.naiksoftware.stomp.AbstractConnectionProvider.emitLifecycleEvent(AbstractConnectionProvider.java:110)  at ua.naiksoftware.stomp.OkHttpConnectionProvider$1.onOpen(OkHttpConnectionProvider.java:61)  at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:209)  at okhttp3.RealCall$AsyncCall.execute(RealCall.java:153)  at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)  at java.lang.Thread.run(Thread.java:776)  java.lang.IllegalStateException: Not connected yet at ua.naiksoftware.stomp.AbstractConnectionProvider.lambda$send$0$AbstractConnectionProvider(AbstractConnectionProvider.java:75) at ua.naiksoftware.stomp.AbstractConnectionProvider$$Lambda$2.call(Unknown Source) at io.reactivex.internal.operators.completable.CompletableFromCallable.subscribeActual(CompletableFromCallable.java:36) at io.reactivex.Completable.subscribe(Completable.java:2171) at io.reactivex.Completable.subscribe(Completable.java:2159) at ua.naiksoftware.stomp.client.StompClient.lambda$connect$0$StompClient(StompClient.java:121) at ua.naiksoftware.stomp.client.StompClient$$Lambda$0.accept(Unknown Source) at io.reactivex.internal.observers.LambdaObserver.onNext(LambdaObserver.java:63) at io.reactivex.subjects.PublishSubject$PublishDisposable.onNext(PublishSubject.java:308) at io.reactivex.subjects.PublishSubject.onNext(PublishSubject.java:228) at ua.naiksoftware.stomp.AbstractConnectionProvider.emitLifecycleEvent(AbstractConnectionProvider.java:110) at ua.naiksoftware.stomp.OkHttpConnectionProvider$1.onOpen(OkHttpConnectionProvider.java:61) at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:209) at okhttp3.RealCall$AsyncCall.execute(RealCall.java:153) at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607) at java.lang.Thread.run(Thread.java:776)

App crash when sending message after Emit lifecycle event: OPENED

bauer-bao avatar Oct 24 '18 10:10 bauer-bao

Having same issue

BilalSiddiqui avatar Feb 15 '19 11:02 BilalSiddiqui

Do you have onError implemented on your websocket stream? That's not the problem, but that's why you're getting this ugly stack trace.

The problem is that you're sending messages before you have an open websocket.

forresthopkinsa avatar Feb 15 '19 17:02 forresthopkinsa

Not necessary wait for connection, library will do this instead you:)

NaikSoftware avatar Feb 15 '19 17:02 NaikSoftware

If you have same issue, please post snippet for reproduce or write more info..

NaikSoftware avatar Feb 15 '19 17:02 NaikSoftware

For the time being this crash is occurring random because i am unable to find the reason behind. Sometime code is working perfectly and sometimes crashes repeatedly.

 mStompClient = Stomp.over(Stomp.ConnectionProvider.OKHTTP, "wss://api." + serverUrl + "/mp/ws/websocket");
        resetSubscriptions();
        Disposable dispLifecycle = mStompClient.lifecycle()
                .subscribeOn(Schedulers.io())
                .doOnError(error -> Log.d(TAG, "The error message is: " + error.getMessage()))
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(lifecycleEvent -> {
                    switch (lifecycleEvent.getType()) {
                        case OPENED:
                            Log.i(TAG, "Stomp connection opened");
                            break;
                        case ERROR:
                            Log.e(TAG, "Stomp connection error", lifecycleEvent.getException());
                            break;
                        case CLOSED:
                            Log.e(TAG, "Stomp connection closed");
                            resetSubscriptions();
                            connectScoket();
                            break;
                        case FAILED_SERVER_HEARTBEAT:
                            Log.e(TAG, "Stomp failed server heartbeat");
                            break;
                    }
                }, throwable -> {
                    Log.e(TAG, "Error on subscribe topic", throwable);
                });
        compositeDisposable.add(dispLifecycle);
        subscribeDirectMessages();
        mStompClient.connect();


    private void subscribeDirectMessages() {
        if (DataHolder.getInstance().getDirectChannels() != null && !DataHolder.getInstance().getDirectChannels().isEmpty()) {
            int size = DataHolder.getInstance().getDirectChannels().size();
            for (int i = 0; i < size; i++) {
//                if (DataHolder.getInstance().getDirectChannels().get(i) instanceof ParticipantsMeeting) {
                Disposable dispTopic = mStompClient.topic("/exchange/amq.direct/direct-discussionchannels." + DataHolder.getInstance().getDirectChannels().get(i).id)
                        .subscribeOn(Schedulers.io())
                        .doOnError(error -> Log.d(TAG, "The error message is: " + error.getMessage()))
                        .onErrorReturn(error ->
                                StompMessage.from(error.getMessage())
                        )
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(topicMessage -> {
                            Log.d(TAG, "Received " + topicMessage.getPayload());
                            handleDirectMessage(topicMessage);
                        }, error -> Log.d(TAG, "onError should not be printed!"));
                compositeDisposable.add(dispTopic);

            }
        }

    }



BilalSiddiqui avatar Feb 16 '19 12:02 BilalSiddiqui

@BilalSiddiqui so do I. My crash log is from third log statistics platform.

bauer-bao avatar Feb 21 '19 02:02 bauer-bao

@NaikSoftware Is there anything we can do to avoid this error.

BilalSiddiqui avatar Feb 21 '19 10:02 BilalSiddiqui

Looks like a race condition: the openSocket field is nullified in OkHttpConnectionProvider.createWebSocketConnection() on another thread, so AbstractConnectionProvider.send(String stompMessage) finds it null (in my case when StompCommand.CONNECT is sent).

manfcas avatar Jul 05 '19 10:07 manfcas

Hello i am facing the same issue on a small percentage of my client app.

Same stack trace : java.lang.IllegalStateException: Not connected yet at ua.naiksoftware.stomp.AbstractConnectionProvider.lambda$send$0$AbstractConnectionProvider()(AbstractConnectionProvider.java:77) at ua.naiksoftware.stomp.-$$Lambda$AbstractConnectionProvider$i_moq_Q2vigeLagkjZBHpwYcZXU.call()(undefined:4) at io.reactivex.internal.operators.completable.CompletableFromCallable.subscribeActual()(CompletableFromCallable.java:35) at io.reactivex.Completable.subscribe()(Completable.java:1517) at io.reactivex.Completable.subscribe()(Completable.java:1505) at ua.naiksoftware.stomp.client.StompClient.lambda$connect$0$StompClient()(StompClient.java:121) at ua.naiksoftware.stomp.client.-$$Lambda$StompClient$mOosLpglfnf-qRWLaT0yM_6Q4ME.accept()(undefined:6) at io.reactivex.internal.observers.LambdaObserver.onNext()(LambdaObserver.java:59) at io.reactivex.subjects.PublishSubject$PublishDisposable.onNext()(PublishSubject.java:263) at io.reactivex.subjects.PublishSubject.onNext()(PublishSubject.java:182) at ua.naiksoftware.stomp.AbstractConnectionProvider.emitLifecycleEvent()(AbstractConnectionProvider.java:112) at ua.naiksoftware.stomp.OkHttpConnectionProvider$1.onOpen()(OkHttpConnectionProvider.java:61) at okhttp3.internal.ws.RealWebSocket$connect$1.onResponse()(RealWebSocket.kt:194) at okhttp3.internal.connection.RealCall$AsyncCall.run()(RealCall.kt:519) at java.util.concurrent.ThreadPoolExecutor.runWorker()(ThreadPoolExecutor.java:1167) at java.util.concurrent.ThreadPoolExecutor$Worker.run()(ThreadPoolExecutor.java:641) at java.lang.Thread.run()(Thread.java:923)

Gosunet avatar Feb 24 '21 15:02 Gosunet

The issue can be mitigated by replacing the following line in this library's source code, which I have copied from issue #144:

AbstractConnectionProvider.java, line 77

Replace throw new IllegalStateException("Not connected");

With return Completable.error(new IllegalStateException("Not connected"));

This solved the issue with my app crashing, but note that it does NOT solve the actual problem, there is of course a reason why your socket is null.

Nielssg avatar Jul 09 '23 11:07 Nielssg