flink-cdc

MySQL connector CDC SourceRecord lacks data type information, but the Debezium record has it

Open • niuhu3 opened this issue 2 years ago • 1 comment

Search before asking

  • [X] I searched in the issues and found nothing similar.

Motivation

When developing a streaming data platform, we also need column data types to handle dynamic changes to table schemas. However, I found that the records produced by the MySQL CDC connector contain a lot of information that is not useful to us. I suggest using the type information in the Debezium (DBZ) record to cover more use cases.

Solution

Change the SourceRecord output to the Debezium record format, such as the one below:

Alternatives

EmbeddedEngineChangeEvent[key = { "schema": { "type": "struct", "fields": [{ "type": "int64", "optional": false, "field": "id" }], "optional": false, "name": "mysql_connector.gmall.activity_info.Key" }, "payload": { "id": 3 } },
value = { "schema": { "type": "struct", "fields": [{ "type": "struct", "fields": [{ "type": "int64", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "activity_name" }, { "type": "string", "optional": true, "field": "activity_type" }, { "type": "string", "optional": true, "field": "activity_desc" }, { "type": "int64", "optional": true, "name": "io.debezium.time.Timestamp", "version": 1, "field": "start_time" }, { "type": "int64", "optional": true, "name": "io.debezium.time.Timestamp", "version": 1, "field": "end_time" }, { "type": "int64", "optional": true, "name": "io.debezium.time.Timestamp", "version": 1, "field": "create_time" }], "optional": true, "name": "mysql_connector.gmall.activity_info.Value", "field": "before" }, { "type": "struct", "fields": [{ "type": "int64", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "activity_name" }, { "type": "string", "optional": true, "field": "activity_type" }, { "type": "string", "optional": true, "field": "activity_desc" }, { "type": "int64", "optional": true, "name": "io.debezium.time.Timestamp", "version": 1, "field": "start_time" }, { "type": "int64", "optional": true, "name": "io.debezium.time.Timestamp", "version": 1, "field": "end_time" }, { "type": "int64", "optional": true, "name": "io.debezium.time.Timestamp", "version": 1, "field": "create_time" }], "optional": true, "name": "mysql_connector.gmall.activity_info.Value", "field": "after" }, { "type": "struct", "fields": [{ "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "string", "optional": true, "name": "io.debezium.data.Enum", "version": 1, "parameters": { "allowed": "true,last,false,incremental" }, "default": "false", "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": true, "field": "sequence" }, { "type": "string", "optional": true, "field": "table" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "query" }], "optional": false, "name": "io.debezium.connector.mysql.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" }, { "type": "struct", "fields": [{ "type": "string", "optional": false, "field": "id" }, { "type": "int64", "optional": false, "field": "total_order" }, { "type": "int64", "optional": false, "field": "data_collection_order" }], "optional": true, "field": "transaction" }], "optional": false, "name": "mysql_connector.gmall.activity_info.Envelope" }, "payload": { "before": null, "after": { "id": 3, "activity_name": "ccccc", "activity_type": "1003", "activity_desc": "fffff", "start_time": null, "end_time": null, "create_time": null }, "source": { "version": "1.9.5.Final", "connector": "mysql", "name": "mysql-connector", "ts_ms": 1694568910248, "snapshot": "true", "db": "gmall", "sequence": null, "table": "activity_info", "server_id": 0, "gtid": null, "file": "mysql-bin.000015", "pos": 154, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1694568910248, "transaction": null } },
sourceRecord = SourceRecord { sourcePartition = { server = mysql-connector }, sourceOffset = { ts_sec = 1694568910, file = mysql-bin.000015, pos = 154, snapshot = true } } ConnectRecord { topic = 'mysql-connector.gmall.activity_info', kafkaPartition = null, key = Struct { id = 3 }, keySchema = Schema { mysql_connector.gmall.activity_info.Key: STRUCT }, value = Struct { after = Struct { id = 3, activity_name = ccccc, activity_type = 1003, activity_desc = fffff }, source = Struct { version = 1.9.5.Final, connector = mysql, name = mysql-connector, ts_ms = 1694568910248, snapshot = true, db = gmall, table = activity_info, server_id = 0, file = mysql-bin.000015, pos = 154, row = 0 }, op = r, ts_ms = 1694568910248 }, valueSchema = Schema { mysql_connector.gmall.activity_info.Envelope: STRUCT }, timestamp = null, headers = ConnectHeaders(headers = ) }]

Anything else?

No response

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

niuhu3 Nov 03 '23 08:11

Actually, you could provide a custom deserializer to MySqlSource when using the DataStream API. However, there is no option for setting a custom deserializer in SQL.
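To illustrate the DataStream route: `MySqlSource.<String>builder()` accepts a `deserializer(...)` of type `DebeziumDeserializationSchema<T>`, whose `deserialize(SourceRecord record, Collector<T> out)` method receives the whole `SourceRecord`, so the schema returned by `record.valueSchema()` is available there. In recent versions the bundled `JsonDebeziumDeserializationSchema` also takes an `includeSchema` flag that keeps the `schema` block in its JSON output, which may already be enough. The sketch below only shows the per-record logic a custom deserializer could run; to keep it runnable without Flink or Kafka Connect on the classpath, the hypothetical `FieldInfo` class stands in for Kafka Connect's `Field`/`Schema`, and the `column:type=value` output format is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch of logic a custom deserializer might apply to the "after" struct: keep types next to values. */
final class TypedRowSketch {

    /** Hypothetical stand-in for a Kafka Connect Field: column name, Connect type, optional semantic name. */
    static final class FieldInfo {
        final String name;
        final String connectType;
        final String semanticName; // null when the schema has no "name" attribute

        FieldInfo(String name, String connectType, String semanticName) {
            this.name = name;
            this.connectType = connectType;
            this.semanticName = semanticName;
        }
    }

    /** Renders one row as "column:type=value" entries in schema order; missing values print as null. */
    static String renderTypedRow(List<FieldInfo> afterSchema, Map<String, Object> afterValues) {
        List<String> parts = new ArrayList<>();
        for (FieldInfo f : afterSchema) {
            String type = f.semanticName != null ? f.connectType + "/" + f.semanticName : f.connectType;
            parts.add(f.name + ":" + type + "=" + afterValues.get(f.name));
        }
        return "{" + String.join(", ", parts) + "}";
    }

    public static void main(String[] args) {
        // Schema and values copied from the activity_info snapshot record ("op": "r") in this issue.
        List<FieldInfo> schema = List.of(
                new FieldInfo("id", "int64", null),
                new FieldInfo("activity_name", "string", null),
                new FieldInfo("start_time", "int64", "io.debezium.time.Timestamp"));
        Map<String, Object> after = new LinkedHashMap<>();
        after.put("id", 3L);
        after.put("activity_name", "ccccc");
        after.put("start_time", null);
        System.out.println(renderTypedRow(schema, after));
    }
}
```

In an actual deserializer, the equivalent inputs would come from `((Struct) record.value()).getStruct("after")` and its `schema().fields()`, reading each `Field`'s `schema().type()` and `schema().name()`. Since `after` is null for delete events, a real implementation would need to fall back to `before`.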

ruanhang1993 Feb 04 '24 12:02

Closing this issue as it has been migrated to Apache Jira.

PatrickRen Apr 09 '24 06:04