Client's executeQuery("<sql>") may or may not complete, depending

Open credmond opened this issue 1 year ago • 0 comments

There's a bug in the client API: if you have a stream or table and run a simple SELECT via executeQuery without a LIMIT clause, or with a LIMIT greater than the maxRows default of 10k (i.e., ExecuteQueryMaxResultRows), the returned future may or may not complete.

Whether it completes appears random, and completion becomes less likely the further your LIMIT exceeds maxRows.

Expected: the future should always complete (success or failure), at the very least with an exception when the row limit is exceeded. As it stands, the behavior makes no sense, and it's impossible to handle the issue or understand the cause programmatically. All we get is a log entry. Your own exceptions are swallowed.
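To make the expected contract concrete, here is a minimal, self-contained sketch. BoundedRowCollector, handleRow, and handleEnd are hypothetical names used for illustration only; this is not the client's actual implementation. It only shows a row handler that always settles its future, either with the rows or with an exception once the limit is exceeded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a response handler; NOT the ksqlDB client's code.
public class BoundedRowCollector {
    private final int maxRows;
    private final List<String> rows = new ArrayList<>();
    private final CompletableFuture<List<String>> result = new CompletableFuture<>();

    public BoundedRowCollector(int maxRows) {
        this.maxRows = maxRows;
    }

    public CompletableFuture<List<String>> result() {
        return result;
    }

    // Called once per incoming row.
    public void handleRow(String row) {
        if (result.isDone()) {
            return; // already settled; ignore any further rows
        }
        if (rows.size() >= maxRows) {
            // Surface the limit violation to the caller instead of only logging it.
            result.completeExceptionally(
                new IllegalStateException("Reached max number of rows: " + maxRows));
            return;
        }
        rows.add(row);
    }

    // Called when the response ends; a no-op if the future already failed.
    public void handleEnd() {
        result.complete(List.copyOf(rows));
    }

    public static void main(String[] args) {
        BoundedRowCollector collector = new BoundedRowCollector(2);
        for (String row : List.of("r1", "r2", "r3")) {
            collector.handleRow(row);
        }
        collector.handleEnd();

        collector.result()
            .thenAccept(rows -> System.out.println("Result size: " + rows.size()))
            .exceptionally(e -> {
                System.out.println("Request failed: " + e);
                return null;
            });
    }
}
```

With three rows and a limit of two, the exceptionally branch runs. With two rows or fewer, thenAccept runs. Either way the caller hears back.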

The error below is logged for most rows consumed after the max (10k by default).

io.confluent.ksql.api.client.exception.KsqlClientException: Reached max number of rows that may be returned by executeQuery(). Increase the limit via ClientOptions#setExecuteQueryMaxResultRows(). Current limit: 10000
	at io.confluent.ksql.api.client.impl.ExecuteQueryResponseHandler.handleRow(ExecuteQueryResponseHandler.java:81)
	at io.confluent.ksql.api.client.impl.QueryResponseHandler.doHandleBodyBuffer(QueryResponseHandler.java:45)
	at io.confluent.ksql.api.client.impl.ResponseHandler.handleBodyBuffer(ResponseHandler.java:39)
	at io.confluent.ksql.api.client.impl.ExecuteQueryResponseHandler.handleBodyBuffer(ExecuteQueryResponseHandler.java:37)
	at io.vertx.core.parsetools.impl.RecordParserImpl.handleParsing(RecordParserImpl.java:214)
	at io.vertx.core.parsetools.impl.RecordParserImpl.handle(RecordParserImpl.java:285)
	at io.vertx.core.parsetools.impl.RecordParserImpl.handle(RecordParserImpl.java:27)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:277)
	at io.vertx.core.http.impl.HttpEventHandler.handleChunk(HttpEventHandler.java:51)
	at io.vertx.core.http.impl.HttpClientResponseImpl.handleChunk(HttpClientResponseImpl.java:239)
	at io.vertx.core.http.impl.Http2ClientConnection$StreamImpl.handleData(Http2ClientConnection.java:500)
	at io.vertx.core.http.impl.VertxHttp2Stream.lambda$new$1(VertxHttp2Stream.java:74)
	at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:255)
	at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:134)
	at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:76)
	at io.vertx.core.impl.ContextBase.execute(ContextBase.java:290)
	at io.vertx.core.http.impl.VertxHttp2Stream.onData(VertxHttp2Stream.java:120)
	at io.vertx.core.http.impl.Http2ConnectionBase.onDataRead(Http2ConnectionBase.java:315)
	at io.netty.handler.codec.http2.Http2FrameListenerDecorator.onDataRead(Http2FrameListenerDecorator.java:34)
	at io.netty.handler.codec.http2.Http2EmptyDataFrameListener.onDataRead(Http2EmptyDataFrameListener.java:49)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:320)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:409)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:244)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:164)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:186)
	at io.netty.handler.codec.http2.DecoratingHttp2ConnectionDecoder.decodeFrame(DecoratingHttp2ConnectionDecoder.java:61)
	at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:391)
	at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:451)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:530)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:469)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	at io.vertx.core.http.impl.VertxHttp2ConnectionHandler.channelRead(VertxHttp2ConnectionHandler.java:415)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1503)
	at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1366)
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1415)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:530)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:469)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1575)

How to reproduce?

Run this a few times. Decrease the LIMIT toward 10k and observe that sometimes the future does complete (i.e., thenAccept is called as if the query succeeded), but other times it does not. E.g., on my machine, a LIMIT of 12k behaves this way.

import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.vertx.core.Vertx;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class But {

    private static String KSQLDB_SERVER_HOST = "ksqldb0";
    private static int KSQLDB_SERVER_HOST_PORT = 8089;

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Use the client() helper defined at the bottom of this class.
        final Client client = client();
        client.executeQuery("SELECT * FROM RIDERLOCATIONS LIMIT 20000;")
                .thenAccept(result -> {

                    // This may or may not be called if LIMIT is not set or is > 10000 (the default max rows).
                    // The further the LIMIT is above 10000, the less likely it is to be called.
                    System.out.println("Result size: " + result.size());
                }).exceptionally(e -> {
                    // This will NEVER be executed
                    System.out.println("Request failed: " + e);
                    e.printStackTrace();
                    return null;
                });

        // No, waiting longer won't help.
        Thread.sleep(15000);

        // Terminate any open connections and close the client
        client.close();
    }

    public static Client client() {
        final Vertx vertx = Vertx.vertx();
        final ClientOptions options = ClientOptions.create()
                .setUseTls(false)
                .setHost(KSQLDB_SERVER_HOST)
                .setPort(KSQLDB_SERVER_HOST_PORT);
        final Client client = Client.create(options, vertx);
        return client;
    }
}
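
Until the client settles the future itself, a caller-side deadline at least turns the hang into an exception that can be handled. The sketch below uses only the JDK's CompletableFuture#orTimeout (Java 9+). TimeoutGuard and awaitOutcome are hypothetical names, and the never-completed future is a stand-in for the one returned by executeQuery(). This does not fix the underlying bug and cannot recover the swallowed exception; it only guarantees the caller is not left waiting forever.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for illustration; not part of the ksqlDB client.
public class TimeoutGuard {

    // Waits until the future settles or the deadline passes, then describes the outcome.
    public static <T> String awaitOutcome(CompletableFuture<T> future, long timeoutMillis) {
        try {
            // orTimeout completes the future with a TimeoutException if nothing else settles it first.
            T value = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS).join();
            return "completed: " + value;
        } catch (CompletionException e) {
            Throwable cause = e.getCause();
            return (cause instanceof TimeoutException) ? "timed out" : "failed: " + cause;
        }
    }

    public static void main(String[] args) {
        // Stand-in for an executeQuery() future that nobody ever completes.
        CompletableFuture<String> stuck = new CompletableFuture<>();
        System.out.println(awaitOutcome(stuck, 200));

        // A future that is already complete is unaffected by the deadline.
        CompletableFuture<String> done = CompletableFuture.completedFuture("42 rows");
        System.out.println(awaitOutcome(done, 200));
    }
}
```

In the reproducer above, the same idea would mean calling orTimeout on the future returned by executeQuery before attaching thenAccept and exceptionally, so that the exceptionally branch fires once the deadline passes.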

credmond, Jan 05 '25 00:01