
INSERT INTO statement fails on stream with defined VALUE_SCHEMA_FULL_NAME and VALUE_SCHEMA_ID for Avro schema w/ struct

Open randyx123 opened this issue 2 years ago • 4 comments

Describe the bug: INSERT INTO <stream> VALUES (...) statements fail when the target stream has a VALUE_SCHEMA_FULL_NAME and VALUE_SCHEMA_ID defined that reference an Avro schema containing a struct. The error is: Could not serialize value. Error serializing message to topic: <topic-name>. Struct schemas do not match.

To Reproduce

KSQL version: 0.26.0

Example:

  1. Created topic "tmp-simple" with associated Avro schema:
{
  "fields": [
    {
      "name": "field1",
      "type": "string"
    },
    {
      "default": null,
      "name": "field2",
      "type": [
        "null",
        {
          "fields": [
            {
              "default": null,
              "name": "foo",
              "type": [
                "null",
                "string"
              ]
            },
            {
              "default": null,
              "name": "bar",
              "type": [
                "null",
                "int"
              ]
            }
          ],
          "name": "tmpField2",
          "type": "record"
        }
      ]
    }
  ],
  "name": "tmpSimple",
  "namespace": "io.confluent.test",
  "type": "record"
}
  2. Created a stream on the topic, specifying the VALUE_SCHEMA_ID: create stream tmp_simple with(kafka_topic='tmp-simple', key_format='kafka', value_format='avro', value_schema_id=100120, partitions=1);
  3. Attempted to INSERT a value into the stream, which returned the error:
INSERT INTO tmp_simple (`field1`,`field2`) VALUES('s1', struct(`foo`:='bar',`bar`:=2));
Failed to insert values into 'TMP_SIMPLE'. Could not serialize value: [ 's1' | Struct{foo=bar,bar=2} ]. Error serializing message to topic: tmp-simple. Struct schemas do not match.

Additional context: If the stream is created without specifying VALUE_SCHEMA_ID, so that ksql creates the topic and schema itself, INSERT statements execute without issues:

CREATE STREAM TMP_SIMPLE_AUTO (`field1` STRING, `field2` STRUCT<`foo` STRING, `bar` INTEGER>) WITH (KAFKA_TOPIC='tmp-simple-auto', KEY_FORMAT='kafka', PARTITIONS=1, VALUE_FORMAT='avro');
INSERT INTO tmp_simple_auto (`field1`,`field2`) VALUES('s1', struct(`foo`:='bar',`bar`:=2));

Value returned in query:

{
  "field1": "s1",
  "field2": {
    "foo": "bar",
    "bar": 2
  }
}

Schema created by ksql:

{
  "fields": [
    {
      "default": null,
      "name": "field1",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "field2",
      "type": [
        "null",
        {
          "fields": [
            {
              "default": null,
              "name": "foo",
              "type": [
                "null",
                "string"
              ]
            },
            {
              "default": null,
              "name": "bar",
              "type": [
                "null",
                "int"
              ]
            }
          ],
          "name": "KsqlDataSourceSchema_field2",
          "type": "record"
        }
      ]
    }
  ],
  "name": "KsqlDataSourceSchema",
  "namespace": "io.confluent.ksql.avro_schemas",
  "type": "record"
}
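Apart from field1's nullability, the struct layouts of the two schemas match; what differs is the record names. ksql generated KsqlDataSourceSchema and KsqlDataSourceSchema_field2, while tmp-simple uses tmpSimple and tmpField2 under the io.confluent.test namespace. As a minimal sketch (assuming each schema is loaded as a plain dict, and handling only records and unions), the hypothetical helper record_full_names below lists those full names using Avro's namespace rule: an undotted name takes the record's own namespace, otherwise the namespace of the enclosing named type.

```python
def record_full_names(schema, enclosing_ns=None):
    """List the full names of every record in an Avro schema, in order.

    Hypothetical helper: only records and unions are walked; arrays, maps
    and other named types are ignored in this sketch.
    """
    names = []
    if isinstance(schema, list):  # union: walk each branch
        for branch in schema:
            names += record_full_names(branch, enclosing_ns)
    elif isinstance(schema, dict) and schema.get("type") == "record":
        name = schema["name"]
        if "." in name:
            full = name  # already a full name
        else:
            ns = schema.get("namespace", enclosing_ns)
            full = f"{ns}.{name}" if ns else name
        names.append(full)
        # Nested types inherit this record's namespace.
        child_ns = full.rsplit(".", 1)[0] if "." in full else None
        for field in schema["fields"]:
            names += record_full_names(field["type"], child_ns)
    return names


tmp_simple = {
    "type": "record", "name": "tmpSimple", "namespace": "io.confluent.test",
    "fields": [
        {"name": "field1", "type": "string"},
        {"name": "field2", "default": None, "type": ["null", {
            "type": "record", "name": "tmpField2",
            "fields": [
                {"name": "foo", "default": None, "type": ["null", "string"]},
                {"name": "bar", "default": None, "type": ["null", "int"]},
            ]}]},
    ],
}
print(record_full_names(tmp_simple))
# prints: ['io.confluent.test.tmpSimple', 'io.confluent.test.tmpField2']
```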

randyx123, Jun 07 '22 17:06

@spena, can you test this out?

suhas-satish, Jun 14 '22 16:06

Hi,

I'm facing the exact same problem and I believe I know why. My Avro schema has optional fields (a field can be a struct, or it can be null, etc.). I found the serialization error in the KSQL_PROCESSING_LOG with the following query:

SELECT MESSAGE->SERIALIZATIONERROR
FROM KSQL_PROCESSING_LOG 
WHERE LEVEL = 'ERROR'
	AND MESSAGE->TYPE = 3; 

However, when I use EXPLAIN, I see that the logical ksqlDB schemas of the SOURCE and the SINK are identical. The processing log is the only place where I could see that the struct schemas did not match.

So, can ksqlDB handle optional values, i.e. fields that accept null?

kurtis-ley, Jun 14 '22 19:06

There's a bug when using VALUE_SCHEMA_ID with INSERT and Avro schemas. KSQL takes a different code path when serializing the INSERT record if a SCHEMA_ID is defined on the stream.

During a normal CREATE (no schema ID), the INSERT creates a schema whose connect names match the Schema Registry (SR) schema. See https://github.com/confluentinc/ksql/blob/master/ksqldb-serde/src/main/java/io/confluent/ksql/serde/avro/AvroDataTranslator.java#L77

With a SCHEMA_ID, however, the INSERT doesn't make the schema compatible: it creates an INSERT schema with empty connect names, which makes it incompatible with the registered schema when the value is serialized. See https://github.com/confluentinc/ksql/blob/master/ksqldb-serde/src/main/java/io/confluent/ksql/serde/avro/AvroSRSchemaDataTranslator.java#L34
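To see why empty connect names break serialization, here is a toy Python model (not the Kafka Connect API) of a struct schema whose equality, like Kafka Connect's ConnectSchema, covers the schema name as well as the fields. StructSchema and validate_struct are hypothetical names, and the name string used below is illustrative.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class StructSchema:
    """Hypothetical, simplified stand-in for a Connect struct schema.

    The generated __eq__ compares every attribute, so two schemas with
    identical fields but different names are NOT equal.
    """
    fields: Tuple[Tuple[str, str], ...]  # (field name, type name) pairs
    name: Optional[str] = None           # the "connect name"; None if empty


def validate_struct(value_schema: StructSchema, target: StructSchema) -> None:
    """Raise the same message ksql reports when the two schemas differ."""
    if value_schema != target:
        raise ValueError("Struct schemas do not match.")


FIELDS = (("foo", "string"), ("bar", "int32"))
registered = StructSchema(FIELDS, name="io.confluent.test.tmpField2")
from_insert = StructSchema(FIELDS)  # same fields, empty name

try:
    validate_struct(from_insert, registered)
except ValueError as err:
    print(err)  # prints: Struct schemas do not match.

# Copying the registered name onto the INSERT schema makes the check pass.
validate_struct(StructSchema(FIELDS, name=registered.name), registered)
```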

Unfortunately, there's no workaround. I tried different alternatives to bypass this buggy code path but couldn't find one. We'll fix this in a bugfix release.

spena, Jun 16 '22 22:06

Yes. If we insert without a schema ID, a new schema is created and registered, so the insertion always succeeds. But if a schema ID is provided, no new schema is registered, and the value must match the exact schema from Schema Registry.
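The two paths can be sketched with a toy in-memory registry. This is Python under stated assumptions: ToyRegistry and serialize are hypothetical names, and schemas are plain tuples of (name, field names) rather than real Avro schemas. Without an ID, whatever schema the value carries is registered and used; with an ID, the value's schema has to equal the registered one exactly.

```python
class ToyRegistry:
    """Hypothetical in-memory stand-in for Schema Registry (id -> schema)."""

    def __init__(self):
        self._by_id = {}

    def register(self, schema):
        # Reuse the id of an identical schema; otherwise assign a new one.
        for schema_id, existing in self._by_id.items():
            if existing == schema:
                return schema_id
        schema_id = len(self._by_id) + 1
        self._by_id[schema_id] = schema
        return schema_id

    def get(self, schema_id):
        return self._by_id[schema_id]


def serialize(value_schema, registry, schema_id=None):
    """Return the schema id a record would be written with (toy model)."""
    if schema_id is None:
        # No VALUE_SCHEMA_ID: register the value's own schema, so this succeeds.
        return registry.register(value_schema)
    # VALUE_SCHEMA_ID set: nothing is registered; the schemas must match exactly.
    if registry.get(schema_id) != value_schema:
        raise ValueError("Struct schemas do not match.")
    return schema_id


registry = ToyRegistry()
registered_id = registry.register(("io.confluent.test.tmpField2", ("foo", "bar")))
insert_schema = ("", ("foo", "bar"))  # same fields, empty name

print(serialize(insert_schema, registry))  # prints: 2 (a new schema is registered)
try:
    serialize(insert_schema, registry, schema_id=registered_id)
except ValueError as err:
    print(err)  # prints: Struct schemas do not match.
```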

This is also related to https://github.com/confluentinc/ksql/issues/6091 and https://github.com/confluentinc/ksql/issues/7211. Basically, connect.name and possibly other properties of the physical schema may be mishandled during logical-to-physical schema translation.

lihaosky, Jun 28 '22 17:06

Hi, is the fix for this issue in any release yet?

nicollette, Oct 27 '22 20:10

Can you please let us know whether this fix is available in any release?

AndreasFischbach3003, Dec 07 '22 10:12

@AndreasFischbach3003 @nicollette Yes, it will be available in our next release, which comes out in January.

aliehsaeedii, Dec 08 '22 11:12

Hi, is this fix already available in Confluent Cloud? Can you let me know which version of ksql includes this fix? Thank you.

sgudavalli, Feb 03 '23 11:02