
java-client seems to be limited to about 128 streams

Open niloc132 opened this issue 1 year ago • 1 comments

From the Java barrage client, create more than about 128 streams at once. Here's a quick example that subscribes to the same existing ticket 200 times.

diff --git a/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java b/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java
index ca802d3411..62b23ba2cd 100644
--- a/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java
+++ b/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java
@@ -20,6 +20,8 @@ import io.deephaven.util.SafeCloseable;
 import io.deephaven.util.annotations.ReferentialIntegrity;
 import picocli.CommandLine;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
 abstract class SubscribeExampleBase extends BarrageClientExampleBase {
@@ -56,54 +58,20 @@ abstract class SubscribeExampleBase extends BarrageClientExampleBase {
 
         try (final SafeCloseable ignored = LivenessScopeStack.open();
                 final TableHandle handle = subscriptionManager.executeLogic(logic())) {
-            final BarrageSubscription subscription = client.subscribe(handle, options);
-
-            final Table subscriptionTable;
-            if (headerSize > 0) {
-                // create a Table subscription with forward viewport of the specified size
-                subscriptionTable = subscription.partialTable(RowSetFactory.flat(headerSize), null, false).get();
-            } else if (tailSize > 0) {
-                // create a Table subscription with reverse viewport of the specified size
-                subscriptionTable = subscription.partialTable(RowSetFactory.flat(tailSize), null, true).get();
-            } else {
-                // create a Table subscription of the entire Table
-                subscriptionTable = subscription.entireTable().get();
+            BarrageSubscription subscription = null;
+            List<BarrageSubscription> subs = new ArrayList<>();
+            for (int i = 0; i < 200; i++) {
+                System.out.println("subscribe " + i);
+                subscription = client.subscribe(handle, options);
+                subs.add(subscription);
             }
 
-            System.out.println("Subscription established");
-            System.out.println("Table info: rows = " + subscriptionTable.size() + ", cols = " +
-                    subscriptionTable.numColumns());
-            TableTools.show(subscriptionTable);
-            System.out.println();
-
-            subscriptionTable.addUpdateListener(listener = new InstrumentedTableUpdateListener("example-listener") {
-                @ReferentialIntegrity
-                final Table tableRef = subscriptionTable;
-                {
-                    // Maintain a liveness ownership relationship with subscriptionTable for the lifetime of the
-                    // listener
-                    manage(tableRef);
-                }
-
-                @Override
-                protected void destroy() {
-                    super.destroy();
-                    tableRef.removeUpdateListener(this);
-                }
-
-                @Override
-                protected void onFailureInternal(final Throwable originalException, final Entry sourceEntry) {
-                    System.out.println("exiting due to onFailureInternal:");
-                    originalException.printStackTrace();
-                    countDownLatch.countDown();
-                }
-
-                @Override
-                public void onUpdate(final TableUpdate upstream) {
-                    System.out.println("Received table update:");
-                    System.out.println(upstream);
-                }
-            });
+            // make sure we use it at least once, can't be cleaned up yet
+            for (int i = 0; i < subs.size(); i++) {
+                System.out.println("entireTable " + i);
+                BarrageSubscription sub = subs.get(i);
+                sub.entireTable().get();
+            }
 
             countDownLatch.await();
 

Logging will stop around 128 (meaning 129 streams), and no more progress will be made: the client code blocks until calls start failing on the server. The first loop runs quickly, since it only creates gRPC Call instances, but the second loop, which actually tries to send on those calls, blocks.

Watching with Wireshark, I see the server send (in its SETTINGS frame) 0 for max concurrent streams, then the client sends 128 for the same value. Note that Jetty's 0 may also be out of spec: taken literally it says the server won't allow any streams at all, but in context it appears to mean that the server will allow unlimited streams. Worth looking closer - see https://httpwg.org/specs/rfc7540.html#SETTINGS_MAX_CONCURRENT_STREAMS for more documentation on this setting.

So far, the only place I can see this value being set in the client is io.netty.handler.codec.http2.Http2CodecUtil:

    /**
     * The assumed minimum value for {@code SETTINGS_MAX_CONCURRENT_STREAMS} as
     * recommended by the <a href="https://tools.ietf.org/html/rfc7540#section-6.5.2">HTTP/2 spec</a>.
     */
    public static final int SMALLEST_MAX_CONCURRENT_STREAMS = 100;
    static final int DEFAULT_MAX_RESERVED_STREAMS = SMALLEST_MAX_CONCURRENT_STREAMS;

That value is a little lower than what I seem to be observing, and I can't find the actual observed constant anywhere. That default value propagates to io.netty.handler.codec.http2.DefaultHttp2Connection.DefaultEndpoint.maxReservedStreams via the default constructor for both the gRPC Netty server and client (NettyServerHandler and NettyClientHandler, respectively), and that field is final. There is a DefaultEndpoint.maxActiveStreams() that allows a related field to be written, but I can't yet trace a path to it that we can control.

From the NettyServerBuilder, I do see a maxConcurrentCallsPerConnection field, which (through a few renames) does eventually control the corresponding property for the server (via the same classes listed above), but there is no NettyClientBuilder - just NettyChannelBuilder - and I don't see such a property there.
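
For reference, a rough sketch of that server-side knob on a plain grpc-java Netty server (the class name, port, and limit below are placeholders, not Deephaven's actual wiring):

    import java.io.IOException;

    import io.grpc.Server;
    import io.grpc.netty.NettyServerBuilder;

    public final class ServerStreamLimitSketch {
        public static Server startServer() throws IOException {
            // maxConcurrentCallsPerConnection feeds the server's advertised
            // SETTINGS_MAX_CONCURRENT_STREAMS; the values here are illustrative.
            return NettyServerBuilder.forPort(10000)
                    .maxConcurrentCallsPerConnection(1024)
                    // .addService(someService) would register the actual gRPC services
                    .build()
                    .start();
        }
    }

There is no equivalent setter on NettyChannelBuilder, which is why the client side is the sticking point here.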

niloc132 avatar Aug 14 '24 02:08 niloc132

Looks like https://github.com/grpc/grpc-java/issues/8252 may be of interest here.

niloc132 avatar Aug 14 '24 02:08 niloc132

https://grpc.io/docs/guides/performance/

(Special topic) Each gRPC channel uses 0 or more HTTP/2 connections and each connection usually has a limit on the number of concurrent streams. When the number of active RPCs on the connection reaches this limit, additional RPCs are queued in the client and must wait for active RPCs to finish before they are sent. Applications with high load or long-lived streaming RPCs might see performance issues because of this queueing. There are two possible solutions:

1. Create a separate channel for each area of high load in the application.

2. Use a pool of gRPC channels to distribute RPCs over multiple connections (channels must have different channel args to prevent re-use, so define a use-specific channel arg such as channel number).

Side note: The gRPC team has plans to add a feature to fix these performance issues (see grpc/grpc#21386 for more info), so any solution involving creating multiple channels is a temporary workaround that should eventually not be needed.
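
For the Java client specifically, the channel-pool workaround from the guide could look roughly like the sketch below (class and method names are made up for illustration; each ManagedChannel maintains its own connection(s), so streams are spread across the pool):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;

    final class ChannelPool {
        private final List<ManagedChannel> channels = new ArrayList<>();
        private final AtomicInteger counter = new AtomicInteger();

        ChannelPool(String target, int size) {
            for (int i = 0; i < size; i++) {
                // Each channel gets its own HTTP/2 connection(s), so the
                // per-connection stream cap applies to each pool member separately.
                channels.add(ManagedChannelBuilder.forTarget(target).usePlaintext().build());
            }
        }

        ManagedChannel nextChannel() {
            // Simple round-robin; callers open their streams on the returned channel.
            return channels.get(Math.floorMod(counter.getAndIncrement(), channels.size()));
        }

        void shutdown() {
            channels.forEach(ManagedChannel::shutdown);
        }
    }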

devinrsmith avatar Aug 30 '24 00:08 devinrsmith

org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory#setMaxConcurrentStreams seems relevant.
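
For context, a minimal standalone Jetty sketch of where that setter applies (class name, port, and limit are placeholders; this is not Deephaven's actual server wiring):

    import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
    import org.eclipse.jetty.server.HttpConfiguration;
    import org.eclipse.jetty.server.HttpConnectionFactory;
    import org.eclipse.jetty.server.Server;
    import org.eclipse.jetty.server.ServerConnector;

    public final class JettyStreamLimitSketch {
        public static void main(String[] args) throws Exception {
            Server server = new Server();
            HttpConfiguration config = new HttpConfiguration();

            // h2c (clear-text HTTP/2) connection factory; setMaxConcurrentStreams controls
            // the SETTINGS_MAX_CONCURRENT_STREAMS value the server advertises.
            HTTP2CServerConnectionFactory h2c = new HTTP2CServerConnectionFactory(config);
            h2c.setMaxConcurrentStreams(1024); // placeholder limit

            ServerConnector connector =
                    new ServerConnector(server, new HttpConnectionFactory(config), h2c);
            connector.setPort(8080); // placeholder port
            server.addConnector(connector);
            server.start();
            server.join();
        }
    }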

devinrsmith avatar Aug 30 '24 00:08 devinrsmith

Confirmed that I misread the Wireshark output the first time - the client is the one reporting zero streams, while the server is reporting 128. The submitted patch will let clients change that value - tested with 1 million streams, the SETTINGS frame reports the value correctly, and the client can open many more than 128 streams.

Likewise, confirmed for #6021 that the server previously advertised a max header size of 8k; this can now be set to an arbitrary value via configuration options.

niloc132 avatar Sep 10 '24 14:09 niloc132