okhttp-eventsource

Fix connection leaks after closing non-exhaustive EventSource.

Open LinZong opened this issue 1 year ago • 1 comments

Affected version

4.1.1

Problem

The OkHttp connection is not closed properly after closing an EventSource when the SSE stream has not been fully consumed.

Reproduction code

package moe.nemesiss.playground;

import com.launchdarkly.eventsource.ConnectStrategy;
import com.launchdarkly.eventsource.EventSource;
import com.launchdarkly.eventsource.MessageEvent;
import okhttp3.ConnectionPool;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;

import java.util.logging.Level;
import java.util.logging.Logger;

import static org.fest.reflect.core.Reflection.field;
import static org.fest.reflect.core.Reflection.method;

public class EventSourceLeakDemo {

    private static HttpUrl streamGeneratorUrlFor(int length, int interval) {
        // A self-hosted SSE stream generator.
        return HttpUrl.get(String.format("https://plugins.nemesiss.xyz/main/stream_generator?length=%s&intervalMs=%s", length, interval));
    }

    public static void main(String[] args) throws InterruptedException {
        EventSource closedEventSource;
        Logger.getLogger(OkHttpClient.class.getName()).setLevel(Level.FINE);

        try (EventSource source = new EventSource.Builder(
                // Obtain a stream with 10 MessageEvents
                ConnectStrategy.http(streamGeneratorUrlFor(10, 200))).build()) {

            // Actually consume only 8 of them, then stop.
            for (int i = 0; i < 8; i++) {
                try {
                    MessageEvent message = source.readMessage();
                    System.out.println(message.getData());
                } catch (Throwable t) {
                    t.printStackTrace();
                    break;
                }
            }
            closedEventSource = source;
        }

        // The EventSource is closed at this point because the try-with-resources block has ended.

        // Run GC so that the WeakReference held by the connection is cleared.
        // see: okhttp3.internal.connection.RealConnection.getCalls
        System.gc();

        // Trigger connection pool cleanup manually.
        // These reflection calls are equivalent to the statement:
        // ((com.launchdarkly.eventsource.HttpConnectStrategy.Client) closedEventSource.client).httpClient.connectionPool().delegate.cleanup(System.nanoTime());
        ConnectStrategy.Client client = field("client").ofType(ConnectStrategy.Client.class).in(closedEventSource).get();
        OkHttpClient internalOkHttpClient = field("httpClient").ofType(OkHttpClient.class).in(client).get();
        ConnectionPool cp = internalOkHttpClient.connectionPool();
        method("cleanup").withReturnType(long.class).withParameterTypes(long.class).in(cp.getDelegate$okhttp()).invoke(System.nanoTime());
        // A connection leak warning should then appear in the terminal, like this:
        // index: 0, timestamp: 1704600618813
        // index: 1, timestamp: 1704600619016
        // index: 2, timestamp: 1704600619220
        // index: 3, timestamp: 1704600619424
        // index: 4, timestamp: 1704600619628
        // index: 5, timestamp: 1704600619833
        // index: 6, timestamp: 1704600620037
        // index: 7, timestamp: 1704600620240
        //Jan 07, 2024 12:10:20 PM okhttp3.internal.platform.Platform log
        //WARNING: A connection to https://plugins.nemesiss.xyz/ was leaked. Did you forget to close a response body?
        //java.lang.Throwable: response.body().close()
        //	at okhttp3.internal.platform.Platform.getStackTraceForCloseable(Platform.kt:145)
        //	at okhttp3.internal.connection.RealCall.callStart(RealCall.kt:170)
        //	at okhttp3.internal.connection.RealCall.execute(RealCall.kt:151)
        //	at com.launchdarkly.eventsource.HttpConnectStrategy$Client.connect(HttpConnectStrategy.java:452)
        //	at com.launchdarkly.eventsource.EventSource.tryStart(EventSource.java:292)
        //	at com.launchdarkly.eventsource.EventSource.requireEvent(EventSource.java:595)
        //	at com.launchdarkly.eventsource.EventSource.readAnyEvent(EventSource.java:390)
        //	at com.launchdarkly.eventsource.EventSource.readMessage(EventSource.java:359)
        //	at moe.nemesiss.playground.EventSourceLeakDemo.main(EventSourceLeakDemo.java:33)
        // Jan 07, 2024 12:10:20 PM okhttp3.internal.platform.Platform log
    }
}

Reason

  1. com.launchdarkly.eventsource.EventSource#close closes the underlying okhttp3.Call and, if no httpClient was supplied from outside, also the package-private com.launchdarkly.eventsource.HttpConnectStrategy.Client#httpClient.

  2. Canceling an okhttp3.Call will:

    • close the underlying Socket (for HTTP/1.1) or Stream (for HTTP/2), through the call stack below:
      java.lang.Thread.State: RUNNABLE
    	  at okhttp3.internal.connection.Exchange.cancel(Exchange.kt:153)
    	  at okhttp3.internal.connection.RealCall.cancel(RealCall.kt:139)
    	  at com.launchdarkly.eventsource.HttpConnectStrategy$RequestCloser.close(HttpConnectStrategy.java:554)
    	  at com.launchdarkly.eventsource.EventSource.closeCurrentStream(EventSource.java:677)
    	  - locked <0x916> (a java.lang.Object)
    	  at com.launchdarkly.eventsource.EventSource.close(EventSource.java:532)
    
  3. The connection leak warning is logged when connection.calls is not empty but the referenced RealCall has already been garbage-collected, which indicates that the RealCall was never properly detached from its RealConnection.

    internal class CallReference(
        referent: RealCall,
        /**
         * Captures the stack trace at the time the Call is executed or enqueued. This is helpful for
         * identifying the origin of connection leaks.
         */
        val callStackTrace: Any?
      ) : WeakReference<RealCall>(referent)
    
    private fun pruneAndGetAllocationCount(connection: RealConnection, now: Long): Int {
        connection.assertThreadHoldsLock()
    
        val references: MutableList<Reference<RealCall>> = connection.calls
        var i = 0
        // connection.calls is not empty
        while (i < references.size) {
          val reference: Reference<RealCall> = references[i]
    
          if (reference.get() != null) {
            i++
            continue
          }
          // But the referenced RealCall has been garbage-collected,
          // which indicates that it was never properly detached from the RealConnection.
    
          // We've discovered a leaked call. This is an application bug.
          val callReference = reference as CallReference
          val message = "A connection to ${connection.route().address.url} was leaked. " +
              "Did you forget to close a response body?"
          Platform.get().logCloseableLeak(message, callReference.callStackTrace)
    
          references.removeAt(i)
          connection.noNewExchanges = true
    
          // If this was the last allocation, the connection is eligible for immediate eviction.
          if (references.isEmpty()) {
            connection.idleAtNs = now - keepAliveDurationNs
            return 0
          }
        }
    
        return references.size
      }
    
  4. The only way to actively detach a RealCall from its RealConnection is to call the releaseConnectionNoEvents method, which is reached through the messageDone method.

    • Called when the stream is exhausted:

      "main@1" prio=5 tid=0x1 nid=NA runnable
        java.lang.Thread.State: RUNNABLE
      	  at okhttp3.internal.connection.RealCall.releaseConnectionNoEvents$okhttp(RealCall.kt:381)
      	  at okhttp3.internal.connection.RealCall.callDone(RealCall.kt:350)
      	  - locked <0x8b2> (a okhttp3.internal.connection.RealConnection)
      	  at okhttp3.internal.connection.RealCall.messageDone$okhttp(RealCall.kt:309)
      	  at okhttp3.internal.connection.Exchange.bodyComplete(Exchange.kt:193)
      	  at okhttp3.internal.connection.Exchange$ResponseBodySource.complete(Exchange.kt:324)
      	  at okhttp3.internal.connection.Exchange$ResponseBodySource.read(Exchange.kt:284)
      	  at okio.RealBufferedSource$inputStream$1.read(RealBufferedSource.kt:158)
      	  at com.launchdarkly.eventsource.BufferedLineParser.readMoreIntoBuffer(BufferedLineParser.java:138)
      	  at com.launchdarkly.eventsource.BufferedLineParser.read(BufferedLineParser.java:63)
      	  at com.launchdarkly.eventsource.EventParser.getNextChunk(EventParser.java:267)
      	  at com.launchdarkly.eventsource.EventParser.tryNextEvent(EventParser.java:130)
      	  at com.launchdarkly.eventsource.EventParser.nextEvent(EventParser.java:109)
      	  at com.launchdarkly.eventsource.EventSource.requireEvent(EventSource.java:600)
      	  at com.launchdarkly.eventsource.EventSource.readAnyEvent(EventSource.java:392)
      	  at com.launchdarkly.eventsource.EventSource.readMessage(EventSource.java:361)
      	  at moe.nemesiss.playground.EventSourceLeakDemo.main(EventSourceLeakDemo.java:34)
      
    • Called when the Response is closed:

      "main@1" prio=5 tid=0x1 nid=NA runnable
        java.lang.Thread.State: RUNNABLE
      	  at okhttp3.internal.connection.RealCall.releaseConnectionNoEvents$okhttp(RealCall.kt:381)
      	  at okhttp3.internal.connection.RealCall.callDone(RealCall.kt:350)
      	  - locked <0x8b3> (a okhttp3.internal.connection.RealConnection)
      	  at okhttp3.internal.connection.RealCall.messageDone$okhttp(RealCall.kt:309)
      	  at okhttp3.internal.connection.Exchange.bodyComplete(Exchange.kt:193)
      	  at okhttp3.internal.connection.Exchange$ResponseBodySource.complete(Exchange.kt:324)
      	  at okhttp3.internal.connection.Exchange$ResponseBodySource.close(Exchange.kt:310)
      	  at okio.RealBufferedSource.close(RealBufferedSource.kt:392)
      	  at okhttp3.internal.Util.closeQuietly(Util.kt:495)
      	  at okhttp3.ResponseBody.close(ResponseBody.kt:192)
      	  at okhttp3.Response.close(Response.kt:302)
      	  at com.launchdarkly.eventsource.HttpConnectStrategy$ResponseCloser.close(HttpConnectStrategy.java:570)
      	  at com.launchdarkly.eventsource.EventSource.closeCurrentStream(EventSource.java:694)
      	  - locked <0x923> (a java.lang.Object)
      	  at com.launchdarkly.eventsource.EventSource.close(EventSource.java:534)
      	  at moe.nemesiss.playground.EventSourceLeakDemo.main(EventSourceLeakDemo.java:42)
      
  5. But this method is reached neither by closing the Call nor by closing the connection pool. That is obvious for closing the Call, so let's focus on closing the connection pool.

    // com.launchdarkly.eventsource.HttpConnectStrategy.Client#close
    public void close() {
      // We need to shut down the HTTP client *if* it is one that we created, and not
      // one that the application provided to us.
      OkHttpClient preconfiguredClient = HttpConnectStrategy.this.httpClient;      
      if (preconfiguredClient == null) {
        // COVERAGE: these null guards are here for safety but in practice the values are never null and there
        // is no way to cause them to be null in unit tests
        if (httpClient.connectionPool() != null) {
          httpClient.connectionPool().evictAll(); // evict idle connections from the connection pool.
        }
        if (httpClient.dispatcher() != null) {
          httpClient.dispatcher().cancelAll();
          if (httpClient.dispatcher().executorService() != null) {
            httpClient.dispatcher().executorService().shutdownNow();
          }
        }
      }
    }
    
    // okhttp3.internal.connection.RealConnectionPool#evictAll
    fun evictAll() {
      val i = connections.iterator()
      while (i.hasNext()) {
        val connection = i.next()
        val socketToClose = synchronized(connection) {
          if (connection.calls.isEmpty()) { // false: closing the Call does not detach it from the connection.
            i.remove() 
            connection.noNewExchanges = true
            return@synchronized connection.socket() 
          } else {
            return@synchronized null
          }
        }
        socketToClose?.closeQuietly() // close socket (equivalent to closing 'Call').
      }
      if (connections.isEmpty()) cleanupQueue.cancelAll()
    }
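
The chain of reasoning in steps 2 to 5 can be illustrated with a small, self-contained model. This is only a sketch: the classes Connection, Call and Pool below are hypothetical stand-ins for RealConnection, RealCall and RealConnectionPool, not OkHttp's real implementation, and Reference.clear() is used to simulate garbage collection deterministically instead of relying on System.gc().

```java
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Simplified, hypothetical model of the call/connection bookkeeping; not OkHttp's real classes. */
final class ConnectionLeakModel {

    /** Stands in for RealConnection: tracks the calls currently attached to it. */
    static final class Connection {
        final List<WeakReference<Call>> calls = new ArrayList<>();
        boolean socketClosed = false;
    }

    /** Stands in for RealCall. */
    static final class Call {
        private final Connection connection;
        private final WeakReference<Call> selfReference = new WeakReference<>(this);

        Call(Connection connection) {
            this.connection = connection;
            connection.calls.add(selfReference);
        }

        /** Models cancel(): the socket is closed, but the call stays attached. */
        void cancel() {
            connection.socketClosed = true;
        }

        /** Models closing the response: the call detaches itself from the connection. */
        void closeResponse() {
            connection.calls.remove(selfReference);
        }
    }

    /** Stands in for RealConnectionPool. */
    static final class Pool {
        final List<Connection> connections = new ArrayList<>();
        final List<String> leakWarnings = new ArrayList<>();

        /** Models evictAll(): only connections with no attached calls are removed. */
        void evictAll() {
            connections.removeIf(c -> {
                if (!c.calls.isEmpty()) return false;
                c.socketClosed = true;
                return true;
            });
        }

        /** Models pruneAndGetAllocationCount(): cleared references are reported as leaks. */
        int pruneAndGetAllocationCount(Connection connection) {
            Iterator<WeakReference<Call>> it = connection.calls.iterator();
            while (it.hasNext()) {
                if (it.next().get() == null) {
                    leakWarnings.add("A connection was leaked. Did you forget to close a response body?");
                    it.remove();
                }
            }
            return connection.calls.size();
        }
    }

    /** Runs one close scenario and returns the number of leak warnings logged. */
    static int leaksAfterClose(boolean alsoCloseResponse) {
        Pool pool = new Pool();
        Connection connection = new Connection();
        pool.connections.add(connection);

        Call call = new Call(connection);
        call.cancel();
        if (alsoCloseResponse) {
            call.closeResponse();
        }
        pool.evictAll();

        // Simulate the garbage collector reclaiming the call; clear() keeps this deterministic.
        for (WeakReference<Call> ref : new ArrayList<>(connection.calls)) {
            ref.clear();
        }
        pool.pruneAndGetAllocationCount(connection);
        return pool.leakWarnings.size();
    }

    public static void main(String[] args) {
        System.out.println("cancel only: " + leaksAfterClose(false) + " leak warning(s)");
        System.out.println("cancel + close response: " + leaksAfterClose(true) + " leak warning(s)");
    }
}
```

In this model the cancel-only scenario logs one warning and the scenario that also closes the response logs none, which mirrors the behavior described above.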
    

Possible fix

Close the underlying Response in closeCurrentStream, along with closing the Call, on the reading thread.
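
As a rough illustration of this idea, the sketch below closes two closers in sequence so that a failure in the first does not skip the second. The helper closeAll and the class name are hypothetical and are not part of okhttp-eventsource; the two lambdas merely stand in for the RequestCloser and ResponseCloser seen in the stack traces above. It does not model the threading aspect.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a close routine that cancels the call and also closes the response. */
final class CloseBothSketch {

    /**
     * Closes every non-null closer in order. A failure in one closer does not
     * prevent the remaining closers from running; failures are returned to the caller.
     */
    static List<IOException> closeAll(Closeable... closers) {
        List<IOException> failures = new ArrayList<>();
        for (Closeable closer : closers) {
            if (closer == null) continue;
            try {
                closer.close();
            } catch (IOException e) {
                failures.add(e);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        // Stand-ins for the real closers; they only record that they were invoked.
        Closeable requestCloser = () -> log.add("call canceled");
        Closeable responseCloser = () -> log.add("response closed");
        closeAll(requestCloser, responseCloser);
        System.out.println(log);
    }
}
```

Collecting failures rather than throwing on the first one is a design choice made for this sketch, so that the response is still closed when canceling the call fails.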


LinZong • Jan 07 '24 05:01

Hello and thank you for bringing this to our attention and opening this PR. We will review when able to do so. Thank you!

tanderson-ld • Jan 10 '24 18:01