[cdc-common] add field of defaultValue to Column.

Open lvyanquan opened this issue 2 years ago • 9 comments

This closes https://github.com/ververica/flink-cdc-connectors/issues/2936.

lvyanquan avatar Dec 29 '23 02:12 lvyanquan

Add an include-schema-comments configuration option to support column comments.

lvyanquan avatar Dec 29 '23 03:12 lvyanquan

PTAL @ruanhang1993 @Jiabao-Sun.

lvyanquan avatar Jan 02 '24 02:01 lvyanquan

Thanks @lvyanquan for this contribution.

The defaultValueExpression of Debezium is an unparsed default value expression, which may even include functions like NOW(). I would like to know the purpose of defaultValue, and whether problems could arise if it is used to populate values in the sink.

It is named defaultValueExpression instead of defaultValue because certain values are not easily serialized. Therefore, the original expression is preserved during serialization and restored to the Schema's defaultValue using defaultValueConverter.

See https://github.com/debezium/debezium/pull/2671.

Jiabao-Sun avatar Jan 02 '24 09:01 Jiabao-Sun

Thanks @Jiabao-Sun for pointing this out. We will indeed run into unexpected behavior when using UUID(), NOW() or CURRENT_TIMESTAMP. I will close this PR later.

lvyanquan avatar Jan 02 '24 13:01 lvyanquan

Hi, @Jiabao-Sun. I have checked the MySQL documentation and found that:

The default value specified in a DEFAULT clause can be a literal constant or an expression. With one exception, enclose expression default values within parentheses to distinguish them from literal constant default values.

Based on the code in DefaultValueParserListener, we can distinguish between these two situations: the defaultValueExpression derived from an expression should be null. So we can still maintain consistency in the expression case and add support for literal constants using the defaultValueExpression of ColumnImpl, which is actually a literal string.

The only problem is that when the default value is set to CURRENT_TIMESTAMP, we get "1970-01-01 00:00:00" as the defaultValueExpression. This is something we can call out specifically in the documentation.

lvyanquan avatar Jan 04 '24 08:01 lvyanquan

There are three cases:

  1. The default value expression is a literal: the value of column.defaultValueExpression() will be the same as the literal.
  2. The default value expression is a function: the value of column.defaultValueExpression() will be null.
  3. The default value expression is NOW() or CURRENT_TIMESTAMP: the value of column.defaultValueExpression() will be 1970-01-01 00:00:00. So we will not pass unresolved values downstream.
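
To make the three cases concrete, here is a toy model of the mapping. It is only a sketch under my own assumptions, not Debezium's parser or DefaultValueParserListener: the class name DefaultValueSketch and the method resolve are hypothetical, and it assumes expression defaults arrive wrapped in parentheses as the MySQL documentation describes.

```java
import java.util.Locale;

public class DefaultValueSketch {
    /** Placeholder reported for time-based defaults, per the third case above. */
    public static final String EPOCH = "1970-01-01 00:00:00";

    /**
     * Toy model (hypothetical helper): maps the text of a DEFAULT clause to the
     * value that column.defaultValueExpression() is described as returning.
     * Returns null for expression defaults.
     */
    public static String resolve(String defaultClause) {
        String text = defaultClause.trim();
        String upper = text.toUpperCase(Locale.ROOT);

        // Case 3: NOW() / CURRENT_TIMESTAMP collapse to the epoch placeholder.
        if (upper.equals("NOW()")
                || upper.equals("CURRENT_TIMESTAMP")
                || upper.equals("CURRENT_TIMESTAMP()")) {
            return EPOCH;
        }
        // Case 2: expression defaults are parenthesized, e.g. (UUID()).
        if (text.startsWith("(") && text.endsWith(")")) {
            return null;
        }
        // Case 1: a quoted literal is passed through without its quotes.
        if (text.length() >= 2 && text.startsWith("'") && text.endsWith("'")) {
            return text.substring(1, text.length() - 1);
        }
        // Unquoted literals (numbers, for example) are passed through as-is.
        return text;
    }

    public static void main(String[] args) {
        System.out.println(resolve("'flink'"));
        System.out.println(resolve("(UUID())"));
        System.out.println(resolve("CURRENT_TIMESTAMP"));
    }
}
```

With this model, only a literal or the epoch placeholder ever reaches the sink, which is the property the three cases are meant to guarantee.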

lvyanquan avatar Jan 29 '24 06:01 lvyanquan

@Jiabao-Sun PTAL.

lvyanquan avatar Jan 30 '24 03:01 lvyanquan

Rebased to master.

lvyanquan avatar Mar 11 '24 07:03 lvyanquan

Hi @lvyanquan, could you please rebase this onto master again? Also cc @Jiabao-Sun.

yuxiqian avatar Apr 26 '24 05:04 yuxiqian

@lvyanquan Could you check the failed tests? It's also recommended to rebase onto master.

leonardBang avatar Jul 23 '24 04:07 leonardBang

Rebase done. The cause of the failed case is that the version info of SERIALIZER was not included in TransformSchemaOperator, which leads to incompatible state. I would like to start a discussion with @aiwenmo @yuxiqian to solve it.

lvyanquan avatar Jul 24 '24 08:07 lvyanquan

Rebase done.

The cause of the failed case is that the version info of SERIALIZER was not included in TransformSchemaOperator, which leads to incompatible state. I would like to start a discussion with @aiwenmo @yuxiqian to solve it.

Can we add some magic header bytes to newer serialization formats (together with the serializer version, of course) to distinguish them from legacy ones?

yuxiqian avatar Jul 24 '24 10:07 yuxiqian

@yuxiqian I've added a magic header to the SERIALIZER of TableChangeInfo to tell state from 3.1.1 apart from state written afterwards. Could you help check again? This change can only be verified after https://github.com/apache/flink-cdc/pull/3426 and this PR, so I tend to include it in the current PR.
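
For readers following along, the magic-header idea can be sketched roughly as below. This is not the actual TableChangeInfo serializer: the class MagicHeaderSketch, the Column holder, the MAGIC value and the record layout are all hypothetical. It assumes the legacy format starts with DataOutputStream.writeUTF output, whose string bytes can never contain 0xFF, so the chosen marker cannot appear at the start of a legacy record.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

public class MagicHeaderSketch {
    // Hypothetical marker. Its third byte is 0xFF, which modified UTF-8 never
    // emits, so a legacy record (writeUTF first) cannot start with it.
    public static final int MAGIC = 0xCDC0FFEE;
    public static final int VERSION = 1;

    public static final class Column {
        public final String name;
        public final String defaultValue; // null when absent or legacy
        public Column(String name, String defaultValue) {
            this.name = name;
            this.defaultValue = defaultValue;
        }
    }

    /** Legacy layout (assumed): just the column name, no header. */
    public static byte[] serializeLegacy(String name) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(name);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** New layout: magic, serializer version, name, optional default value. */
    public static byte[] serialize(Column column) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(MAGIC);
            out.writeInt(VERSION);
            out.writeUTF(column.name);
            out.writeBoolean(column.defaultValue != null);
            if (column.defaultValue != null) {
                out.writeUTF(column.defaultValue);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Peeks at the first four bytes to decide which layout to read. */
    public static Column deserialize(byte[] data) {
        boolean versioned = data.length >= 4 && ByteBuffer.wrap(data).getInt() == MAGIC;
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            if (!versioned) {
                return new Column(in.readUTF(), null); // legacy state
            }
            in.readInt(); // skip the magic header
            int version = in.readInt();
            if (version != VERSION) {
                throw new IOException("Unsupported serializer version: " + version);
            }
            String name = in.readUTF();
            String defaultValue = in.readBoolean() ? in.readUTF() : null;
            return new Column(name, defaultValue);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The point of the sketch is that old state restores without a header, while new state carries both the marker and a version number that later format changes can branch on.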

lvyanquan avatar Jul 25 '24 02:07 lvyanquan

Thanks @lvyanquan for the rapid response! I ran the migration tests and they work as expected.

To make CI pass, some code needs to change, FYI:

  • TableChangeInfo#Serializer#magicTableId should be named in screaming snake case, MAGIC_TABLE_ID, as a static final constant, or Spotless will complain.

  • MySqlPipelineITCase needs an update to reflect the changes in default value:

Error:    MySqlPipelineITCase.testInitialStartupMode:228 
expected: CreateTableEvent{tableId=inventory_kohdit.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL,`description` VARCHAR(512),`weight` FLOAT}, primaryKeys=id, options=()}
 but was: CreateTableEvent{tableId=inventory_kohdit.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT}, primaryKeys=id, options=()}
...

yuxiqian avatar Jul 25 '24 03:07 yuxiqian

Rebased to master.

lvyanquan avatar Jul 30 '24 10:07 lvyanquan

Reorganized the commit messages.

lvyanquan avatar Jul 31 '24 07:07 lvyanquan