StompProtocolAndroid icon indicating copy to clipboard operation
StompProtocolAndroid copied to clipboard

Payload that is too big won't be shown

Open vanlooverenkoen opened this issue 7 years ago • 7 comments

When the payload is too big, the callback won't be fired. Can this be fixed?

vanlooverenkoen avatar Mar 22 '17 15:03 vanlooverenkoen

Any update on this? The payload won't show up when the content size is bigger than about 8000.

williamtan89 avatar May 22 '17 09:05 williamtan89

any updates??

kaltindas avatar Jul 26 '17 03:07 kaltindas

If anyone is still having this problem, please post details about it here. This issue is not being ignored; it just doesn't give a lot to go on.

forresthopkinsa avatar Sep 29 '17 23:09 forresthopkinsa

@NaikSoftware can you check this once? this issue is fatal.

AndrosEt avatar Jan 03 '18 01:01 AndrosEt

@Andros1993 You have to give more information about the issue. "This issue is fatal" doesn't help anything.

forresthopkinsa avatar Jan 03 '18 03:01 forresthopkinsa

@forresthopkinsa I checked the code and found some issues. If the data size is bigger than 8k, the data is split and the onWebsocketMessageFragment callback is called instead, so if you want to get the data, you need to override this callback method and merge the data together. image `package ua.naiksoftware.stomp;

import android.util.Log;

import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft;
import org.java_websocket.drafts.Draft_10;
import org.java_websocket.drafts.Draft_17;
import org.java_websocket.drafts.Draft_75;
import org.java_websocket.drafts.Draft_76;
import org.java_websocket.exceptions.InvalidDataException;
import org.java_websocket.framing.Framedata;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.handshake.ServerHandshake;
import org.java_websocket.util.Charsetfunctions;

import java.io.ByteArrayOutputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;

/**
 * Created by naik on 05.05.16.
 */
/* package */ class WebSocketsConnectionProvider implements ConnectionProvider {

    // Tag used for every android.util.Log call in this class: the simple class name.
    private static final String TAG = WebSocketsConnectionProvider.class.getSimpleName();

    // mUri: endpoint given to the constructor; parsed with URI.create and checked for a "wss" prefix.
    // mConnectHttpHeaders: headers handed to the WebSocketClient constructor; the constructor
    // substitutes an empty map when the caller passes null, so this is never null.
    private final String mUri; private final Map<String, String> mConnectHttpHeaders;

    // Emitters registered by subscribers of getLifecycleReceiver() and messages() respectively;
    // cancelled emitters are pruned in the corresponding doOnCancel callbacks.
    private final List<FlowableEmitter<? super LifecycleEvent>> mLifecycleEmitters; private final List<FlowableEmitter<? super String>> mMessagesEmitters;

    // mWebSocketClient: assigned in createWebSocketConnection(); null until then.
    // haveConnection: set to true after connect() is called and back to false in onClose.
    // mServerHandshakeHeaders: header fields copied from the server handshake response and
    // attached to the OPENED lifecycle event.
    private WebSocketClient mWebSocketClient; private boolean haveConnection; private TreeMap<String, String> mServerHandshakeHeaders;

    // Monitor held while emitting lifecycle events and while pruning cancelled lifecycle emitters.
    private final Object mLifecycleLock = new Object();

    /**
     * Supports the URI scheme ws://host:port/path.
     *
     * @param uri                address of the web socket endpoint; stored as given
     * @param connectHttpHeaders headers to pass to the web socket client; may be null, in which
     *                           case an empty map is used instead
     */
    /* package */ WebSocketsConnectionProvider(String uri, Map<String, String> connectHttpHeaders) {
        mUri = uri;
        if (connectHttpHeaders == null) {
            mConnectHttpHeaders = new HashMap<>();
        } else {
            mConnectHttpHeaders = connectHttpHeaders;
        }
        mLifecycleEmitters = new ArrayList<>();
        mMessagesEmitters = new ArrayList<>();
    }

    /**
     * Builds the stream of incoming messages and opens the web socket connection.
     *
     * <p>Every subscriber's emitter is added to {@code mMessagesEmitters}. When a subscription is
     * cancelled, all cancelled emitters are removed from that list, and if none are left the web
     * socket client is closed.
     *
     * @return a buffered {@link Flowable} of the messages passed to {@code emitMessage}
     * @throws IllegalStateException if a connection already exists (raised by
     *                               {@code createWebSocketConnection()})
     */
    @Override
    public Flowable<String> messages() {
        Flowable<String> stream = Flowable
                .<String>create(emitter -> mMessagesEmitters.add(emitter), BackpressureStrategy.BUFFER)
                .doOnCancel(() -> {
                    // Drop every emitter whose subscriber has gone away.
                    for (Iterator<FlowableEmitter<? super String>> it = mMessagesEmitters.iterator(); it.hasNext(); ) {
                        FlowableEmitter<? super String> candidate = it.next();
                        if (candidate.isCancelled()) {
                            it.remove();
                        }
                    }

                    // Nobody is listening any more, so the socket is closed.
                    if (mMessagesEmitters.isEmpty()) {
                        Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread());
                        mWebSocketClient.close();
                    }
                });

        createWebSocketConnection();
        return stream;
    }

    /**
     * Creates the {@link WebSocketClient} for {@code mUri}, wires its callbacks to the lifecycle
     * and message emitters of this provider, and calls {@code connect()} on it.
     *
     * <p>Messages that arrive split into several frames are reassembled here: the raw payload
     * bytes of every fragment are collected and decoded as UTF-8 only once the frame carrying the
     * FIN flag has arrived. Decoding each fragment separately would fail whenever a multi-byte
     * character is cut in two by a fragment boundary.
     *
     * @throws IllegalStateException if {@code haveConnection} is already {@code true}
     */
    private void createWebSocketConnection() {
        if (haveConnection) {
            throw new IllegalStateException("Already have connection to web socket");
        }

        mWebSocketClient = new WebSocketClient(URI.create(mUri), new Draft_17(), mConnectHttpHeaders, 0) {

            // Payload bytes of the fragments received so far for the message being reassembled.
            private final ByteArrayOutputStream mFragmentBytes = new ByteArrayOutputStream();

            @Override
            public void onWebsocketHandshakeReceivedAsClient(WebSocket conn, ClientHandshake request, ServerHandshake response) throws InvalidDataException {
                Log.d(TAG, "onWebsocketHandshakeReceivedAsClient with response: " + response.getHttpStatus() + " " + response.getHttpStatusMessage());
                // Keep a copy of the response header fields for the OPENED lifecycle event.
                mServerHandshakeHeaders = new TreeMap<>();
                Iterator<String> keys = response.iterateHttpFields();
                while (keys.hasNext()) {
                    String key = keys.next();
                    mServerHandshakeHeaders.put(key, response.getFieldValue(key));
                }
            }

            @Override
            public void onOpen(ServerHandshake handshakeData) {
                Log.d(TAG, "onOpen with handshakeData: " + handshakeData.getHttpStatus() + " " + handshakeData.getHttpStatusMessage());
                LifecycleEvent openEvent = new LifecycleEvent(LifecycleEvent.Type.OPENED);
                openEvent.setHandshakeResponseHeaders(mServerHandshakeHeaders);
                emitLifecycleEvent(openEvent);
            }

            @Override
            public void onMessage(String message) {
                Log.d(TAG, "onMessage: " + message);
                emitMessage(message);
            }

            @Override
            public void onClose(int code, String reason, boolean remote) {
                Log.d(TAG, "onClose: code=" + code + " reason=" + reason + " remote=" + remote);
                haveConnection = false;
                emitLifecycleEvent(new LifecycleEvent(LifecycleEvent.Type.CLOSED));
            }

            @Override
            public void onError(Exception ex) {
                Log.e(TAG, "onError", ex);
                emitLifecycleEvent(new LifecycleEvent(LifecycleEvent.Type.ERROR, ex));
            }

            @Override
            public void onFragment(Framedata framedata) {
                Log.d(TAG, "onFragment: " + framedata);
            }

            /**
             * Collects one fragment of a fragmented message and emits the whole message once the
             * final fragment has been received.
             */
            @Override
            public void onWebsocketMessageFragment(WebSocket webSocket, Framedata framedata) {
                // Read through a duplicate so the position of the frame's own buffer is untouched.
                ByteBuffer payload = framedata.getPayloadData().duplicate();
                byte[] chunk = new byte[payload.remaining()];
                payload.get(chunk);
                mFragmentBytes.write(chunk, 0, chunk.length);

                if (!framedata.isFin()) {
                    // More fragments of this message are still to come.
                    return;
                }

                // Take the assembled bytes and clear the buffer before decoding, so that a
                // decoding failure cannot leak stale bytes into the next message.
                byte[] whole = mFragmentBytes.toByteArray();
                mFragmentBytes.reset();
                try {
                    emitMessage(Charsetfunctions.stringUtf8(ByteBuffer.wrap(whole)));
                } catch (InvalidDataException e) {
                    Log.e(TAG, "Dropping fragmented message that is not valid UTF-8", e);
                }
            }
        };

        if (mUri.startsWith("wss")) {
            try {
                SSLContext sc = SSLContext.getInstance("TLS");
                sc.init(null, null, null);
                SSLSocketFactory factory = sc.getSocketFactory();
                mWebSocketClient.setSocket(factory.createSocket());
            } catch (Exception e) {
                // NOTE(review): connect() below is still attempted without the TLS socket, as
                // before; consider aborting here instead - confirm the desired behavior.
                Log.e(TAG, "Unable to create TLS socket", e);
            }
        }

        mWebSocketClient.connect();
        haveConnection = true;
    }

    /**
     * Returns a {@link Flowable} that, when subscribed, sends the given STOMP message through the
     * web socket client.
     *
     * <p>The flowable signals an {@link IllegalStateException} if no web socket client has been
     * created yet; otherwise it logs the message, sends it and completes.
     *
     * @param stompMessage the text to send
     * @return a flowable that emits no items
     */
    @Override
    public Flowable<Void> send(String stompMessage) {
        return Flowable.create(emitter -> {
            if (mWebSocketClient == null) {
                emitter.onError(new IllegalStateException("Not connected yet"));
                return;
            }

            Log.d(TAG, "Send STOMP message: " + stompMessage);
            mWebSocketClient.send(stompMessage);
            emitter.onComplete();
        }, BackpressureStrategy.BUFFER);
    }

    /**
     * Logs the type of the given lifecycle event and passes the event to every registered
     * lifecycle emitter, all while holding {@code mLifecycleLock}.
     *
     * @param lifecycleEvent the event to deliver
     */
    private void emitLifecycleEvent(LifecycleEvent lifecycleEvent) {
        synchronized (mLifecycleLock) {
            String typeName = lifecycleEvent.getType().name();
            Log.d(TAG, "Emit lifecycle event: " + typeName);

            Iterator<FlowableEmitter<? super LifecycleEvent>> it = mLifecycleEmitters.iterator();
            while (it.hasNext()) {
                it.next().onNext(lifecycleEvent);
            }
        }
    }

    private void emitMessage(String stompMessage) { // Log.d(TAG, "Emit STOMP message: " + stompMessage); for (FlowableEmitter<? super String> emitter : mMessagesEmitters) { emitter.onNext(stompMessage); } }

    /**
     * Builds the stream of lifecycle events.
     *
     * <p>Every subscriber's emitter is added to {@code mLifecycleEmitters}. When a subscription
     * is cancelled, all cancelled emitters are removed from that list while holding
     * {@code mLifecycleLock}.
     *
     * @return a buffered {@link Flowable} of the events passed to {@code emitLifecycleEvent}
     */
    @Override
    public Flowable<LifecycleEvent> getLifecycleReceiver() {
        Flowable<LifecycleEvent> receiver =
                Flowable.<LifecycleEvent>create(mLifecycleEmitters::add, BackpressureStrategy.BUFFER);

        return receiver.doOnCancel(() -> {
            synchronized (mLifecycleLock) {
                for (Iterator<FlowableEmitter<? super LifecycleEvent>> it = mLifecycleEmitters.iterator(); it.hasNext(); ) {
                    if (it.next().isCancelled()) {
                        it.remove();
                    }
                }
            }
        });
    }
} `

AndrosEt avatar Jan 04 '18 10:01 AndrosEt

@Andros1993 Test your changes and submit a PR then

forresthopkinsa avatar Jan 04 '18 21:01 forresthopkinsa