How can SeaTunnel implement automatic schema mapping with a Kafka source (canal_json)?
Requirement description
- SeaTunnel uses a Kafka source with the canal_json format. Every message carries the corresponding database, table, and sqlType, and sqlType records all of the column definitions, so a canal_json message by itself already contains a complete schema definition (see the sketch after this list).
- A single topic carries messages from different tables, each with its own column definitions, so describing all the tables in the topic with one schema is clearly inappropriate.
- If I want to achieve this goal, how should it be implemented?
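To illustrate the first point: everything needed to derive a schema travels with every canal_json record. Below is a minimal sketch, assuming Jackson is on the classpath; the class name `CanalSchemaProbe` is made up, and the message is trimmed to the relevant fields (the full sample appears later in this thread).

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical demo class, not part of SeaTunnel.
public class CanalSchemaProbe {
    public static void main(String[] args) throws Exception {
        // Trimmed canal_json record; mysqlType holds the MySQL type name per
        // column, sqlType the java.sql.Types code (4 = INTEGER).
        String msg = "{\"database\":\"test\",\"table\":\"t1\","
                + "\"mysqlType\":{\"id\":\"int\",\"v1\":\"int\"},"
                + "\"sqlType\":{\"id\":4,\"v1\":4}}";

        JsonNode root = new ObjectMapper().readTree(msg);
        String tablePath = root.get("database").asText() + "." + root.get("table").asText();

        // Rebuild the column definitions directly from the record itself.
        Map<String, String> columns = new LinkedHashMap<>();
        root.get("mysqlType").fields()
                .forEachRemaining(e -> columns.put(e.getKey(), e.getValue().asText()));

        System.out.println(tablePath + " -> " + columns); // test.t1 -> {id=int, v1=int}
    }
}
```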
Reference discussion
@YYChildren Is your data like this?
@Hisoka-X Paimon supports this feature: https://paimon.apache.org/docs/master/cdc-ingestion/kafka-cdc/. Is SeaTunnel going to implement this functionality?
@dyp12 This is the data in Kafka:
```json
{
  "data": [
    {
      "id": "1",
      "v1": "2"
    },
    {
      "id": "1",
      "v1": "2"
    }
  ],
  "database": "test",
  "es": 1749187027000,
  "gtid": "",
  "id": 5,
  "isDdl": false,
  "mysqlType": {
    "id": "int",
    "v1": "int"
  },
  "old": [
    {
      "v1": "1"
    },
    {
      "v1": "1"
    }
  ],
  "pkNames": null,
  "sql": "",
  "sqlType": {
    "id": 4,
    "v1": 4
  },
  "table": "t1",
  "ts": 1749187027079,
  "type": "UPDATE"
}
```
@YYChildren Similar data. We use Aliyun DTS to read MySQL data and send it to Kafka.
I want to sync data from Kafka to ClickHouse, but SeaTunnel doesn't support a multi-table schema in one Kafka topic. @dyp12
You can use table_list to support a multi-table schema in one Kafka topic. @YYChildren
Yep, please use table_list; see https://seatunnel.apache.org/zh-CN/docs/2.3.11/connector-v2/source/Kafka#%E5%A4%9A-kafka-%E6%BA%90%E7%A4%BA%E4%BE%8B
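For reference, a minimal sketch of such a job, modeled on the multi-Kafka-source example in the docs linked above. The broker address, topic name, consumer group, and the second table's columns are placeholders, and whether two table_list entries can read the same topic and be told apart by the canal_json database/table fields should be verified against your SeaTunnel version; a Console sink stands in for the ClickHouse sink discussed above.

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
}

source {
  Kafka {
    bootstrap.servers = "kafka:9092"     # placeholder broker address
    # One table_list entry per table carried in the topic. Assumption:
    # both entries read the same topic, each with the schema of one of
    # the tables appearing in the canal_json payloads.
    table_list = [
      {
        topic = "dts_topic"              # hypothetical topic name
        consumer.group = "seatunnel_group"
        start_mode = "earliest"
        format = canal_json
        schema = {
          table = "test.t1"              # matches the sample message above
          fields {
            id = "int"
            v1 = "int"
          }
        }
      },
      {
        topic = "dts_topic"
        consumer.group = "seatunnel_group"
        start_mode = "earliest"
        format = canal_json
        schema = {
          table = "test.t2"              # hypothetical second table
          fields {
            id = "int"
            name = "string"
          }
        }
      }
    ]
  }
}

sink {
  # Console sink for brevity; swap in the ClickHouse sink from the
  # connector docs for the actual Kafka-to-ClickHouse job.
  Console {}
}
```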
Reading the schema from the Kafka data itself, as Paimon does (https://paimon.apache.org/docs/master/cdc-ingestion/kafka-cdc/): is SeaTunnel going to implement this functionality? @Hisoka-X
I am not sure how to implement this logic, because the schema of SeaTunnel is determined before the reader starts reading data. Currently, this architecture cannot determine the schema when reading data. If we want to support this function, our architecture needs to be upgraded to support dynamic addition of transforms and sinks.
A SchemaChangeEvent can already change the schema after the reader starts reading data; maybe we can add a SchemaCreateEvent to register a new schema the same way.
When I use SeaTunnel to read MySQL CDC data into Kafka, I set the MySQL CDC table-pattern. After the reader has started, if I create a new table in MySQL, SeaTunnel can read the new table's data but cannot send it to Kafka, because the new table's schema was never added, so I have to restart SeaTunnel, which is inconvenient. @Hisoka-X
> maybe we can add a SchemaCreateEvent to add the schema

Yes, we can use this approach.
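To make the proposal concrete, here is a purely hypothetical Java sketch of what such an event could carry. SeaTunnel's real schema-change events live in org.apache.seatunnel.api.table.schema.event; the simplified interface and the SchemaCreateEvent class below are illustrative only and do not exist in the codebase.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for SeaTunnel's schema-change event hierarchy;
// method names here are illustrative, not the actual API.
interface SchemaChangeEvent {
    String tablePath(); // e.g. "test.t1"
}

/**
 * Proposed event: announces a table seen for the first time, carrying the
 * full column list so that downstream transforms and sinks could create
 * the table before any of its rows arrive.
 */
final class SchemaCreateEvent implements SchemaChangeEvent {
    private final String tablePath;
    private final Map<String, String> columns; // column name -> SQL type name

    SchemaCreateEvent(String tablePath, Map<String, String> columns) {
        this.tablePath = tablePath;
        this.columns = new LinkedHashMap<>(columns);
    }

    @Override
    public String tablePath() {
        return tablePath;
    }

    public Map<String, String> columns() {
        return columns;
    }
}
```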