INSERT INTO statement fails on stream with defined VALUE_SCHEMA_FULL_NAME and VALUE_SCHEMA_ID for Avro schema w/ struct
Describe the bug
INSERT INTO <stream> VALUES (...); statements against a stream with a defined VALUE_SCHEMA_FULL_NAME and VALUE_SCHEMA_ID referencing an Avro schema with a struct data type fail with the error: Could not serialize value. Error serializing message to topic: <topic-name>. Struct schemas do not match.
To Reproduce
KSQL Version: 0.26.0
Example:
- Created topic "tmp-simple" with associated Avro schema:
{
  "fields": [
    {
      "name": "field1",
      "type": "string"
    },
    {
      "default": null,
      "name": "field2",
      "type": [
        "null",
        {
          "fields": [
            {
              "default": null,
              "name": "foo",
              "type": ["null", "string"]
            },
            {
              "default": null,
              "name": "bar",
              "type": ["null", "int"]
            }
          ],
          "name": "tmpField2",
          "type": "record"
        }
      ]
    }
  ],
  "name": "tmpSimple",
  "namespace": "io.confluent.test",
  "type": "record"
}
- Created a stream using the topic, specifying the VALUE_SCHEMA_ID:
create stream tmp_simple with (kafka_topic='tmp-simple', key_format='kafka', value_format='avro', value_schema_id=100120, partitions=1);
- Attempted to INSERT a value into the stream, which returns the error:
INSERT INTO tmp_simple (`field1`,`field2`) VALUES('s1', struct(`foo`:='bar',`bar`:=2));
Failed to insert values into 'TMP_SIMPLE'. Could not serialize value: [ 's1' | Struct{foo=bar,bar=2} ]. Error serializing message to topic: tmp-simple. Struct schemas do not match.
Additional context
If a stream is created without specifying VALUE_SCHEMA_ID, such that ksqlDB creates the topic and schema, INSERT statements execute without issue:
CREATE STREAM TMP_SIMPLE_AUTO (`field1` STRING, `field2` STRUCT<`foo` STRING, `bar` INTEGER>) WITH (KAFKA_TOPIC='tmp-simple-auto', KEY_FORMAT='kafka', PARTITIONS=1, VALUE_FORMAT='avro');
INSERT INTO tmp_simple_auto (`field1`, `field2`) VALUES ('s1', struct(`foo`:='bar',`bar`:=2));
Value returned in query:
{
  "field1": "s1",
  "field2": {
    "foo": "bar",
    "bar": 2
  }
}
Schema created by ksqlDB (note the auto-generated record name KsqlDataSourceSchema_field2 under the io.confluent.ksql.avro_schemas namespace, and that field1 is nullable here, unlike in the manually registered schema):
{
  "fields": [
    {
      "default": null,
      "name": "field1",
      "type": ["null", "string"]
    },
    {
      "default": null,
      "name": "field2",
      "type": [
        "null",
        {
          "fields": [
            {
              "default": null,
              "name": "foo",
              "type": ["null", "string"]
            },
            {
              "default": null,
              "name": "bar",
              "type": ["null", "int"]
            }
          ],
          "name": "KsqlDataSourceSchema_field2",
          "type": "record"
        }
      ]
    }
  ],
  "name": "KsqlDataSourceSchema",
  "namespace": "io.confluent.ksql.avro_schemas",
  "type": "record"
}
@spena, can you test this out?
Hi,
I'm facing the exact same problem, and I believe I know why. In my Avro schema, I have fields that are optional (a field can be a struct, or it can be null, etc.). I found the serialization error in the KSQL_PROCESSING_LOG with the following query:
SELECT MESSAGE->SERIALIZATIONERROR
FROM KSQL_PROCESSING_LOG
WHERE LEVEL = 'ERROR'
AND MESSAGE->TYPE = 3;
However, when I use EXPLAIN, I see that the logical ksqlDB schemas defined for the SOURCE and SINK are identical. The processing log is the only place where I could see that the struct schemas did not match.
So, is ksqlDB capable of handling optional values or accepting nulls?
There's a bug when using VALUE_SCHEMA_ID and INSERT with Avro schemas. ksqlDB takes a different code path when serializing the INSERT record if a SCHEMA_ID is found in the stream.
During a normal CREATE (no schema id), the INSERT creates a schema with connect names that match the Schema Registry schema. See https://github.com/confluentinc/ksql/blob/master/ksqldb-serde/src/main/java/io/confluent/ksql/serde/avro/AvroDataTranslator.java#L77
But with SCHEMA_ID, the INSERT does not make the schema compatible. It creates an INSERT schema with empty connect names, which makes it incompatible when serializing the value. See https://github.com/confluentinc/ksql/blob/master/ksqldb-serde/src/main/java/io/confluent/ksql/serde/avro/AvroSRSchemaDataTranslator.java#L34
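The incompatibility can be illustrated outside ksqlDB. Below is a minimal sketch (my own illustration, not ksqlDB code), assuming the failure reduces to Kafka Connect's schema-equality check, which includes the schema name:

import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;

public class StructNameMismatchDemo {
    public static void main(String[] args) {
        // Struct schema as derived from the registered Avro schema:
        // the nested record carries its full name.
        Schema named = SchemaBuilder.struct()
                .name("io.confluent.test.tmpField2")
                .field("foo", Schema.OPTIONAL_STRING_SCHEMA)
                .field("bar", Schema.OPTIONAL_INT32_SCHEMA)
                .optional()
                .build();

        // Struct schema as built on the SCHEMA_ID code path:
        // structurally identical, but with no connect name.
        Schema unnamed = SchemaBuilder.struct()
                .field("foo", Schema.OPTIONAL_STRING_SCHEMA)
                .field("bar", Schema.OPTIONAL_INT32_SCHEMA)
                .optional()
                .build();

        Struct value = new Struct(unnamed)
                .put("foo", "bar")
                .put("bar", 2);

        try {
            // Connect compares struct schemas by full equality, name
            // included, so validation fails despite identical fields.
            ConnectSchema.validateValue(named, value);
        } catch (DataException e) {
            System.out.println(e.getMessage()); // "Struct schemas do not match."
        }
    }
}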
Sadly, there's no workaround. I tried different alternatives to bypass this buggy code, but I couldn't find one. We'll address this in a bugfix.
Yeah. If we insert without a schema id, a new schema will be created and registered, so the insertion always succeeds. But if a schema id is provided, no new schema will be registered, and we need to match the exact schema from Schema Registry.
This is also related to https://github.com/confluentinc/ksql/issues/6091 and https://github.com/confluentinc/ksql/issues/7211. Basically, connect.name and possibly other properties in the physical schema might be mishandled during logical-to-physical schema translation.
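As a concrete illustration of where that connect.name comes from, here is a sketch (my own, under the assumption that ksqlDB's Avro serde builds on Confluent's AvroData translation layer, as the AvroDataTranslator link above suggests) that converts the registered schema from the repro to a Connect schema:

import io.confluent.connect.avro.AvroData;
import org.apache.avro.Schema.Parser;
import org.apache.kafka.connect.data.Schema;

public class ConnectNameDemo {
    public static void main(String[] args) {
        // The Avro schema registered in the repro, inlined as JSON.
        String avroJson = "{"
            + "\"type\":\"record\",\"name\":\"tmpSimple\",\"namespace\":\"io.confluent.test\","
            + "\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},"
            + "{\"name\":\"field2\",\"default\":null,\"type\":[\"null\","
            + "{\"type\":\"record\",\"name\":\"tmpField2\",\"fields\":["
            + "{\"name\":\"foo\",\"default\":null,\"type\":[\"null\",\"string\"]},"
            + "{\"name\":\"bar\",\"default\":null,\"type\":[\"null\",\"int\"]}]}]}]}";

        // AvroData preserves the Avro record's full name as the Connect
        // schema name when translating to a Connect schema.
        AvroData avroData = new AvroData(1);
        Schema connectSchema = avroData.toConnectSchema(new Parser().parse(avroJson));

        // Expected to print "io.confluent.test.tmpField2": the name the
        // INSERT-side struct schema would have to carry for serialization
        // to succeed.
        System.out.println(connectSchema.field("field2").schema().name());
    }
}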
Hi, is the fix for this issue in any release?
Can you please confirm whether this fix is available in any release?
@AndreasFischbach3003 @nicollette Yes, it will be available in our next release, which comes in January.
Hi, is this fix already available in Confluent Cloud? Can you let me know which version of ksqlDB got this fix? Thank you.