
[cdc] support debezium-json format with schema for kafka sync action

Open · zhangjun0x01 opened this issue 1 year ago · 13 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Motivation

Support the debezium-json format with schema for the Kafka sync action.

We can get the schema from the JSON itself; the format looks like this:

{
  "schema": {...},
  "payload": {
    "before": {
      "id": 111,
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": 5.18
    },
    "after": {
      "id": 111,
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": 5.15
    },
    "source": {...},
    "op": "u",
    "ts_ms": 1589362330904,
    "transaction": null
  }
}
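
For context: when Kafka Connect's JsonConverter is used with schemas enabled, Debezium also writes the row's primary key into the Kafka message key, wrapped in the same schema/payload envelope. A minimal sketch of the key that would accompany the record above (field types and the schema name are illustrative):

{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int32", "optional": false, "field": "id" }
    ],
    "optional": false,
    "name": "myserver.inventory.products.Key"
  },
  "payload": {
    "id": 111
  }
}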

Solution

No response

Anything else?

No response

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

zhangjun0x01 · Jan 15 '24 08:01

@zhangjun0x01 I remember this feature is already supported. Is there any problem now?

sunxiaojian · Jan 15 '24 09:01

@zhangjun0x01 I remember this feature is already supported. Is there any problem now?

The JSON given by flink-cdc does not contain the field pkNames, but the debezium-json parsed by the source code needs to contain pkNames; otherwise, when synchronizing multiple tables, it throws an exception saying the primary key does not exist. When it parses debezium-json, the required fields are as follows:

private static final String FIELD_SCHEMA = "schema";
private static final String FIELD_PAYLOAD = "payload";
private static final String FIELD_BEFORE = "before";
private static final String FIELD_AFTER = "after";
private static final String FIELD_SOURCE = "source";
private static final String FIELD_PRIMARY = "pkNames";
private static final String FIELD_DB = "db";
private static final String FIELD_TYPE = "op";
private static final String OP_INSERT = "c";
private static final String OP_UPDATE = "u";
private static final String OP_DELETE = "d";
private static final String OP_READE = "r";
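
A minimal sketch of a value that carries every field named above, assuming pkNames sits inside the payload next to op (the placement is an assumption here, not taken from the parser source):

{
  "schema": {...},
  "payload": {
    "before": null,
    "after": {
      "id": 111,
      "name": "scooter"
    },
    "source": {
      "db": "inventory"
    },
    "op": "c",
    "pkNames": ["id"]
  }
}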

heine0310 · Jan 15 '24 09:01

The JSON given by flink-cdc does not contain the field pkNames […]

At present, synchronization should only support specifying primary keys for a single table. Since a Debezium record's primary key is stored in the Kafka message key, multi-table recognition requires special compatibility handling.

sunxiaojian · Jan 15 '24 14:01

Does this require special configuration? For data I actually send to Kafka with the Kafka connector, the primary-key-not-found problem still occurs.

luckyLJY · Jan 28 '24 04:01

Do whole-database and single-table sync support changes to the table schema?

luckyLJY · Jan 28 '24 04:01

The JSON given by flink-cdc does not contain the field pkNames […]

hi @zhangjun0x01 In the case of synchronizing multiple tables: are multiple tables with different structures stored in one topic, or in multiple topics?

sunxiaojian · Jan 29 '24 12:01

Do whole-database and single-table sync support changes to the table schema?

@luckyLJY Supported; see: https://github.com/apache/incubator-paimon/blob/master/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java#L58

sunxiaojian · Jan 30 '24 08:01

It reports a missing-primary-key error when doing whole-database CDC.


luckyLJY · Feb 02 '24 01:02

Whole-database sync still cannot find the primary key. The schema generated by Kafka Connect carries no primary-key annotation by default, and 0.8 does not solve this problem. Currently the only workaround is to manually create the table with a primary key first, and then run schema recognition.

medivh511 · May 27 '24 15:05

@zhangjun0x01 I remember this feature is already supported. Is there any problem now?

In your commit description: "When Debezium's data is written into Kafka, the primary key will be automatically stored in the key. When Paimon parses Kafka messages, the data in the key will be attached to the 'pkNames' field in the value. There are some demos in unit testing."

If the Debezium key is used as the primary key (assuming the value follows the format of your debezium-data-1.txt), what does the key look like? My Oracle CDC to Kafka connector config is:

{
  "name": "test03",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "topic.prefix": "test",
    "database.hostname": "...",
    "database.port": "1521",
    "database.user": "...",
    "database.password": "...",
    "database.dbname": "L2DB",
    "table.include.list": "FLINKUSER.ACT_DL",
    "schema.include.list": "FLINKUSER",
    "schema.history.internal.kafka.topic": "schema-changes.l2db",
    "snapshot.mode": "initial",
    "log.mining.strategy": "online_catalog",
    "database.history.store.only.captured.tables.ddl": "true",
    "database.tablename.case.insensitive": "false",
    "log.mining.continuous.mine": "true",
    "decimal.handling.mode": "string",
    "schema.history.internal.kafka.bootstrap.servers": "172.15.89.142:9092,172.15.89.181:9092,172.15.89.182:9092",
    "value.converter.schemas.enable": "true"
  }
}

Given "key.converter": "org.apache.kafka.connect.storage.StringConverter", can this format be parsed? Or is there a standard format?
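
For comparison, the key would arrive as structured JSON in the schema/payload envelope shown earlier if the key converter were switched to JsonConverter; a sketch of the relevant settings (whether that is the format Paimon's parser expects is the open question here):

  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "true"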

medivh511 · May 27 '24 16:05

[Feature] Kafka debezium json supports automatic discovery of primary keys #2815 was not merged into the master branch, so 0.8 definitely won't fix this.

medivh511 · May 28 '24 08:05

Still not merged into the master branch or 0.8.1; the primary-key-not-found issue still exists.

medivh511 · Jun 23 '24 16:06

Why was this PR closed? https://github.com/apache/paimon/pull/2815

Pandas886 · Sep 04 '24 13:09