ksql
Decimal not working with schema registry
Describe the bug
A DECIMAL key field doesn't seem to work with Schema Registry.
To Reproduce
Query:
CREATE STREAM INPUT123 (id decimal(16, 2) key, bar INT) WITH (kafka_topic='input_topic123', format='AVRO', partitions = 1);
INSERT INTO INPUT123 VALUES (12.3, 10);
SELECT * FROM INPUT123;
Expected behavior
The SELECT returns the row inserted into INPUT123.
Actual behaviour
Nothing is returned, and the server logs the following exception:
Caused by: org.apache.kafka.connect.errors.DataException: Mismatched versions: version already added to SchemaBuilder (1) differs from version in source schema (2)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1906)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1645)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1326)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:116)
at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:49)
... 18 more
Additional context
This is also happening with TIMESTAMP and STRUCT. INSERT VALUES changes the key schema: I had a similar example with a TIMESTAMP key, and before the INSERT the key schema was
{
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"logicalType": "timestamp-millis",
"type": "long"
}
After the INSERT, it turned into
{
"logicalType": "timestamp-millis",
"type": "long"
}
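For what it's worth, the difference between the two registered key schemas is just the Connect metadata. A quick diff of the two JSON documents above (plain stdlib Python, nothing ksql-specific) makes the dropped attribute explicit:

```python
import json

# Key schema registered by CREATE STREAM (first version), as quoted above.
before = json.loads("""
{
  "connect.name": "org.apache.kafka.connect.data.Timestamp",
  "logicalType": "timestamp-millis",
  "type": "long"
}
""")

# Key schema registered after INSERT VALUES (second version), as quoted above.
after = json.loads("""
{
  "logicalType": "timestamp-millis",
  "type": "long"
}
""")

# Attributes present in the original schema but missing from the one
# INSERT VALUES registered -- the connect.* metadata that AvroData's
# toConnectSchema relies on when rebuilding the Connect schema.
dropped = {k: v for k, v in before.items() if k not in after}
print(dropped)  # -> {'connect.name': 'org.apache.kafka.connect.data.Timestamp'}
```

This is consistent with the stack trace: the two registered versions no longer describe the same Connect schema, so AvroData trips over the mismatch when deserializing.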
I also tested with Protobuf and JSON_SR, and neither of those formats is affected.
We have QTT tests for Avro decimal keys, with and without schema inference: https://github.com/confluentinc/ksql/blob/99c09f4ac6f6326c27143e6ae970f524a330b785/ksqldb-functional-tests/src/test/resources/query-validation-tests/avro.json#L820-L912
It'd be good to home in on the scope of the bug more specifically. Maybe the bug is specific to INSERT VALUES?
Related to https://github.com/confluentinc/ksql/issues/6091