[FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column
This closes FLINK-35272.
Currently, pipeline jobs with transform (including projection and filtering) are constructed with the following topology:
SchemaTransformOp --> DataTransformOp --> SchemaOp
where schema projections are applied in SchemaTransformOp and data projection & filtering are applied in DataTransformOp. The idea is that SchemaTransformOp might be embedded in sources in the future to reduce the payload size transferred through the Flink job.
However, the current implementation has a known defect: it omits unused columns too early, so columns that downstream operators still rely on are removed before their data arrives at DataTransformOp. See the following example:
# Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
transform:
- source-table: employee
projection: id, upper(name) as newname
filter: age > 18
Such transformation rules will fail because the name and age columns are removed in SchemaTransformOp, so their values can no longer be retrieved in DataTransformOp, where the actual expression evaluation and filtering take effect.
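The failure mode can be demonstrated with a minimal Python sketch (illustrative only, not the actual Flink CDC Java code; the dict-based row model is an assumption for brevity):

```python
# Illustrative sketch: why pruning the schema before evaluation breaks filtering.
schema = ["id", "name", "age"]
row = {"id": 1, "name": "Alice", "age": 19}

# Old SchemaTransformOp: keeps only the projected output columns.
projected = ["id", "newname"]
pruned_row = {k: v for k, v in row.items() if k in projected}

# Old DataTransformOp: tries to evaluate `upper(name)` and `age > 18`,
# but the inputs were already dropped upstream.
try:
    keep = pruned_row["age"] > 18
except KeyError as e:
    print(f"filter failed, missing column: {e}")  # 'age' was pruned too early
```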
This PR introduces a new design, renaming the transform topology as follows:
PreTransformOp --> PostTransformOp --> SchemaOp
where PreTransformOp filters out a column only if:
- The column is not present in the projection rules, and
- The column is not referenced, directly or indirectly, by any calculation or filtering expression
Referenced columns will be kept in exactly the same order as in the original schema. All schema and data events concerning these temporarily-referenced columns will be omitted after PostTransformOp. For example, given the following transform rule:
# Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
transform:
- source-table: employee
projection: id, age + 4 as newage
filter: age > 4
PreTransformOp will emit an intermediate schema (ID INT NOT NULL, AGE INT) and correspondingly trimmed data records downstream. Calculated columns (newage here) are not created at this stage, since they have not been evaluated yet; unused columns (name here) are removed as early as possible.
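The column-retention rule above can be sketched as follows (illustrative Python with a naive identifier-based reference extractor; the real implementation parses the expressions properly rather than scanning tokens):

```python
import re

def referenced_columns(expr: str, schema: list[str]) -> set[str]:
    """Naively extract schema columns referenced by an expression.
    (Illustrative only; the actual implementation uses a SQL parser.)"""
    tokens = set(re.findall(r"[A-Za-z_][A-Za-z_0-9]*", expr))
    return {c for c in schema if c in tokens}

def pre_transform_schema(schema, projections, filter_expr):
    """Keep every column referenced by a projection or filtering
    expression, preserving the original schema order."""
    needed = set()
    for expr, _alias in projections:  # (expression, output column name)
        needed |= referenced_columns(expr, schema)
    needed |= referenced_columns(filter_expr, schema)
    return [c for c in schema if c in needed]

# Schema (id, name, age); projection: id, age + 4 as newage; filter: age > 4
schema = ["id", "name", "age"]
projections = [("id", "id"), ("age + 4", "newage")]
print(pre_transform_schema(schema, projections, "age > 4"))  # ['id', 'age']
```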
~~If a column is explicitly written down, it will be passed downstream as-is. For referenced columns, however, a special prefix will be added to their names. In the example above, a schema like [id, newname, __PREFIX__name, __PREFIX__age] will be sent downstream. Note that expression evaluation and filtering do not take effect at this point, so a DataChangeEvent would look like [1, null, 'Alice', 19].~~
~~Adding prefix is meant to deal with such cases:~~
# Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
transform:
- source-table: employee
projection: id, upper(name) as name
~~Here we need to distinguish the calculated column (new) name and the referenced original column (old) name. So after the name mangling process the schema would be like: [id, name, __PREFIX__name].~~
~~Also, filtering is still done in PostTransformOp, since users could write a filter expression that references a calculated column, whose value is not available until PostTransformOp's evaluation. It also means that in the following somewhat ambiguous case:~~
# Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
transform:
- source-table: employee
projection: id, age * 2 as age
filter: age > 18
~~The filtering expression is applied to the calculated age column (doubled!) instead of the original one.~~
Now, any calculated column referenced in a filtering expression will be rewritten as its original definition. For example, the following transform rule:
transform:
- source-table: employee
projection: id, age * 2 as newage
filter: newage > 18
...will be rewritten as follows:
transform:
- source-table: employee
projection: id, age * 2 as newage
filter: age * 2 > 18
Hence, no calculated columns need to be evaluated before the filtering process.
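The rewrite step can be sketched like this (illustrative Python operating on raw strings; the actual implementation rewrites the parsed expression tree, not text):

```python
import re

def rewrite_filter(filter_expr: str, projections: list[tuple[str, str]]) -> str:
    """Replace references to calculated columns in a filter expression
    with their defining expressions, parenthesized to keep precedence.
    (Sketch only; string substitution stands in for AST rewriting.)"""
    for expr, alias in projections:
        if expr != alias:  # a calculated column, not a pass-through
            filter_expr = re.sub(rf"\b{re.escape(alias)}\b",
                                 f"({expr})", filter_expr)
    return filter_expr

# projection: id, age * 2 as newage; filter: newage > 18
print(rewrite_filter("newage > 18", [("id", "id"), ("age * 2", "newage")]))
# -> (age * 2) > 18
```

Parenthesizing the substituted definition matters: without it, a rewrite of `newage > 18` where `newage` is `age + 2` could change operator precedence in a larger expression.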
This PR is still in very early progress, looking for @aiwenmo & @lvyanquan's comments.
Updated based on previous comments, cc @aiwenmo
Thanks for @aiwenmo's kind review, addressed the comments above.
Thanks @aiwenmo for reviewing, I've addressed your comments.
cc @PatrickRen @lvyanquan
Thanks for @lvyanquan's comments! Addressed them in the latest commits.
Fixed another problem in transform when combined with route & schema evolution. Feedback from @leonardBang & @aiwenmo would be much appreciated, since this PR has grown quite large.
@yuxiqian Thanks for the improvement, could you rebase to latest master?
@leonardBang Done, rebased to master. However, since FLINK-35242 and FLINK-35272 have some overlapping changes, more rebasing might be necessary later.
The community has just reported ticket FLINK-35852 that could be closed by this PR. I've added another E2e test case to cover this issue, where applying transform rules on complex data types may yield broken data.
Done, rebased due to some conflicts with https://github.com/apache/flink-cdc/commit/26ff6d2a081181f3df7aa49d65d804c57c634122. Will add CAST ... AS tests after #3357 got merged.
Resolved conflicts with #3357 and added CAST ... AS tests in UT / E2e cases. Seems CI failure is a known issue and should be fixed by #3449.
Rebased to latest master.
Seems CI is failing due to an expired link in Doris docs. Pushed another commit to fix this.