KSQL table does not support in-place schema updates for Debezium schema evolution when using Avro payloads with Schema Registry
Describe the bug
A KSQL table does not support in-place schema updates for Debezium's schema evolution. It does not update its schema automatically when it reads data written with schema version v2. A CREATE OR REPLACE statement also fails to adopt v2 from Schema Registry.
To Reproduce
Environment:
- KSQL version: 0.20
- Confluent on Kubernetes: CP 6.2
- Debezium: v1.5 GA
Sequence to reproduce the issue:
- Start with a Debezium CDC topic for a MySQL table (its values use schema v1)
- Let some data flow in
- Run the CREATE TABLE statement in KSQL:
CREATE TABLE TBL_NAME WITH (KAFKA_TOPIC='CDC_TOPIC_NAME', VALUE_FORMAT='AVRO', KEY_FORMAT='AVRO');
- Alter the MySQL table and add one more column of type string at the end
- Observe that the Debezium history topic contains the DDL change event
- Observe that the CDC topic uses schema v2 once the first new record is written to that table
- Now apply the CREATE OR REPLACE statement:
CREATE OR REPLACE TABLE TBL_NAME WITH (KAFKA_TOPIC='CDC_TOPIC_NAME', VALUE_FORMAT='AVRO', KEY_FORMAT='AVRO');
Expected behavior
The KSQL table should evolve to schema version v2 automatically. Failing that, it should at least support an in-place upgrade with CREATE OR REPLACE.
Actual behaviour
This is the error:
Cannot upgrade data source: DataSource 'CUSTOMERS_CDC_TBL' (The following columns are changed, missing or reordered: [__OP STRING, __TABLE STRING, __SOURCE_TS_MS BIGINT, __TS_MS BIGINT, __DELETED STRING])
Sample KSQL table schema, created from the v1 topic schema:
ksql> DESCRIBE CUSTOMERS_CDC_TBL;
Name : CUSTOMERS_CDC_TBL
Field | Type
-------------------------------------------------------------
ROWKEY | STRUCT<ID INTEGER> (primary key)
ID | INTEGER
PROPERTY_ID | INTEGER
FIRST_NAME | VARCHAR(STRING)
LAST_NAME | VARCHAR(STRING)
COUNT_RESERVATIONS | INTEGER
IS_REPEAT_GUEST | INTEGER
EMAIL | VARCHAR(STRING)
BIRTHDAY | INTEGER
CPF | VARCHAR(STRING)
PHONE | VARCHAR(STRING)
GENDER | VARCHAR(STRING)
CELL_PHONE | VARCHAR(STRING)
STREET | VARCHAR(STRING)
NUMBER | VARCHAR(STRING)
COMPLEMENT | VARCHAR(STRING)
NEIGHBORHOOD | VARCHAR(STRING)
CITY | VARCHAR(STRING)
STATE | VARCHAR(STRING)
ZIP | VARCHAR(STRING)
COUNTRY | VARCHAR(STRING)
RG | VARCHAR(STRING)
ISSUE_DATE | INTEGER
ISSUER | VARCHAR(STRING)
ISSUER_STATE | VARCHAR(STRING)
USER_ID | INTEGER
ADDRESS1 | VARCHAR(STRING)
ADDRESS2 | VARCHAR(STRING)
ID_PHOTO | INTEGER
DELETED | INTEGER
STATUS_ID | INTEGER
STATUS_NAME | VARCHAR(STRING)
DOCUMENT_TYPE | VARCHAR(STRING)
DOCUMENT_NUMBER | VARCHAR(STRING)
DOCUMENT_ISSUE_DATE | INTEGER
DOCUMENT_ISSUING_COUNTRY | VARCHAR(STRING)
DOCUMENT_EXPIRATION_DATE | INTEGER
LAST_CHANGE | BIGINT
IS_OPT_IN | INTEGER
OPT_IN_HASH | VARCHAR(STRING)
EMAIL_HASH | VARCHAR(STRING)
IS_ANONYMIZED | INTEGER
GUEST_TAX_ID_NUMBER | VARCHAR(STRING)
COMPANY_NAME | VARCHAR(STRING)
COMPANY_TAX_ID_NUMBER | VARCHAR(STRING)
IS_MERGED | INTEGER
__OP | VARCHAR(STRING)
__TABLE | VARCHAR(STRING)
__SOURCE_TS_MS | BIGINT
__TS_MS | BIGINT
__DELETED | VARCHAR(STRING)
Adding some context here. My theory is that Debezium always adds its metadata columns at the end of the schema. Any column you later add to the source table therefore lands in the middle, before the metadata columns, and CREATE OR REPLACE doesn't like that.
AVRO should be agnostic to the ordering of columns, so we may be able to support reordering them without making any other changes, but it requires some testing.
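The theory above can be checked with a minimal Python sketch. It is not taken from the ksqlDB source. It assumes that CREATE OR REPLACE accepts a new value schema only if every existing column keeps its name, type, and position, and that Debezium's `__`-prefixed metadata columns always stay at the end. The added column name `NEW_COLUMN` is hypothetical, because the issue does not name it. Only the tail of the v1 schema from the DESCRIBE output is modeled, with types written as in the error message.

```python
# Sketch only: the tail of CUSTOMERS_CDC_TBL's v1 value columns as (name, type) pairs.
V1 = [
    ("COMPANY_TAX_ID_NUMBER", "STRING"),
    ("IS_MERGED", "INTEGER"),
    ("__OP", "STRING"),
    ("__TABLE", "STRING"),
    ("__SOURCE_TS_MS", "BIGINT"),
    ("__TS_MS", "BIGINT"),
    ("__DELETED", "STRING"),
]

METADATA_PREFIX = "__"


def debezium_add_column(columns, new_column):
    """Insert a new source column before the trailing metadata columns (assumed Debezium layout)."""
    source = [c for c in columns if not c[0].startswith(METADATA_PREFIX)]
    metadata = [c for c in columns if c[0].startswith(METADATA_PREFIX)]
    return source + [new_column] + metadata


def positional_violations(old, new):
    """Old columns that no longer appear with the same name and type at the same index
    (an assumed approximation of the CREATE OR REPLACE compatibility check)."""
    return [col for i, col in enumerate(old) if i >= len(new) or new[i] != col]


def name_based_violations(old, new):
    """Old columns that are missing or changed type when matched by name, as Avro would."""
    new_by_name = dict(new)
    return [col for col in old if new_by_name.get(col[0]) != col[1]]


# NEW_COLUMN is a hypothetical name for the string column added in MySQL.
V2 = debezium_add_column(V1, ("NEW_COLUMN", "STRING"))

print(positional_violations(V1, V2))
print(name_based_violations(V1, V2))
```

Under these assumptions, the positional check flags exactly the five metadata columns listed in the error message, because each one has shifted by one slot. The name-based check returns an empty list. That matches the idea that Avro-style, name-based matching would tolerate the reordering.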
@mparikhcloudbeds I left a comment describing a possible workaround here: https://github.com/confluentinc/ksql/issues/8160#issuecomment-936728536. The workaround is based on the Schema Resolution section of the Avro docs. It states that when both schemas are records, the ordering of fields may differ, because fields are matched by name.
You can give it a try. Reorder the Avro fields directly in Schema Registry (SR), leaving the new columns at the end, and then recreate the stream/table. I'm not 100% sure why it works, though. I looked at how the data is serialized and don't see the field names anywhere, but it works. I tried reads and inserts with differently ordered schemas, and all of them show the right values.
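The following is a likely explanation of why the workaround works, with a Python sketch. In Confluent's wire format, each message carries a schema ID instead of field names. Avro's binary encoding writes a record's field values in the writer schema's order, with no names. A consumer fetches the writer schema by ID and then matches the writer's fields to its own reader schema by name. The sketch models that with plain lists and dicts rather than a real Avro library. `move_new_fields_to_end`, `encode`, and `resolve` are hypothetical helpers. The field names are a trimmed, illustrative subset, and `NEW_COLUMN` is a hypothetical name for the added column.

```python
import json

# Trimmed, illustrative v2 value schema in Debezium's order:
# the new column (hypothetical NEW_COLUMN) sits before the metadata fields.
V2_SCHEMA = {
    "type": "record",
    "name": "Value",
    "fields": [
        {"name": "ID", "type": "int"},
        {"name": "NEW_COLUMN", "type": ["null", "string"], "default": None},
        {"name": "__OP", "type": ["null", "string"], "default": None},
        {"name": "__DELETED", "type": ["null", "string"], "default": None},
    ],
}

V1_FIELD_ORDER = ["ID", "__OP", "__DELETED"]


def move_new_fields_to_end(schema, old_field_order):
    """Copy of the schema with previously known fields first (in their old order)
    and any new fields appended at the end."""
    by_name = {f["name"]: f for f in schema["fields"]}
    kept = [by_name[name] for name in old_field_order if name in by_name]
    added = [f for f in schema["fields"] if f["name"] not in old_field_order]
    return {**schema, "fields": kept + added}


def encode(schema, record):
    """Positional stand-in for Avro binary encoding: values only, in writer field order."""
    return [record[f["name"]] for f in schema["fields"]]


def resolve(writer_schema, reader_schema, values):
    """Pair values with the writer's field names, then project onto the reader's fields by name,
    falling back to the reader field's default when the writer lacks that field."""
    by_name = dict(zip((f["name"] for f in writer_schema["fields"]), values))
    return {f["name"]: by_name.get(f["name"], f.get("default")) for f in reader_schema["fields"]}


REORDERED = move_new_fields_to_end(V2_SCHEMA, V1_FIELD_ORDER)
# JSON string one would register in Schema Registry (the registration call is not shown).
schema_str = json.dumps(REORDERED)

row = {"ID": 1, "NEW_COLUMN": "x", "__OP": "c", "__DELETED": "false"}
payload = encode(V2_SCHEMA, row)  # written with Debezium's field order
print([f["name"] for f in REORDERED["fields"]])
print(resolve(V2_SCHEMA, REORDERED, payload) == row)
```

The reordered schema keeps the v1 fields as a prefix and appends the new column. Decoding a record written with either field order gives the same values by name. Real Avro resolution has further rules, such as type promotion and name and alias checks, that this sketch ignores.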
@pgaref, this is potentially something you could pick up in Q3; we have some customers requesting it.