ksqldb-api-client describeConnector produces wrong request
Describe the bug
The io.confluent.ksql.api.client.impl.ClientImpl.describeConnector method in ksqldb-api-client generates a request with missing quotes around the connector name.
To Reproduce Steps to reproduce the behavior, include:
- The version of KSQL: 7.5.2
- Sample source data:
ClientOptions options = ClientOptions.create()
    .setHost("localhost")
    .setPort(8088); // setPort takes an int, not a String
Client client = Client.create(options);
client.describeConnector("my-connector").thenAccept(result -> System.out.println(result)).get();
client.close();
- Any SQL statements you ran: None, apart from creating a connector named 'my-connector'
Expected behavior
The client.describeConnector method should complete the returned CompletableFuture with a ConnectorDescription.
Actual behaviour A clear and concise description of what actually happens, including:
- CLI output
java.lang.IllegalStateException: Failed to execute ApplicationRunner
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:762) ~[spring-boot-2.7.6.jar:2.7.6]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:749) ~[spring-boot-2.7.6.jar:2.7.6]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:314) ~[spring-boot-2.7.6.jar:2.7.6]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1303) ~[spring-boot-2.7.6.jar:2.7.6]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1292) ~[spring-boot-2.7.6.jar:2.7.6]
at com.alfen.kafka.ksqldb.KafkaKsqldbApplication.main(KafkaKsqldbApplication.java:31) ~[classes/:na]
Caused by: java.util.concurrent.ExecutionException: io.confluent.ksql.api.client.exception.KsqlClientException: Received 400 response from server: line 1:28: Syntax Error
Expecting ';'. Error code: 40001
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) ~[na:na]
at com.alfen.kafka.ksqldb.KafkaKsqldbApplication.run(KafkaKsqldbApplication.java:52) ~[classes/:na]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:759) ~[spring-boot-2.7.6.jar:2.7.6]
... 5 common frames omitted
Caused by: io.confluent.ksql.api.client.exception.KsqlClientException: Received 400 response from server: line 1:28: Syntax Error
Expecting ';'. Error code: 40001
at io.confluent.ksql.api.client.impl.ClientImpl.lambda$handleErrorResponse$34(ClientImpl.java:887) ~[ksqldb-api-client-7.5.1.jar:na]
at io.vertx.core.impl.future.FutureImpl$1.onSuccess(FutureImpl.java:91) ~[vertx-core-4.3.8.jar:4.3.8]
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60) ~[vertx-core-4.3.8.jar:4.3.8]
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211) ~[vertx-core-4.3.8.jar:4.3.8]
at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23) ~[vertx-core-4.3.8.jar:4.3.8]
at io.vertx.core.http.impl.HttpEventHandler.handleEnd(HttpEventHandler.java:79) ~[vertx-core-4.3.8.jar:4.3.8]
at io.vertx.core.http.impl.HttpClientResponseImpl.handleEnd(HttpClientResponseImpl.java:250) ~[vertx-core-4.3.8.jar:4.3.8]
at io.vertx.core.http.impl.Http2ClientConnection$StreamImpl.handleEnd(Http2ClientConnection.java:461) ~[vertx-core-4.3.8.jar:4.3.8]
at io.vertx.core.http.impl.VertxHttp2Stream.lambda$new$1(VertxHttp2Stream.java:63) ~[vertx-core-4.3.8.jar:4.3.8]
at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:239) ~[vertx-core-4.3.8.jar:4.3.8]
at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:129) ~[vertx-core-4.3.8.jar:4.3.8]
at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:55) ~[vertx-core-4.3.8.jar:4.3.8]
at io.vertx.core.impl.ContextBase.emit(ContextBase.java:239) ~[vertx-core-4.3.8.jar:4.3.8]
at io.vertx.core.http.impl.VertxHttp2Stream.onEnd(VertxHttp2Stream.java:135) ~[vertx-core-4.3.8.jar:4.3.8]
at io.vertx.core.http.impl.Http2ClientConnection$Stream.onEnd(Http2ClientConnection.java:288) ~[vertx-core-4.3.8.jar:4.3.8]
at io.vertx.core.http.impl.VertxHttp2Stream.onEnd(VertxHttp2Stream.java:130) ~[vertx-core-4.3.8.jar:4.3.8]
at io.vertx.core.http.impl.Http2ConnectionBase.onDataRead(Http2ConnectionBase.java:315) ~[vertx-core-4.3.8.jar:4.3.8]
at io.netty.handler.codec.http2.Http2FrameListenerDecorator.onDataRead(Http2FrameListenerDecorator.java:36) ~[netty-codec-http2-4.1.85.Final.jar:4.1.85.Final]
at io.netty.handler.codec.http2.Http2EmptyDataFrameListener.onDataRead(Http2EmptyDataFrameListener.java:49) ~[netty-codec-http2-4.1.85.Final.jar:4.1.85.Final]
at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:307) ~[netty-codec-http2-4.1.85.Final.jar:4.1.85.Final]
at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:415) ~[netty-codec-http2-4.1.85.Final.jar:4.1.85.Final]
at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:250) ~[netty-codec-http2-4.1.85.Final.jar:4.1.85.Final]
at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:159) ~[netty-codec-http2-4.1.85.Final.jar:4.1.85.Final]
at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:173) ~[netty-codec-http2-4.1.85.Final.jar:4.1.85.Final]
at io.netty.handler.codec.http2.DecoratingHttp2ConnectionDecoder.decodeFrame(DecoratingHttp2ConnectionDecoder.java:63) ~[netty-codec-http2-4.1.85.Final.jar:4.1.85.Final]
at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:393) ~[netty-codec-http2-4.1.85.Final.jar:4.1.85.Final]
at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:453) ~[netty-codec-http2-4.1.85.Final.jar:4.1.85.Final]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) ~[netty-codec-4.1.85.Final.jar:4.1.85.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) ~[netty-codec-4.1.85.Final.jar:4.1.85.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.85.Final.jar:4.1.85.Final]
at io.vertx.core.http.impl.VertxHttp2ConnectionHandler.channelRead(VertxHttp2ConnectionHandler.java:408) ~[vertx-core-4.3.8.jar:4.3.8]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.85.Final.jar:4.1.85.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.85.Final.jar:4.1.85.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.85.Final.jar:4.1.85.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.85.Final.jar:4.1.85.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.85.Final.jar:4.1.85.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.85.Final.jar:4.1.85.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.85.Final.jar:4.1.85.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.85.Final.jar:4.1.85.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.85.Final.jar:4.1.85.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.85.Final.jar:4.1.85.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.85.Final.jar:4.1.85.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.85.Final.jar:4.1.85.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.85.Final.jar:4.1.85.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.85.Final.jar:4.1.85.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.85.Final.jar:4.1.85.Final]
at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
- Error messages
Received 400 response from server: line 1:28: Syntax Error
Expecting ';'. Error code: 40001
- KSQL logs
ksqldb-server | [2023-12-05 12:56:13,655] INFO Received: KsqlRequest{configOverrides={}, requestProperties={}, commandSequenceNumber=Optional.empty} (6f74d9e1-f683-3b80-b8c4-ef42eb6a00e6): <unparsable query> (io.confluent.ksql.logging.query.QueryLogger)
ksqldb-server | [2023-12-05 12:56:13,656] INFO Processed unsuccessfully: KsqlRequest{configOverrides={}, requestProperties={}, commandSequenceNumber=Optional.empty} (6f74d9e1-f683-3b80-b8c4-ef42eb6a00e6): <unparsable query> (io.confluent.ksql.logging.query.QueryLogger)
ksqldb-server | io.confluent.ksql.parser.exception.ParseFailedException: line 1:28: Syntax error at line 1:28
ksqldb-server | at io.confluent.ksql.parser.DefaultKsqlParser.parse(DefaultKsqlParser.java:56)
ksqldb-server | at io.confluent.ksql.engine.EngineContext.parse(EngineContext.java:160)
ksqldb-server | at io.confluent.ksql.engine.KsqlEngine.parse(KsqlEngine.java:286)
ksqldb-server | at io.confluent.ksql.rest.server.resources.KsqlResource.handleKsqlStatements(KsqlResource.java:308)
ksqldb-server | at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeKsqlRequest$2(KsqlServerEndpoints.java:183)
ksqldb-server | at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeOldApiEndpointOnWorker$24(KsqlServerEndpoints.java:348)
ksqldb-server | at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeOnWorker$23(KsqlServerEndpoints.java:334)
ksqldb-server | at io.vertx.core.impl.ContextBase.lambda$null$0(ContextBase.java:137)
ksqldb-server | at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
ksqldb-server | at io.vertx.core.impl.ContextBase.lambda$executeBlocking$1(ContextBase.java:135)
ksqldb-server | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
ksqldb-server | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
ksqldb-server | at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
ksqldb-server | at java.base/java.lang.Thread.run(Thread.java:829)
ksqldb-server | Caused by: io.confluent.ksql.parser.ParsingException: line 1:28: Syntax error at line 1:28
ksqldb-server | at io.confluent.ksql.parser.SyntaxErrorValidator.syntaxError(SyntaxErrorValidator.java:162)
ksqldb-server | at org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41)
ksqldb-server | at org.antlr.v4.runtime.Parser.notifyErrorListeners(Parser.java:544)
ksqldb-server | at org.antlr.v4.runtime.DefaultErrorStrategy.reportInputMismatch(DefaultErrorStrategy.java:327)
ksqldb-server | at org.antlr.v4.runtime.DefaultErrorStrategy.reportError(DefaultErrorStrategy.java:139)
ksqldb-server | at io.confluent.ksql.parser.SqlBaseParser.singleStatement(SqlBaseParser.java:384)
ksqldb-server | at io.confluent.ksql.parser.SqlBaseParser.statements(SqlBaseParser.java:240)
ksqldb-server | at io.confluent.ksql.parser.DefaultKsqlParser.getParseTree(DefaultKsqlParser.java:121)
ksqldb-server | at io.confluent.ksql.parser.DefaultKsqlParser.parse(DefaultKsqlParser.java:45)
ksqldb-server | ... 13 more
ksqldb-server | [2023-12-05 12:56:13,657] WARN 10.89.0.19 - - [Tue, 5 Dec 2023 12:56:13 GMT] "POST /ksql HTTP/2.0" 400 181 "-" "ksqlDB Java Client v7.5.1" 84 (io.confluent.ksql.api.server.LoggingHandler)
Additional context
Running the query describe connector my-connector; in the ksqldb-cli gives the same error, because the unquoted name contains a hyphen and is not parsed as a single identifier.
The correct query would be describe connector "my-connector";
The missing quotes are directly visible in the source code of the ClientImpl class, lines 470-485:
@Override
public CompletableFuture<ConnectorDescription> describeConnector(final String name) {
  final CompletableFuture<ConnectorDescription> cf = new CompletableFuture<>();
  makePostRequest(
      KSQL_ENDPOINT,
      new JsonObject()
          .put("ksql", "describe connector " + name + ";")
          .put("sessionVariables", sessionVariables),
      cf,
      response -> handleSingleEntityResponse(
          response, cf, ConnectorCommandResponseHandler::handleDescribeConnectorsResponse)
  );
  return cf;
}
It is still present in master at the moment of posting. I'll submit a PR shortly to fix this.
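A minimal sketch of the kind of change that would address this, assuming the fix is simply to wrap the name in double quotes before concatenating it into the statement. The class ConnectorStatements and the helpers quoteIdentifier and describeConnectorStatement are hypothetical names used for illustration; they are not part of ksqldb-api-client and this is not necessarily how the actual fix is or will be implemented.

```java
public final class ConnectorStatements {

    private ConnectorStatements() {
    }

    // Hypothetical helper: wraps a connector name in double quotes so that
    // names containing characters such as '-' form a single identifier.
    // Names that already contain a double quote are rejected rather than
    // escaped, because the escaping rules are not covered in this report.
    static String quoteIdentifier(final String name) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("connector name must not be empty");
        }
        if (name.indexOf('"') >= 0) {
            throw new IllegalArgumentException("connector name must not contain '\"'");
        }
        return "\"" + name + "\"";
    }

    // Hypothetical helper: builds the statement text that would be placed
    // in the "ksql" field of the request body.
    static String describeConnectorStatement(final String name) {
        return "describe connector " + quoteIdentifier(name) + ";";
    }

    public static void main(final String[] args) {
        // prints: describe connector "my-connector";
        System.out.println(describeConnectorStatement("my-connector"));
    }
}
```

Given the string concatenation in describeConnector shown above, passing an already quoted name such as "\"my-connector\"" to the existing client method may serve as a workaround until a fix is released, though that is untested here.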
This still isn't fixed in 7.8.0, or else it has regressed.
Same for dropConnector. Embarrassing.