New API: WebSocket.flush() to await an empty queue

Open jumperchen opened this issue 7 years ago • 10 comments

If we send 1M messages in a loop, the WebSocket connection closes automatically with code 1001. (This doesn't happen with other WebSocket clients, such as browsers, or with other Java WebSocket client implementations like Tyrus and NV-Websocket-Client.)

Here is a gist to reproduce it: https://gist.github.com/jumperchen/6f8410b871a6d3746fe1967340d7a1a1

Version: 3.7.0 and 3.8.0-SNAPSHOT (master branch)

jumperchen avatar May 03 '17 07:05 jumperchen

I think that works as intended. https://github.com/square/okhttp/blob/e172706b56a8616cf70f8d8285d5f8701c8a36a0/okhttp/src/main/java/okhttp3/internal/ws/RealWebSocket.java#L371

  private synchronized boolean send(ByteString data, int formatOpcode) {
    // Don't send new frames after we've failed or enqueued a close frame.
    if (failed || enqueuedClose) return false;

    // If this frame overflows the buffer, reject it and close the web socket.
    if (queueSize + data.size() > MAX_QUEUE_SIZE) {
      close(CLOSE_CLIENT_GOING_AWAY, null);
      return false;
    }
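
To get a feel for how quickly a tight loop can trip this check, here is a small stand-alone model of the same accounting. It is only a sketch: QueueModel is a hypothetical class, not OkHttp code, the 16 MiB cap is an assumption about the value of MAX_QUEUE_SIZE (check the linked source for the real constant), and it pessimistically assumes that nothing drains from the queue while the loop runs.

```java
/** Hypothetical stand-alone model of the overflow check quoted above; not OkHttp code. */
final class QueueModel {
  // Assumed cap; the real constant lives in RealWebSocket.
  static final long MAX_QUEUE_SIZE = 16L * 1024 * 1024;

  private long queueSize;

  /** Mirrors the quoted check: reject a frame that would push the queue past the cap. */
  boolean send(int messageBytes) {
    if (queueSize + messageBytes > MAX_QUEUE_SIZE) return false;
    queueSize += messageBytes;
    return true;
  }

  /** Returns the 1-based index of the first rejected message, or -1 if all were accepted. */
  static long firstRejected(long messageCount, int messageBytes) {
    QueueModel model = new QueueModel();
    for (long i = 1; i <= messageCount; i++) {
      if (!model.send(messageBytes)) return i;
    }
    return -1;
  }

  public static void main(String[] args) {
    // 1M messages of 100 bytes each, with no draining in between.
    System.out.println(QueueModel.firstRejected(1_000_000, 100)); // prints 167773
  }
}
```

Under those assumptions, a loop of 1M 100-byte messages is rejected after roughly 168 thousand sends unless the writer keeps up, which is consistent with the reported close with code 1001.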

b95505017 avatar May 03 '17 08:05 b95505017

Yup. If you're deliberately sending a very large number of messages, you have to implement your own back pressure to slow enqueueing down to not exhaust memory. You can check the queue size and sleep in this case.

swankjesse avatar May 03 '17 11:05 swankjesse

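// 'messages' and 'limit' are placeholders for your own data and threshold.
for (String message : messages) {
  // Back off while too many bytes are still waiting to be written.
  while (websocket.queueSize() > limit) {
    Thread.sleep(500);
  }
  websocket.send(message);
}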

swankjesse avatar May 03 '17 11:05 swankjesse

Is it possible to make MAX_QUEUE_SIZE configurable, or to make this class extendable? Otherwise we have no way to change it. (We don't want to copy the whole implementation.) :)

Thanks.

jumperchen avatar May 04 '17 01:05 jumperchen

How much bigger do you want it?

Don’t think of the queue size as “the number of bytes you can send”; think of it as “the amount of memory you’re holding”. We want a hard limit because there’s not much benefit to holding a very large queue of data in memory.

We need backpressure so that clients can slow down sends when the websocket isn’t keeping up. The loop above is my recommended option.

swankjesse avatar May 04 '17 01:05 swankjesse

Our client machines may have 128 GB of memory or more :) so it would be better if this were configurable. By the way, we use the socket.io Java client, which is based on okhttp (we don't use okhttp directly). I also found that okhttp's overhead seems to be bigger than nv-websocket-client's (this may be related to https://github.com/square/okhttp/issues/1733). In my tests, okhttp is 2 to 3 times slower than nv-websocket-client, so we may consider switching the socket.io Java client's implementation to improve performance.

jumperchen avatar May 04 '17 02:05 jumperchen

We can take a look at websocket compression. If you’d like to look into it and send a PR, that'll speed things up!

Making the send buffer configurable is an option. I'm worried that it's the wrong way to fix the problem; if the websocket does fail it's unfortunate to have wasted energy enqueueing data that won't be sent.

What about an API like flush() that just sleeps until all currently-enqueued messages have been transmitted?
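
A rough sketch of what such a helper could look like on the caller's side, polling the queue size until it reaches zero or a timeout expires. QueueFlusher and awaitEmpty are hypothetical names, not an OkHttp API. The queue size is passed in as a LongSupplier so the sketch compiles on its own; with OkHttp you would presumably pass webSocket::queueSize.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical caller-side helper; not part of OkHttp. */
final class QueueFlusher {
  private QueueFlusher() {}

  /**
   * Polls queueSize until it reports 0 or the timeout elapses.
   * Returns true if the queue drained, false on timeout.
   */
  static boolean awaitEmpty(LongSupplier queueSize, long timeout, TimeUnit unit, long pollMillis)
      throws InterruptedException {
    long deadline = System.nanoTime() + unit.toNanos(timeout);
    while (queueSize.getAsLong() > 0) {
      long remainingNanos = deadline - System.nanoTime();
      if (remainingNanos <= 0) return false;
      // Sleep for the poll interval, capped by the time remaining (at least 1 ms).
      long remainingMillis = Math.max(1, TimeUnit.NANOSECONDS.toMillis(remainingNanos));
      Thread.sleep(Math.min(pollMillis, remainingMillis));
    }
    return true;
  }
}
```

A call might look like QueueFlusher.awaitEmpty(webSocket::queueSize, 30, TimeUnit.SECONDS, 50). The timeout matters because a failed or canceled web socket can leave a nonzero queue size forever, and an empty queue only means the bytes were handed off for writing, not that the peer has received them.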

swankjesse avatar May 04 '17 08:05 swankjesse

Yes, the flush() API sounds better than the current implementation, which closes the connection unexpectedly.

jumperchen avatar May 04 '17 08:05 jumperchen

I would like to bump this issue, as I have also encountered a situation where it is necessary to ensure that the websocket has fully flushed its buffers.

MaxwellDAssistek avatar Oct 13 '22 16:10 MaxwellDAssistek

Until we get it implemented, if you’d like to flush you can use this:

public void flush(WebSocket websocket) throws InterruptedException {
  long limit = 1024L * 1024 * 4; // Whatever limit you’d like
  // Poll until the queued bytes drop to the limit; use 0 to wait for an empty queue.
  while (websocket.queueSize() > limit) {
    Thread.sleep(500);
  }
}

swankjesse avatar Jan 22 '23 14:01 swankjesse