okhttp
New API: WebSocket.flush() to await an empty queue
If we send 1M messages in a loop, the websocket connection closes automatically with code 1001. This doesn't happen with other WebSocket clients, such as browsers or other Java WebSocket implementations like Tyrus and NV-Websocket-Client.
Here is the gist to reproduce https://gist.github.com/jumperchen/6f8410b871a6d3746fe1967340d7a1a1
Version: 3.7.0 and 3.8.0-SNAPSHOT (master branch)
I think that works as intended. https://github.com/square/okhttp/blob/e172706b56a8616cf70f8d8285d5f8701c8a36a0/okhttp/src/main/java/okhttp3/internal/ws/RealWebSocket.java#L371
private synchronized boolean send(ByteString data, int formatOpcode) {
  // Don't send new frames after we've failed or enqueued a close frame.
  if (failed || enqueuedClose) return false;
  // If this frame overflows the buffer, reject it and close the web socket.
  if (queueSize + data.size() > MAX_QUEUE_SIZE) {
    close(CLOSE_CLIENT_GOING_AWAY, null);
    return false;
  }
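Yup. If you're deliberately sending a very large number of messages, you have to implement your own back pressure to slow enqueueing down so that you don't exhaust memory. You can check the queue size and sleep in this case.
for (String message : messages) { // `messages` is a placeholder for whatever you are sending
  while (websocket.queueSize() > limit) {
    Thread.sleep(500); // back off until the queue drains below the limit
  }
  websocket.send(message);
}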
Is it possible to make MAX_QUEUE_SIZE configurable, or to make this class extendable? Otherwise we have no way to change it. (We don't want to copy the whole implementation.) :)
Thanks.
How much bigger do you want it?
Don’t think of the queue size as “the number of bytes you can send”; think of it as “the amount of memory you’re holding”. We want a hard limit because there’s not much benefit to holding a very large queue of data in memory.
We need backpressure so that clients can slow down sends when the websocket isn’t keeping up. The loop above is my recommended option.
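As a rough sketch of the loop above packaged for reuse: the `Socket` interface below is a hypothetical stand-in for the two `okhttp3.WebSocket` methods involved (`queueSize()` and `send(String)`), and `sendAll`, `maxQueuedBytes` and `pollMillis` are names made up for illustration. It also checks the boolean returned by `send()`, since a rejected frame means the socket has already failed or is closing.

```java
import java.util.List;

public class BackpressureSender {
  /** Hypothetical stand-in for the okhttp3.WebSocket methods used here. */
  public interface Socket {
    long queueSize();
    boolean send(String text);
  }

  /**
   * Enqueues each message, pausing while more than maxQueuedBytes are waiting.
   * Returns how many messages were accepted; stops at the first rejected send.
   */
  public static int sendAll(Socket socket, List<String> messages,
      long maxQueuedBytes, long pollMillis) throws InterruptedException {
    int accepted = 0;
    for (String message : messages) {
      // Back off until the outgoing queue has drained below the threshold.
      while (socket.queueSize() > maxQueuedBytes) {
        Thread.sleep(pollMillis);
      }
      // send() returning false means the frame was not enqueued.
      if (!socket.send(message)) {
        break;
      }
      accepted++;
    }
    return accepted;
  }
}
```

With a real connection you would wrap the `WebSocket` in a small adapter that forwards both calls. The threshold should stay well below the library's internal queue limit so that a single large message cannot push the queue over it.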
Our client may have 128GB of memory or more :) so it would be better if this were configurable. By the way, we use the socket.io Java client, which is based on okhttp (we don't use okhttp directly). Another thing I found is that okhttp's overhead seems to be bigger than nv-websocket-client's (this may be related to https://github.com/square/okhttp/issues/1733). In my tests, okhttp is 2~3 times slower than nv-websocket-client, so we may consider switching the socket.io Java client implementation to boost performance.
We can take a look at websocket compression. If you’d like to take a look and send a PR, that'll accelerate that!
Making the send buffer configurable is an option. I'm worried that it's the wrong way to fix the problem; if the websocket does fail it's unfortunate to have wasted energy enqueueing data that won't be sent.
What about an API like flush() that just sleeps until all currently-enqueued messages have been transmitted?
Yes, the flush() API sounds better than the current implementation, which closes the connection unexpectedly.
I would like to bump this issue, as I have also encountered a situation where it is necessary to ensure that the websocket has fully flushed its buffers.
Until we get it implemented, if you’d like to flush you can use this:
public void flush(WebSocket websocket) throws InterruptedException {
  long limit = 1024L * 1024 * 4; // Whatever limit you’d like
  while (websocket.queueSize() > limit) {
    Thread.sleep(500); // poll until the queue drains below the limit
  }
}
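If you want something closer to "await an empty queue", a variant could wait for the queue size to reach zero and give up after a deadline. This is only a sketch: `QueueFlusher`, `awaitEmpty` and its parameters are hypothetical names, and the queue size is passed in as a `LongSupplier` so the code does not depend on okhttp.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

public class QueueFlusher {
  /**
   * Polls queueSize until it reports 0 or the timeout elapses.
   * Returns true if the queue emptied, false if the deadline passed first.
   */
  public static boolean awaitEmpty(LongSupplier queueSize, long timeout,
      TimeUnit unit, long pollMillis) throws InterruptedException {
    long deadline = System.nanoTime() + unit.toNanos(timeout);
    while (queueSize.getAsLong() > 0) {
      long remainingNanos = deadline - System.nanoTime();
      if (remainingNanos <= 0) {
        return false; // timed out with data still queued
      }
      // Sleep for the poll interval, but never past the deadline.
      long remainingMillis = Math.max(1, TimeUnit.NANOSECONDS.toMillis(remainingNanos));
      Thread.sleep(Math.min(pollMillis, remainingMillis));
    }
    return true;
  }
}
```

With a real connection this could be called as `QueueFlusher.awaitEmpty(websocket::queueSize, 30, TimeUnit.SECONDS, 100)`. An empty queue only means the messages have left okhttp's own buffer; bytes may still be sitting in operating system or network buffers, so it is not a delivery guarantee.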