Class-type config overrides cannot be serialized to the command topic
Describe the bug
Sending a ksql request with a class-type config override at the query level causes the resulting command to fail the serialization check, so it is never written to the command topic:
Did not write the command to the command topic as it could not be deserialized. This is a bug! Please raise a Github issue containing the series of commands you ran to get to this point.
Failed to coerce type of value 'class org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp' for key 'default.timestamp.extractor' (through reference chain: io.confluent.ksql.rest.server.computation.Command["streamsProperties"])
To Reproduce
On a recent version of ksql (I confirmed the bug on master; the initial report was for 0.26), issue a CREATE STREAM AS SELECT (or any other command that needs to be serialized to the command topic) with a class-type, query-level config override.
For example:
ksql> set 'default.timestamp.extractor'='org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp';
Successfully changed local property 'default.timestamp.extractor' to 'org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp'. Use the UNSET command to revert your change.
ksql> create stream log_mini as select level, logger from ksql_processing_log;
Did not write the command to the command topic as it could not be deserialized. This is a bug! Please raise a Github issue containing the series of commands you ran to get to this point.
Failed to coerce type of value 'class org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp' for key 'default.timestamp.extractor' (through reference chain: io.confluent.ksql.rest.server.computation.Command["streamsProperties"])
Expected behavior
Command is successfully written to the command topic.
Actual behaviour
The CLI shows the error above. The server log reports essentially the same error:
[2022-08-08 11:28:17,643] INFO Processed unsuccessfully: KsqlRequest{ksql='create stream log_mini as select level, logger from ksql_processing_log;', configOverrides={default.timestamp.extractor=org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp}, requestProperties={}, sessionVariables={}, commandSequenceNumber=Optional[-1]}, reason: (io.confluent.ksql.rest.server.resources.KsqlResource:347)
io.confluent.ksql.util.KsqlServerException: Did not write the command to the command topic as it could not be deserialized. This is a bug! Please raise a Github issue containing the series of commands you ran to get to this point.
Failed to coerce type of value 'class org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp' for key 'default.timestamp.extractor' (through reference chain: io.confluent.ksql.rest.server.computation.Command["streamsProperties"])
at io.confluent.ksql.rest.server.computation.ValidatedCommandFactory.ensureDeserializable(ValidatedCommandFactory.java:110)
at io.confluent.ksql.rest.server.computation.ValidatedCommandFactory.create(ValidatedCommandFactory.java:87)
at io.confluent.ksql.rest.server.validation.RequestValidator.validate(RequestValidator.java:176)
at io.confluent.ksql.rest.server.validation.RequestValidator.validate(RequestValidator.java:130)
at io.confluent.ksql.rest.server.resources.KsqlResource.handleKsqlStatements(KsqlResource.java:297)
at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeKsqlRequest$2(KsqlServerEndpoints.java:183)
at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeOldApiEndpointOnWorker$23(KsqlServerEndpoints.java:341)
at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeOnWorker$22(KsqlServerEndpoints.java:327)
at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$2(ContextImpl.java:313)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
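The trace shows the failure is raised from ValidatedCommandFactory.ensureDeserializable, before anything reaches the command topic. The sketch below illustrates that kind of write-time guard in plain Java: encode a value, decode it again, and refuse to proceed if decoding fails or yields a different value. It is not the ksql implementation; the class name RoundTripGuard, its Decoder interface, the toy string codecs, and the use of java.lang.String as a stand-in for LogAndSkipOnInvalidTimestamp are all assumptions for illustration.

```java
import java.util.Objects;
import java.util.function.Function;

public class RoundTripGuard {

    // Hypothetical decoder type that is allowed to throw checked exceptions.
    interface Decoder<T> {
        T decode(String encoded) throws Exception;
    }

    // Hypothetical guard: encode, decode again, and fail before writing
    // anything if the value does not survive the round trip unchanged.
    static <T> String ensureRoundTrips(T value, Function<T, String> encoder, Decoder<T> decoder) {
        String encoded = encoder.apply(value);
        T decoded;
        try {
            decoded = decoder.decode(encoded);
        } catch (Exception e) {
            throw new IllegalStateException(
                "Refusing to write: could not decode '" + encoded + "'", e);
        }
        if (!Objects.equals(value, decoded)) {
            throw new IllegalStateException(
                "Refusing to write: '" + encoded + "' decoded to a different value");
        }
        return encoded;
    }

    public static void main(String[] args) {
        // java.lang.String stands in for the timestamp extractor class.
        Class<?> extractor = String.class;
        Decoder<Class<?>> byName = Class::forName;

        // Encoding with getName() produces "java.lang.String", which decodes back.
        System.out.println(ensureRoundTrips(extractor, Class::getName, byName));

        // Encoding with toString() produces "class java.lang.String", which is rejected.
        try {
            ensureRoundTrips(extractor, Class::toString, byName);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running main prints java.lang.String for the getName() encoding and a "Refusing to write" message for the toString() encoding, which mirrors the shape of the failure reported above.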
Additional context
Most of the class-type configs we have are meant to be set at the server level, not the query level. This is true for all five ksql-specific class-type configs today: ksql.security.extension.class, ksql.connect.request.headers.plugin, ksql.metrics.extension, ksql.server.error.messages, and ksql.authentication.plugin.class. It's plausible that a user might want to set other class-type configs (at the streams level, or custom configs) at the query level, which would trigger this bug. The specific example of the streams config default.timestamp.extractor above is a no-op today, though, because of https://github.com/confluentinc/ksql/issues/9093.
That said, this is a regression (initial reporter @SamiShaikh confirmed that this used to work on ksql 0.24), so it'd be good to fix.
I didn't look too deeply at what causes the bug, but it's definitely an issue with how ksql tries to serialize commands before writing them to the command topic. A raw string such as org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp parses fine, but the string is being transformed into class org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp (with an additional "class " prefix in front), which cannot be deserialized. The bug is not CLI-specific.
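The "class " prefix is exactly what java.lang.Class.toString() produces, whereas Class.getName() returns the bare name. Assuming (not confirmed from the ksql source) that the override value is resolved to a Class object somewhere before the command is serialized, one possible fix is to normalize Class-typed values back to their names first. The sketch below is hypothetical: OverrideNormalizer and normalizeOverrides are illustrative names, and java.lang.String stands in for LogAndSkipOnInvalidTimestamp so no Kafka dependency is needed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class OverrideNormalizer {

    // Hypothetical helper: replace Class-typed override values with
    // Class.getName(), which carries no "class " or "interface " prefix,
    // and leave every other value untouched.
    static Map<String, Object> normalizeOverrides(Map<String, ?> overrides) {
        Map<String, Object> normalized = new LinkedHashMap<>();
        for (Map.Entry<String, ?> entry : overrides.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof Class<?>) {
                value = ((Class<?>) value).getName();
            }
            normalized.put(entry.getKey(), value);
        }
        return normalized;
    }

    public static void main(String[] args) {
        Map<String, Object> overrides = new LinkedHashMap<>();
        // java.lang.String stands in for LogAndSkipOnInvalidTimestamp.
        overrides.put("default.timestamp.extractor", String.class);
        overrides.put("auto.offset.reset", "earliest");

        // Naive string conversion keeps the prefix: class java.lang.String
        System.out.println(String.valueOf(overrides.get("default.timestamp.extractor")));
        // After normalization: java.lang.String
        System.out.println(normalizeOverrides(overrides).get("default.timestamp.extractor"));
    }
}
```

Where such a normalization would live in ksql (for example, before the command's streamsProperties are serialized) is left open here.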