Class-type config overrides cannot be serialized to the command topic

Open vcrfxia opened this issue 2 years ago • 0 comments

Describe the bug

Sending a ksql request with a class-type config override (at the query level) causes the command to fail serialization, so it is never written to the command topic:

Did not write the command to the command topic as it could not be deserialized. This is a bug! Please raise a Github issue containing the series of commands you ran to get to this point.
Failed to coerce type of value 'class org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp' for key 'default.timestamp.extractor' (through reference chain: io.confluent.ksql.rest.server.computation.Command["streamsProperties"])

To Reproduce

On a recent version of ksql (confirmed on master; the initial report was for 0.26), issue a CREATE STREAM AS SELECT statement (or any other command that needs to be serialized to the command topic) with a class-type query-level config override.

For example:

ksql> set 'default.timestamp.extractor'='org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp';
Successfully changed local property 'default.timestamp.extractor' to 'org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp'. Use the UNSET command to revert your change.
ksql> create stream log_mini as select level, logger from ksql_processing_log;
Did not write the command to the command topic as it could not be deserialized. This is a bug! Please raise a Github issue containing the series of commands you ran to get to this point.
Failed to coerce type of value 'class org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp' for key 'default.timestamp.extractor' (through reference chain: io.confluent.ksql.rest.server.computation.Command["streamsProperties"])

Expected behavior

Command is successfully written to the command topic.

Actual behavior

The CLI error is shown above. The server log is more or less the same:

[2022-08-08 11:28:17,643] INFO Processed unsuccessfully: KsqlRequest{ksql='create stream log_mini as select level, logger from ksql_processing_log;', configOverrides={default.timestamp.extractor=org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp}, requestProperties={}, sessionVariables={}, commandSequenceNumber=Optional[-1]}, reason:  (io.confluent.ksql.rest.server.resources.KsqlResource:347)
io.confluent.ksql.util.KsqlServerException: Did not write the command to the command topic as it could not be deserialized. This is a bug! Please raise a Github issue containing the series of commands you ran to get to this point.
Failed to coerce type of value 'class org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp' for key 'default.timestamp.extractor' (through reference chain: io.confluent.ksql.rest.server.computation.Command["streamsProperties"])
	at io.confluent.ksql.rest.server.computation.ValidatedCommandFactory.ensureDeserializable(ValidatedCommandFactory.java:110)
	at io.confluent.ksql.rest.server.computation.ValidatedCommandFactory.create(ValidatedCommandFactory.java:87)
	at io.confluent.ksql.rest.server.validation.RequestValidator.validate(RequestValidator.java:176)
	at io.confluent.ksql.rest.server.validation.RequestValidator.validate(RequestValidator.java:130)
	at io.confluent.ksql.rest.server.resources.KsqlResource.handleKsqlStatements(KsqlResource.java:297)
	at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeKsqlRequest$2(KsqlServerEndpoints.java:183)
	at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeOldApiEndpointOnWorker$23(KsqlServerEndpoints.java:341)
	at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeOnWorker$22(KsqlServerEndpoints.java:327)
	at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$2(ContextImpl.java:313)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

Additional context

Most of the class-type configs we have are meant to be set at the server level, not the query level. This is true for all five ksql-specific class-type configs today: ksql.security.extension.class, ksql.connect.request.headers.plugin, ksql.metrics.extension, ksql.server.error.messages, ksql.authentication.plugin.class. It's plausible that a user might want to set other class-type configs (at the streams level, or custom configs) at the query-level, which would trigger this bug. The specific example of the streams config default.timestamp.extractor above is a no-op today, though, because of https://github.com/confluentinc/ksql/issues/9093.

That said, this is a regression (initial reporter @SamiShaikh confirmed that this used to work on ksql 0.24), so it'd be good to fix.

I didn't look too deeply at the cause, but it's definitely an issue with how ksql tries to serialize commands before writing them to the command topic. A raw string such as org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp parses fine, but the string is being transformed into class org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp (with an additional "class " prefix in front), which cannot be deserialized. The bug is not CLI-specific.
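
For illustration, here is a minimal sketch of how a "class " prefix like this can appear in plain Java. It is not the ksql code path, which I have not traced; ClassPrefixDemo and its methods are made-up names, and java.util.ArrayList stands in for the timestamp extractor class so the example runs without Kafka Streams on the classpath. It only shows that stringifying a Class object (rather than taking its name) yields a value that can no longer be resolved back to a class.

```java
import java.util.ArrayList;

// Hypothetical demo class, not part of ksql.
final class ClassPrefixDemo {

  // Class.toString() prepends "class " (or "interface ") to the class name.
  static String viaToString(final Class<?> clazz) {
    return String.valueOf(clazz);
  }

  // Class.getName() returns only the fully qualified name.
  static String viaGetName(final Class<?> clazz) {
    return clazz.getName();
  }

  // True if the string can be resolved back to a class by name.
  static boolean resolves(final String className) {
    try {
      Class.forName(className);
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }

  public static void main(final String[] args) {
    // Stand-in for a class-type config value that has already been parsed into a Class.
    final Class<?> clazz = ArrayList.class;

    final String prefixed = viaToString(clazz);
    final String plain = viaGetName(clazz);

    System.out.println(prefixed + " -> resolves: " + resolves(prefixed));
    System.out.println(plain + " -> resolves: " + resolves(plain));
  }
}
```

If something along these lines is what happens here, a fix would presumably need to keep the override as the original string, or convert a Class value back with getName(), before the command is serialized.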

vcrfxia, Aug 08 '22 15:08