[FLINK-35242] Supports per-SE type configuration & tolerance evolution behavior
This closes FLINK-35242.
- Adds `TRY_EVOLVE` behavior that tolerates exceptions while applying metadata
- Supports configuring the behavior for each schema evolution type with the `include.schema.changes` and `exclude.schema.changes` sink options
- Adds schema evolution IT cases
With this change, SchemaOperator now stores both the "upstream" schema (which always keeps up with the structure coming from the pipeline source after the transformation) and the "evolved" schema (the schema that is actually applied to the sink). They might differ when:
- Schema change events are partially or fully ignored (by fine-grained options / `IGNORE` behavior)
- Some schema change events failed to apply but the pipeline keeps executing (in `TRY_EVOLVE` mode)
If these cases occur, SchemaOperator needs to cast data from the upstream schema to match the actual evolved version (by adding / removing columns, converting types, etc.) with tolerance (filling in null if a meaningful cast is not possible).
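To illustrate the tolerant casting described above, here is a minimal sketch. It is not the code in this PR: the class `SchemaCoercionSketch`, the methods `coerce` and `castOrNull`, and the use of plain Java classes as column types are all hypothetical stand-ins for the real schema and record types. It only shows the idea of reshaping an upstream row into the evolved column layout and falling back to null.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; names and types are not from the PR. */
final class SchemaCoercionSketch {

    /**
     * Reshapes a row laid out by the upstream schema into the evolved schema layout.
     * Columns missing upstream become null, columns absent from the evolved schema
     * are dropped, and values that cannot be converted become null.
     */
    static List<Object> coerce(
            List<String> upstreamColumns,
            List<Object> upstreamRow,
            Map<String, Class<?>> evolvedColumns) {
        // Index upstream values by column name so they can be looked up per evolved column.
        Map<String, Object> byName = new LinkedHashMap<>();
        for (int i = 0; i < upstreamColumns.size(); i++) {
            byName.put(upstreamColumns.get(i), upstreamRow.get(i));
        }
        List<Object> result = new ArrayList<>();
        for (Map.Entry<String, Class<?>> column : evolvedColumns.entrySet()) {
            result.add(castOrNull(byName.get(column.getKey()), column.getValue()));
        }
        return result;
    }

    /** Converts a value to the target type where a sensible cast exists, else returns null. */
    static Object castOrNull(Object value, Class<?> target) {
        if (value == null) {
            return null;
        }
        if (target.isInstance(value)) {
            return value;
        }
        if (target == String.class) {
            return value.toString();
        }
        if (target == Long.class && value instanceof Number) {
            return ((Number) value).longValue();
        }
        if (target == Double.class && value instanceof Number) {
            return ((Number) value).doubleValue();
        }
        // No meaningful cast is available: tolerate the mismatch by filling in null.
        return null;
    }
}
```

In this sketch the evolved schema drives the output shape, so a sink that never received an "add column" change does not see that column, and one that still has a column the source dropped receives null for it.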
@PatrickRen @leonardBang PTAL
Just added LENIENT schema evolve behavior & rebased to master. Could @loserwang1024 please take a look at this?
Since this PR has grown too big, hopefully it can be reviewed & merged soon to avoid merge conflicts in the future. cc @leonardBang
@yuxiqian Thanks for the great work, could you rebase to latest master?
Done, rebased with master branch
Squashed & Rebased with master.
Since this also changes the SchemaManager serialization format, could @lvyanquan please take a look? (The serialization version isn't updated since it's expected that no version will be released between this PR and FLINK-34638.)