okhttp-eventsource
Fix connection leaks after closing non-exhaustive EventSource.
Affected version
4.1.1
Problem
The OkHttp connection is not closed properly when an EventSource is closed before its SSE stream has been fully consumed.
Reproduction code
package moe.nemesiss.playground;

import com.launchdarkly.eventsource.ConnectStrategy;
import com.launchdarkly.eventsource.EventSource;
import com.launchdarkly.eventsource.MessageEvent;
import okhttp3.ConnectionPool;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;

import java.util.logging.Level;
import java.util.logging.Logger;

import static org.fest.reflect.core.Reflection.field;
import static org.fest.reflect.core.Reflection.method;

public class EventSourceLeakDemo {

    private static HttpUrl streamGeneratorUrlFor(int length, int interval) {
        // A self-hosted SSE stream generator.
        return HttpUrl.get(String.format("https://plugins.nemesiss.xyz/main/stream_generator?length=%s&intervalMs=%s", length, interval));
    }

    public static void main(String[] args) throws InterruptedException {
        EventSource closedEventSource;
        Logger.getLogger(OkHttpClient.class.getName()).setLevel(Level.FINE);
        try (EventSource source = new EventSource.Builder(
                // Obtain a stream with 10 MessageEvents.
                ConnectStrategy.http(streamGeneratorUrlFor(10, 200))).build()) {
            // Consume only 8 of the 10 messages, then leave.
            for (int i = 0; i < 8; i++) {
                try {
                    MessageEvent message = source.readMessage();
                    System.out.println(message.getData());
                } catch (Throwable t) {
                    t.printStackTrace();
                    break;
                }
            }
            closedEventSource = source;
        }
        // The EventSource has been closed at this point, on leaving the try-with-resources block.
        // Request a GC so that the WeakReferences held by the connection can be cleared.
        // see: okhttp3.internal.connection.RealConnection.getCalls
        System.gc();
        // Trigger connection pool cleanup manually.
        // These reflection calls are equivalent to the statement:
        // ((com.launchdarkly.eventsource.HttpConnectStrategy.Client) closedEventSource).client.connectionPool().delegate.cleanup(System.nanoTime());
        ConnectStrategy.Client client = field("client").ofType(ConnectStrategy.Client.class).in(closedEventSource).get();
        OkHttpClient internalOkHttpClient = field("httpClient").ofType(OkHttpClient.class).in(client).get();
        ConnectionPool cp = internalOkHttpClient.connectionPool();
        method("cleanup").withReturnType(long.class).withParameterTypes(long.class).in(cp.getDelegate$okhttp()).invoke(System.nanoTime());
        // The connection leak warning should then be shown in the terminal, like below:
        // index: 0, timestamp: 1704600618813
        // index: 1, timestamp: 1704600619016
        // index: 2, timestamp: 1704600619220
        // index: 3, timestamp: 1704600619424
        // index: 4, timestamp: 1704600619628
        // index: 5, timestamp: 1704600619833
        // index: 6, timestamp: 1704600620037
        // index: 7, timestamp: 1704600620240
        // Jan 07, 2024 12:10:20 PM okhttp3.internal.platform.Platform log
        // WARNING: A connection to https://plugins.nemesiss.xyz/ was leaked. Did you forget to close a response body?
        // java.lang.Throwable: response.body().close()
        //     at okhttp3.internal.platform.Platform.getStackTraceForCloseable(Platform.kt:145)
        //     at okhttp3.internal.connection.RealCall.callStart(RealCall.kt:170)
        //     at okhttp3.internal.connection.RealCall.execute(RealCall.kt:151)
        //     at com.launchdarkly.eventsource.HttpConnectStrategy$Client.connect(HttpConnectStrategy.java:452)
        //     at com.launchdarkly.eventsource.EventSource.tryStart(EventSource.java:292)
        //     at com.launchdarkly.eventsource.EventSource.requireEvent(EventSource.java:595)
        //     at com.launchdarkly.eventsource.EventSource.readAnyEvent(EventSource.java:390)
        //     at com.launchdarkly.eventsource.EventSource.readMessage(EventSource.java:359)
        //     at moe.nemesiss.playground.EventSourceLeakDemo.main(EventSourceLeakDemo.java:33)
        // Jan 07, 2024 12:10:20 PM okhttp3.internal.platform.Platform log
    }
}
Reason
- com.launchdarkly.eventsource.EventSource#close closes the underlying okhttp3.Call and, if no httpClient was supplied from outside, also the package-private com.launchdarkly.eventsource.HttpConnectStrategy.Client#httpClient.
- Canceling an okhttp3.Call will:
  - close the underlying Socket (for HTTP/1.1) or Stream (for HTTP/2), with the call stack below:

      java.lang.Thread.State: RUNNABLE
          at okhttp3.internal.connection.Exchange.cancel(Exchange.kt:153)
          at okhttp3.internal.connection.RealCall.cancel(RealCall.kt:139)
          at com.launchdarkly.eventsource.HttpConnectStrategy$RequestCloser.close(HttpConnectStrategy.java:554)
          at com.launchdarkly.eventsource.EventSource.closeCurrentStream(EventSource.java:677)
          - locked <0x916> (a java.lang.Object)
          at com.launchdarkly.eventsource.EventSource.close(EventSource.java:532)
- Condition for logging the connection leak warning: connection.calls is not empty, but a referenced RealCall has already been garbage-collected, which indicates that the RealCall was never properly detached from its RealConnection.

    internal class CallReference(
      referent: RealCall,
      /**
       * Captures the stack trace at the time the Call is executed or enqueued. This is helpful for
       * identifying the origin of connection leaks.
       */
      val callStackTrace: Any?
    ) : WeakReference<RealCall>(referent)

    private fun pruneAndGetAllocationCount(connection: RealConnection, now: Long): Int {
      connection.assertThreadHoldsLock()

      val references: MutableList<Reference<RealCall>> = connection.calls
      var i = 0
      // connection.calls is not empty
      while (i < references.size) {
        val reference: Reference<RealCall> = references[i]

        if (reference.get() != null) {
          i++
          continue
        }

        // But the referenced RealCall has been garbage-collected,
        // i.e. it was never properly detached from its RealConnection.

        // We've discovered a leaked call. This is an application bug.
        val callReference = reference as CallReference
        val message = "A connection to ${connection.route().address.url} was leaked. " +
            "Did you forget to close a response body?"
        Platform.get().logCloseableLeak(message, callReference.callStackTrace)

        references.removeAt(i)
        connection.noNewExchanges = true

        // If this was the last allocation, the connection is eligible for immediate eviction.
        if (references.isEmpty()) {
          connection.idleAtNs = now - keepAliveDurationNs
          return 0
        }
      }

      return references.size
    }
- The only way to proactively detach a RealCall from its RealConnection is to call the releaseConnectionNoEvents method, whose entry point is the messageDone method.
  - Called when the stream is exhausted:

      "main@1" prio=5 tid=0x1 nid=NA runnable
        java.lang.Thread.State: RUNNABLE
          at okhttp3.internal.connection.RealCall.releaseConnectionNoEvents$okhttp(RealCall.kt:381)
          at okhttp3.internal.connection.RealCall.callDone(RealCall.kt:350)
          - locked <0x8b2> (a okhttp3.internal.connection.RealConnection)
          at okhttp3.internal.connection.RealCall.messageDone$okhttp(RealCall.kt:309)
          at okhttp3.internal.connection.Exchange.bodyComplete(Exchange.kt:193)
          at okhttp3.internal.connection.Exchange$ResponseBodySource.complete(Exchange.kt:324)
          at okhttp3.internal.connection.Exchange$ResponseBodySource.read(Exchange.kt:284)
          at okio.RealBufferedSource$inputStream$1.read(RealBufferedSource.kt:158)
          at com.launchdarkly.eventsource.BufferedLineParser.readMoreIntoBuffer(BufferedLineParser.java:138)
          at com.launchdarkly.eventsource.BufferedLineParser.read(BufferedLineParser.java:63)
          at com.launchdarkly.eventsource.EventParser.getNextChunk(EventParser.java:267)
          at com.launchdarkly.eventsource.EventParser.tryNextEvent(EventParser.java:130)
          at com.launchdarkly.eventsource.EventParser.nextEvent(EventParser.java:109)
          at com.launchdarkly.eventsource.EventSource.requireEvent(EventSource.java:600)
          at com.launchdarkly.eventsource.EventSource.readAnyEvent(EventSource.java:392)
          at com.launchdarkly.eventsource.EventSource.readMessage(EventSource.java:361)
          at moe.nemesiss.playground.EventSourceLeakDemo.main(EventSourceLeakDemo.java:34)

  - Called when the Response is closed:

      "main@1" prio=5 tid=0x1 nid=NA runnable
        java.lang.Thread.State: RUNNABLE
          at okhttp3.internal.connection.RealCall.releaseConnectionNoEvents$okhttp(RealCall.kt:381)
          at okhttp3.internal.connection.RealCall.callDone(RealCall.kt:350)
          - locked <0x8b3> (a okhttp3.internal.connection.RealConnection)
          at okhttp3.internal.connection.RealCall.messageDone$okhttp(RealCall.kt:309)
          at okhttp3.internal.connection.Exchange.bodyComplete(Exchange.kt:193)
          at okhttp3.internal.connection.Exchange$ResponseBodySource.complete(Exchange.kt:324)
          at okhttp3.internal.connection.Exchange$ResponseBodySource.close(Exchange.kt:310)
          at okio.RealBufferedSource.close(RealBufferedSource.kt:392)
          at okhttp3.internal.Util.closeQuietly(Util.kt:495)
          at okhttp3.ResponseBody.close(ResponseBody.kt:192)
          at okhttp3.Response.close(Response.kt:302)
          at com.launchdarkly.eventsource.HttpConnectStrategy$ResponseCloser.close(HttpConnectStrategy.java:570)
          at com.launchdarkly.eventsource.EventSource.closeCurrentStream(EventSource.java:694)
          - locked <0x923> (a java.lang.Object)
          at com.launchdarkly.eventsource.EventSource.close(EventSource.java:534)
          at moe.nemesiss.playground.EventSourceLeakDemo.main(EventSourceLeakDemo.java:42)
- But this method is reached neither by canceling the Call nor by closing the connection pool. The Call case is obvious from the cancel call stack shown earlier, so let's focus on closing the connection pool.

    // com.launchdarkly.eventsource.HttpConnectStrategy.Client#close
    public void close() {
      // We need to shut down the HTTP client *if* it is one that we created, and not
      // one that the application provided to us.
      OkHttpClient preconfiguredClient = HttpConnectStrategy.this.httpClient;
      if (preconfiguredClient == null) {
        // COVERAGE: these null guards are here for safety but in practice the values are never null and there
        // is no way to cause them to be null in unit tests
        if (httpClient.connectionPool() != null) {
          httpClient.connectionPool().evictAll(); // evict connections from the connection pool.
        }
        if (httpClient.dispatcher() != null) {
          httpClient.dispatcher().cancelAll();
          if (httpClient.dispatcher().executorService() != null) {
            httpClient.dispatcher().executorService().shutdownNow();
          }
        }
      }
    }

    // okhttp3.internal.connection.RealConnectionPool#evictAll
    fun evictAll() {
      val i = connections.iterator()
      while (i.hasNext()) {
        val connection = i.next()
        val socketToClose = synchronized(connection) {
          if (connection.calls.isEmpty()) { // false: canceling the Call does not detach it from the connection.
            i.remove()
            connection.noNewExchanges = true
            return@synchronized connection.socket()
          } else {
            return@synchronized null
          }
        }
        socketToClose?.closeQuietly() // close socket (equivalent to canceling the Call).
      }

      if (connections.isEmpty()) cleanupQueue.cancelAll()
    }
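The bookkeeping described in the list above can be reproduced with a small stand-alone model. The following is only a sketch using the JDK: ConnectionLeakModel, Call, Connection, evictAll and pruneLeaked are hypothetical stand-ins that mimic the behavior discussed here, not OkHttp classes, and WeakReference.clear() is used in place of a real GC so that the outcome is deterministic.

```java
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Simplified, JDK-only model of the call/connection bookkeeping; NOT OkHttp code. */
final class ConnectionLeakModel {
    /** Hypothetical stand-in for RealCall. */
    static final class Call {}

    /** Hypothetical stand-in for RealConnection: tracks its calls through weak references. */
    static final class Connection {
        final List<WeakReference<Call>> calls = new ArrayList<>();
        boolean socketClosed;

        void attach(Call call) {
            calls.add(new WeakReference<>(call));
        }

        /** Models releaseConnectionNoEvents: the only path that detaches a call. */
        void release(Call call) {
            calls.removeIf(ref -> ref.get() == call);
        }
    }

    /** Models evictAll: only connections with no attached calls are closed and removed. */
    static int evictAll(List<Connection> pool) {
        int evicted = 0;
        for (Iterator<Connection> it = pool.iterator(); it.hasNext();) {
            Connection connection = it.next();
            if (connection.calls.isEmpty()) {
                connection.socketClosed = true;
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    /** Models pruneAndGetAllocationCount: a cleared reference still in the list is a leak. */
    static int pruneLeaked(Connection connection) {
        int leaked = 0;
        for (Iterator<WeakReference<Call>> it = connection.calls.iterator(); it.hasNext();) {
            if (it.next().get() == null) {
                it.remove();
                leaked++;
            }
        }
        return leaked;
    }

    public static void main(String[] args) {
        // Leak scenario: the call was canceled but never released from its connection.
        List<Connection> pool = new ArrayList<>();
        Connection leaky = new Connection();
        pool.add(leaky);
        leaky.attach(new Call());
        System.out.println("evicted without release: " + evictAll(pool)); // 0
        leaky.calls.get(0).clear(); // stand-in for the GC collecting the call
        System.out.println("leaked references: " + pruneLeaked(leaky));   // 1

        // Released scenario: detaching the call first lets evictAll remove the connection.
        List<Connection> otherPool = new ArrayList<>();
        Connection released = new Connection();
        otherPool.add(released);
        Call call = new Call();
        released.attach(call);
        released.release(call);
        System.out.println("evicted after release: " + evictAll(otherPool)); // 1
    }
}
```

In this model, the leak warning corresponds to pruneLeaked returning a non-zero count: the connection was skipped by evictAll because its call list was never emptied.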
Possible Fix
Close the underlying Response in closeCurrentStream, along with canceling the Call, on the reading thread.
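The shape of this change can be sketched without the library. The example below is a JDK-only illustration, not the actual patch: CloseCurrentStreamSketch, onStreamOpened, callCanceller and responseCloser are hypothetical names, with the two Closeable fields standing in for canceling the okhttp3.Call and for Response.close(). The ordering (cancel first, then close the response) is an assumption inferred from the line numbers in the stack traces above.

```java
import java.io.Closeable;
import java.io.IOException;

/** JDK-only sketch of the proposed fix; all names are hypothetical, not the library's code. */
final class CloseCurrentStreamSketch {
    private final Object closeLock = new Object();
    private Closeable callCanceller;  // hypothetical stand-in for canceling the okhttp3.Call
    private Closeable responseCloser; // hypothetical stand-in for Response.close()

    /** Remembers how to tear down the stream that was just opened. */
    void onStreamOpened(Closeable callCanceller, Closeable responseCloser) {
        synchronized (closeLock) {
            this.callCanceller = callCanceller;
            this.responseCloser = responseCloser;
        }
    }

    /**
     * Cancels the call and then closes the response. Closing the response is the
     * step that lets the call be released from its connection.
     */
    void closeCurrentStream() {
        synchronized (closeLock) {
            closeQuietly(callCanceller);
            callCanceller = null;
            closeQuietly(responseCloser);
            responseCloser = null;
        }
    }

    private static void closeQuietly(Closeable closeable) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (IOException ignored) {
            // Best-effort cleanup: nothing useful can be done here.
        }
    }

    public static void main(String[] args) {
        CloseCurrentStreamSketch sketch = new CloseCurrentStreamSketch();
        sketch.onStreamOpened(
                () -> System.out.println("call canceled"),
                () -> System.out.println("response closed"));
        sketch.closeCurrentStream();
        sketch.closeCurrentStream(); // second invocation is a no-op
    }
}
```

Clearing both fields after use keeps a repeated close from canceling or closing the same stream twice.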
Hello and thank you for bringing this to our attention and opening this PR. We will review when able to do so. Thank you!