[FLINK-37677][cdc-common][cdc-runtime] Handle Exclusion of `create.table` Events in Flink CDC Schema Evolution
Description
This PR addresses the issue identified in FLINK-37677: setting `exclude.schema.changes: [create.table]` in the pipeline YAML configuration causes Flink CDC to throw an `IllegalStateException`:
```
java.lang.IllegalStateException: Unable to coerce data record from my_company.my_branch.customers (schema: columns={`id` INT,`name` STRING NOT NULL,`age` SMALLINT}, primaryKeys=id, options=()) to my_company.my_branch.customers (schema: null)
    at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.lambda$handleDataChangeEvent$1(SchemaOperator.java:215)
    at java.util.Optional.orElseThrow(Optional.java:290)
    at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.handleDataChangeEvent(SchemaOperator.java:212)
    at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.processElement(SchemaOperator.java:150)
    at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaEvolveTest.processEvent(SchemaEvolveTest.java:2643)
    at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaEvolveTest.testLenientSchemaEvolvesExcludeCreate(SchemaEvolveTest.java:2600)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.util.ArrayList.forEach(ArrayList.java:1259)
    at java.util.ArrayList.forEach(ArrayList.java:1259)
```
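For reference, a minimal pipeline configuration that exercises this path might look like the following (the source/sink connectors and connection options here are illustrative, not taken from the reported setup):

```yaml
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: flink
  password: flinkpw
  tables: my_company.my_branch.customers

sink:
  type: starrocks
  # Exclude table-creation events from schema evolution;
  # prior to this PR, this setting triggers the IllegalStateException above.
  exclude.schema.changes: [create.table]

pipeline:
  name: Sync MySQL to StarRocks
  parallelism: 1
```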
Motivation
- Users of platform-managed data lakes or StarRocks often lack permission to create tables directly. (Ad-hoc table creation leads to complex follow-up handling, so the platform controls table-creation permissions while still allowing schema changes.)
- Users may need to customize table properties.
- If an automatically created table doesn't meet requirements, users must delete it and repeatedly restart the job.
Solution
- Always accept `create.table` events internally, so the schema registry knows each table's schema
- Skip applying `create.table` schema change events to the external system
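The two bullets above can be sketched as a filter with separate internal/external decisions (the class, enum, and method names below are hypothetical illustrations, not the actual Flink CDC API): the operator side always lets table-creation events through so data records can be coerced against a known schema, while the sink-side apply path still honors the configured exclusions.

```java
import java.util.Set;

// Sketch of the intended behavior split (illustrative names, not Flink CDC's API):
// create.table must always be registered internally even when excluded,
// but is still skipped when applying changes to the external system.
public class SchemaChangeFilter {
    enum EventType { CREATE_TABLE, ADD_COLUMN, DROP_COLUMN }

    private final Set<EventType> excluded;

    SchemaChangeFilter(Set<EventType> excluded) {
        this.excluded = excluded;
    }

    /** The schema operator registers these events so data records can be coerced. */
    boolean shouldRegisterInternally(EventType e) {
        // CREATE_TABLE is always accepted internally, regardless of exclusions.
        return e == EventType.CREATE_TABLE || !excluded.contains(e);
    }

    /** Only these events are actually applied to the external system. */
    boolean shouldApplyExternally(EventType e) {
        // Exclusions stay in effect for the external apply path.
        return !excluded.contains(e);
    }
}
```

With `create.table` excluded, `shouldRegisterInternally(CREATE_TABLE)` stays true (avoiding the `schema: null` coercion failure) while `shouldApplyExternally(CREATE_TABLE)` is false.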
Testing
Added unit tests.
cc @yuxiqian
CI test: https://github.com/beryllw/flink-cdc/actions/runs/14992713134?pr=12
One minor concern is that if we allow skipping `CreateTableEvent`s, we can't ensure the schemas of external tables stay consistent with the `SchemaRegistry`. Jobs may fail or write columns in the wrong order.
Thanks for pointing out this concern, @yuxiqian .
Regarding keeping the schemas of external tables consistent with the `SchemaRegistry`, I think we could extend the schema compatibility mechanism (similar to what was done in PR #4081) in the future.
Perhaps we can hold off on finalizing this PR until the schema compatibility handling is further improved.