[cdc-common] add field of defaultValue to Column.
This closes https://github.com/ververica/flink-cdc-connectors/issues/2936.
Add a configuration of include-schema-comments to support column comment.
PTAL @ruanhang1993 @Jiabao-Sun.
Thanks @lvyanquan for this contribution.
The defaultValueExpression of debezium is an unparsed default value expression, which may even include functions like now(). What is the purpose of defaultValue, and could there be issues if it is used to populate values in the sink?
It is named defaultValueExpression rather than defaultValue because certain values are not easily serialized. Therefore, the original expression is preserved during serialization and restored to the Schema's defaultValue using defaultValueConverter.
Refers to https://github.com/debezium/debezium/pull/2671.
Thanks @Jiabao-Sun for pointing this out. We will indeed encounter unexpected situations when using UUID(), NOW() or CURRENT_TIMESTAMP. I will close this PR later.
Hi, @Jiabao-Sun. I have checked the MySQL documentation and found that:
The default value specified in a DEFAULT clause can be a literal constant or an expression. With one exception, enclose expression default values within parentheses to distinguish them from literal constant default values.
Based on the code in DefaultValueParserListener, we can distinguish between these two situations: the defaultValueExpression derived from an expression should be null.
So we can still maintain consistency in the expression case and add support for literal constants using the defaultValueExpression of ColumnImpl, which is actually a literal string.
The only problem is that when the default value is set to CURRENT_TIMESTAMP, we get "1970-01-01 00:00:00" as defaultValueExpression. This is something we can specifically mention in the documentation.
There are three conditions:
- Default value expression is a literal: the value of column.defaultValueExpression() will be the same as the literal.
- Default value expression is a function: the value of column.defaultValueExpression() will be null.
- Default value expression is NOW() or CURRENT_TIMESTAMP: the value of column.defaultValueExpression() will be 1970-01-01 00:00:00.

So we will not pass unresolved values to downstream.
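To make the three conditions concrete, here is a minimal sketch of how a consumer might interpret the value. It is not code from this PR: the class DefaultValueSketch, the Kind enum, and the isTemporalColumn flag are hypothetical, and the input simply models the result of column.defaultValueExpression() as an Optional<String>.

```java
import java.util.Optional;

/** Hypothetical helper for illustration only; not part of Flink CDC or Debezium. */
final class DefaultValueSketch {

    /** Placeholder reported for NOW() / CURRENT_TIMESTAMP, as described above. */
    static final String EPOCH_PLACEHOLDER = "1970-01-01 00:00:00";

    enum Kind {
        /** A literal constant; the string can be forwarded as the default value. */
        LITERAL,
        /** No value available: either no DEFAULT clause or a function expression. */
        ABSENT_OR_FUNCTION,
        /** Epoch string on a temporal column; may stand for NOW() or CURRENT_TIMESTAMP. */
        POSSIBLY_CURRENT_TIMESTAMP
    }

    /**
     * Classifies a default value expression.
     *
     * @param defaultValueExpression models column.defaultValueExpression() (assumed Optional)
     * @param isTemporalColumn assumption: the caller knows whether the column is a date/time type
     */
    static Kind classify(Optional<String> defaultValueExpression, boolean isTemporalColumn) {
        if (!defaultValueExpression.isPresent()) {
            return Kind.ABSENT_OR_FUNCTION;
        }
        if (isTemporalColumn && EPOCH_PLACEHOLDER.equals(defaultValueExpression.get())) {
            return Kind.POSSIBLY_CURRENT_TIMESTAMP;
        }
        return Kind.LITERAL;
    }

    public static void main(String[] args) {
        System.out.println(classify(Optional.of("flink"), false));
        System.out.println(classify(Optional.empty(), false));
        System.out.println(classify(Optional.of(EPOCH_PLACEHOLDER), true));
    }
}
```

Note that a literal default of 1970-01-01 00:00:00 on a temporal column cannot be told apart from the CURRENT_TIMESTAMP case this way, which is why the sketch only reports it as "possibly" CURRENT_TIMESTAMP.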
@Jiabao-Sun PTAL.
Rebased to master.
Hi @lvyanquan could you please rebase this with master again? Also cc @Jiabao-Sun
@lvyanquan Could you check the failed tests? Rebasing onto master is also recommended.
Done rebase.
The cause of the failed case is that the version info of SERIALIZER was not included in TransformSchemaOperator, which causes incompatible state. I would like to start a discussion with @aiwenmo @yuxiqian to solve it.
Can we add some magic header bytes to the newer serialization formats (along with the serializer version, of course) to distinguish them from legacy ones?
@yuxiqian I've added a magic header to the SERIALIZER of TableChangeInfo to distinguish the state written by 3.1.1 from the state written by later versions. Can you help check again? This change can only be verified once both https://github.com/apache/flink-cdc/pull/3426 and this PR are in, so I tend to add it in the current PR.
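For readers following the magic header discussion, the idea can be sketched roughly as below. This is not the actual TableChangeInfo serializer: the class MagicHeaderSketch, the MAGIC string, the version numbers, and the single-string payload are all hypothetical, and the sketch assumes a real table name never equals the magic value.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical illustration of a versioned format with a magic header. */
final class MagicHeaderSketch {

    static final String MAGIC = "__magic_header__"; // hypothetical marker value
    static final int LEGACY_VERSION = 1;            // hypothetical: format without header
    static final int CURRENT_VERSION = 2;           // hypothetical: format with header

    static final class Decoded {
        final int version;
        final String tableName;

        Decoded(int version, String tableName) {
            this.version = version;
            this.tableName = tableName;
        }
    }

    /** Legacy layout: the table name only, with no version information. */
    static byte[] writeLegacy(String tableName) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(tableName);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Current layout: magic header, then serializer version, then the table name. */
    static byte[] writeCurrent(String tableName) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(MAGIC);
            out.writeInt(CURRENT_VERSION);
            out.writeUTF(tableName);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads either layout: the first string decides whether a version follows. */
    static Decoded read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String first = in.readUTF();
            if (MAGIC.equals(first)) {
                int version = in.readInt();
                return new Decoded(version, in.readUTF());
            }
            // No magic header: treat the first string as the legacy table name.
            return new Decoded(LEGACY_VERSION, first);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Decoded legacy = read(writeLegacy("inventory.products"));
        Decoded current = read(writeCurrent("inventory.products"));
        System.out.println(legacy.version + " " + legacy.tableName);
        System.out.println(current.version + " " + current.tableName);
    }
}
```

Writing the marker in the position where the legacy format puts its first field lets the reader branch on a single read, so state without version info can still be restored.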
Thanks for @lvyanquan's rapid response! Ran migration tests and it works as expected.
To make CI pass I need to change some code, FYI:
- TableChangeInfo#Serializer#magicTableId should be named in screaming case MAGIC_TABLE_ID as a static final constant, or Spotless will complain.
- MySqlPipelineITCase needs an update to reflect changes in default value:
Error: MySqlPipelineITCase.testInitialStartupMode:228
expected: CreateTableEvent{tableId=inventory_kohdit.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL,`description` VARCHAR(512),`weight` FLOAT}, primaryKeys=id, options=()}
but was: CreateTableEvent{tableId=inventory_kohdit.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT}, primaryKeys=id, options=()}
...
Rebased to master.
Reorganized the commit message.