MySQL connector CDC SourceRecord does not carry data type information, but the Debezium (dbz) record does
Search before asking
- [X] I searched in the issues and found nothing similar.
Motivation
When developing a streaming data platform, we also need column data types in order to handle dynamic table schema changes. However, I found that the record emitted by the MySQL connector CDC carries a lot of information we do not need. I suggest using the Debezium (dbz) record, which includes the type information, to cover more use cases.
Solution
Change the code to use the dbz record instead of the SourceRecord, such as the one below.
Alternatives
EmbeddedEngineChangeEvent[key = { "schema": { "type": "struct", "fields": [{ "type": "int64", "optional": false, "field": "id" }], "optional": false, "name": "mysql_connector.gmall.activity_info.Key" }, "payload": { "id": 3 } }, value = { "schema": { "type": "struct", "fields": [{ "type": "struct", "fields": [{ "type": "int64", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "activity_name" }, { "type": "string", "optional": true, "field": "activity_type" }, { "type": "string", "optional": true, "field": "activity_desc" }, { "type": "int64", "optional": true, "name": "io.debezium.time.Timestamp", "version": 1, "field": "start_time" }, { "type": "int64", "optional": true, "name": "io.debezium.time.Timestamp", "version": 1, "field": "end_time" }, { "type": "int64", "optional": true, "name": "io.debezium.time.Timestamp", "version": 1, "field": "create_time" }], "optional": true, "name": "mysql_connector.gmall.activity_info.Value", "field": "before" }, { "type": "struct", "fields": [{ "type": "int64", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "activity_name" }, { "type": "string", "optional": true, "field": "activity_type" }, { "type": "string", "optional": true, "field": "activity_desc" }, { "type": "int64", "optional": true, "name": "io.debezium.time.Timestamp", "version": 1, "field": "start_time" }, { "type": "int64", "optional": true, "name": "io.debezium.time.Timestamp", "version": 1, "field": "end_time" }, { "type": "int64", "optional": true, "name": "io.debezium.time.Timestamp", "version": 1, "field": "create_time" }], "optional": true, "name": "mysql_connector.gmall.activity_info.Value", "field": "after" }, { "type": "struct", "fields": [{ "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": 
"string", "optional": true, "name": "io.debezium.data.Enum", "version": 1, "parameters": { "allowed": "true,last,false,incremental" }, "default": "false", "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": true, "field": "sequence" }, { "type": "string", "optional": true, "field": "table" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "query" }], "optional": false, "name": "io.debezium.connector.mysql.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" }, { "type": "struct", "fields": [{ "type": "string", "optional": false, "field": "id" }, { "type": "int64", "optional": false, "field": "total_order" }, { "type": "int64", "optional": false, "field": "data_collection_order" }], "optional": true, "field": "transaction" }], "optional": false, "name": "mysql_connector.gmall.activity_info.Envelope" }, "payload": { "before": null, "after": { "id": 3, "activity_name": "ccccc", "activity_type": "1003", "activity_desc": "fffff", "start_time": null, "end_time": null, "create_time": null }, "source": { "version": "1.9.5.Final", "connector": "mysql", "name": "mysql-connector", "ts_ms": 1694568910248, "snapshot": "true", "db": "gmall", "sequence": null, "table": "activity_info", "server_id": 0, "gtid": null, "file": "mysql-bin.000015", "pos": 154, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1694568910248, "transaction": null } }, sourceRecord = SourceRecord { sourcePartition = { server = mysql-connector }, sourceOffset = { ts_sec = 1694568910, file = mysql-bin.000015, pos = 154, snapshot = true } }
ConnectRecord { topic = 'mysql-connector.gmall.activity_info', kafkaPartition = null, key = Struct { id = 3 }, keySchema = Schema { mysql_connector.gmall.activity_info.Key: STRUCT }, value = Struct { after = Struct { id = 3, activity_name = ccccc, activity_type = 1003, activity_desc = fffff }, source = Struct { version = 1.9.5.Final, connector = mysql, name = mysql-connector, ts_ms = 1694568910248, snapshot = true, db = gmall, table = activity_info, server_id = 0, file = mysql-bin.000015, pos = 154, row = 0 }, op = r, ts_ms = 1694568910248 }, valueSchema = Schema { mysql_connector.gmall.activity_info.Envelope: STRUCT }, timestamp = null, headers = ConnectHeaders(headers = ) }]
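To illustrate why the type information in the dbz record is useful, here is a minimal sketch of how a downstream consumer might read column types out of the JSON form of the event above. It assumes the value is available as a JSON envelope with a `schema` and a `payload` section, as in the dump. The helper name `column_types` and the trimmed `SAMPLE` event are hypothetical and only cover a few of the columns shown above.

```python
import json

# Trimmed copy of the value envelope from the event above (only three columns kept).
SAMPLE = json.loads("""
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {"type": "int64", "optional": false, "field": "id"},
          {"type": "string", "optional": true, "field": "activity_name"},
          {"type": "int64", "optional": true,
           "name": "io.debezium.time.Timestamp", "version": 1,
           "field": "start_time"}
        ],
        "optional": true,
        "name": "mysql_connector.gmall.activity_info.Value",
        "field": "after"
      },
      {"type": "string", "optional": false, "field": "op"}
    ],
    "optional": false,
    "name": "mysql_connector.gmall.activity_info.Envelope"
  },
  "payload": {
    "after": {"id": 3, "activity_name": "ccccc", "start_time": null},
    "op": "r"
  }
}
""")


def column_types(envelope, image="after"):
    """Return {column: (physical_type, logical_name)} for one row image.

    `image` selects the "before" or "after" struct in the envelope schema.
    `logical_name` is the optional semantic type (for example
    io.debezium.time.Timestamp) and is None when the schema does not set one.
    """
    for field in envelope["schema"]["fields"]:
        if field.get("field") == image:
            return {
                col["field"]: (col["type"], col.get("name"))
                for col in field["fields"]
            }
    raise KeyError(f"no '{image}' struct in envelope schema")


if __name__ == "__main__":
    for column, (physical, logical) in column_types(SAMPLE).items():
        print(column, physical, logical)
```

A consumer could compare this mapping between consecutive events to detect added or retyped columns, which is the kind of dynamic schema handling described in the Motivation section.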
Anything else?
No response
Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!
Actually, you can provide a custom deserializer to MySqlSource when using the DataStream API. However, there is no option for setting a custom deserializer in SQL.
Closing this issue as it has been migrated to Apache Jira.