ksql
JSON_SR enum gives a struct projection instead of string
Describe the bug
With the JSON_SR value format, a JSON Schema enum field is inferred as a STRUCT column instead of a STRING column.
To Reproduce
Create a topic and register the following JSON schema:
{
"$id": "http://example.com/myURI.schema.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"additionalProperties": false,
"description": "Sample schema to help you get started.",
"properties": {
"myField1": {
"description": "The integer type is used for integral numbers.",
"type": "integer"
},
"myField3": {
"description": "The string type is used for strings of text.",
"type": "string"
},
"type": {
"enum": [
"DEPOSIT",
"WITHDRAW"
],
"title": "TransactionType",
"type": [
"string"
]
}
},
"title": "SampleRecord",
"type": "object"
}
Create a stream that infers its columns from the registered schema:
create stream acct3str with(kafka_topic='acctbal3',value_format='json_sr');
Expected behavior
The enum should be projected as a STRING.
Actual behaviour
The enum is projected as a STRUCT:
TYPE STRUCT<
IO.CONFLUENT.CONNECT.JSON.ONEOF.FIELD.0 STRING>
As a result, the following produced record fails deserialisation when the stream is queried:
./bin/kafka-json-schema-console-producer --bootstrap-server localhost:9092 --topic acctbal3 --property schema.registry.url=http://localhost:8081 --property value.schema.id=23
{"myField1":1, "type":"DEPOSIT"}
select * from acct3str emit changes;
Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: TextNode, requiredType: STRUCT<IO.CONFLUENT.CONNECT.JSON.ONEOF.FIELD.0 VARCHAR>, path: $.TYPE
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.enforceFieldType(KsqlJsonDeserializer.java:169)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:128)
... 17 more
Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: TextNode, requiredType: STRUCT<IO.CONFLUENT.CONNECT.JSON.ONEOF.FIELD.0 VARCHAR>, path: .TYPE
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.enforceFieldType(KsqlJsonDeserializer.java:171)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.enforceFieldTypesForStruct(KsqlJsonDeserializer.java:293)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.enforceFieldType(KsqlJsonDeserializer.java:167)
... 18 more
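For reference, the record produced above satisfies the registered schema, which suggests the failure comes from the inferred STRUCT column type rather than from the data. The following is a minimal sketch of that check in plain Python. It is a hand-rolled check that covers only the constraints in this particular schema, not a JSON Schema validator, and the names `check_record`, `ALLOWED_KEYS`, and `ALLOWED_TYPES` are hypothetical.

```python
import json

# Values copied from the registered schema in this report.
ALLOWED_KEYS = {"myField1", "myField3", "type"}
ALLOWED_TYPES = ["DEPOSIT", "WITHDRAW"]


def check_record(record):
    """Return a list of problems; an empty list means the record passes.

    Covers only the constraints in this report's schema:
    additionalProperties=false, integer myField1, string myField3,
    and a string enum for "type".
    """
    problems = []
    extra = set(record) - ALLOWED_KEYS
    if extra:
        problems.append(f"unexpected properties: {sorted(extra)}")
    if "myField1" in record:
        value = record["myField1"]
        # bool is a subclass of int in Python, so exclude it explicitly.
        if not isinstance(value, int) or isinstance(value, bool):
            problems.append("myField1 is not an integer")
    if "myField3" in record and not isinstance(record["myField3"], str):
        problems.append("myField3 is not a string")
    if "type" in record:
        value = record["type"]
        if not isinstance(value, str) or value not in ALLOWED_TYPES:
            problems.append(f"type is not one of {ALLOWED_TYPES}")
    return problems


record = json.loads('{"myField1":1, "type":"DEPOSIT"}')
print(check_record(record))
```

The record carries the enum value as a plain JSON string, which matches the `TextNode` source type reported in the exception.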
Additional context
The equivalent enum works fine with Avro:
{
"name": "type",
"type": {
"name": "TransactionType",
"symbols": [
"DEPOSIT",
"WITHDRAW"
],
"type": "enum"
}
}
This is possibly related to https://github.com/confluentinc/ksql/issues/8526
The workaround is to declare the desired column types explicitly in the CREATE STREAM statement:
ksql> create stream acct4str(MYFIELD3 STRING, TYPE STRING, MYFIELD1 BIGINT) with(kafka_topic='acctbal3',value_format='json_sr');
Message
----------------
Stream created
----------------
ksql> select * from acct4str emit changes;
+--------------------------------------------------------------------+--------------------------------------------------------------------+--------------------------------------------------------------------+
|MYFIELD3 |TYPE |MYFIELD1 |
+--------------------------------------------------------------------+--------------------------------------------------------------------+--------------------------------------------------------------------+
|null |DEPOSIT |1 |
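If several topics need the same workaround, the explicit column list can be derived from the JSON schema instead of being typed by hand. The sketch below is one way to do that in Python, under stated assumptions: the schema is a flat object, every property has a single primitive type, and the type mapping (`integer` to `BIGINT`, `string` to `STRING`) is taken only from the workaround statement above. The helper names `ksql_columns` and `create_stream_statement` are hypothetical and are not part of ksqlDB.

```python
import json

# Assumed mapping, limited to the two JSON types that appear in this report.
JSON_TO_KSQL = {"integer": "BIGINT", "string": "STRING"}


def ksql_columns(schema):
    """Return (column_name, ksql_type) pairs for a flat JSON object schema."""
    columns = []
    for name, spec in schema["properties"].items():
        json_type = spec["type"]
        # "type" may be written as a one-element list, as in the enum field.
        if isinstance(json_type, list):
            if len(json_type) != 1:
                raise ValueError(f"unsupported multi-type field: {name}")
            json_type = json_type[0]
        columns.append((name.upper(), JSON_TO_KSQL[json_type]))
    return columns


def create_stream_statement(stream, topic, schema):
    """Build a CREATE STREAM statement with an explicit column list."""
    cols = ", ".join(f"{name} {ksql_type}" for name, ksql_type in ksql_columns(schema))
    return (
        f"CREATE STREAM {stream} ({cols}) "
        f"WITH (kafka_topic='{topic}', value_format='json_sr');"
    )


# Trimmed copy of the schema from this report (descriptions omitted).
schema = json.loads("""
{
  "properties": {
    "myField1": {"type": "integer"},
    "myField3": {"type": "string"},
    "type": {"enum": ["DEPOSIT", "WITHDRAW"], "type": ["string"]}
  },
  "type": "object"
}
""")

statement = create_stream_statement("acct4str", "acctbal3", schema)
print(statement)
```

The columns come out in schema order (MYFIELD1, MYFIELD3, TYPE) rather than the order used in the statement above. Enum fields are deliberately mapped to STRING, which is the projection this issue expects.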
I am also seeing this problem.
It does not seem to be related to https://github.com/confluentinc/ksql/issues/8526, as I am using the latest version.