[Feature] Kafka debezium json supports automatic discovery of primary keys

Open sunxiaojian opened this issue 1 year ago • 3 comments

Purpose

Linked issue: close https://github.com/apache/incubator-paimon/issues/2802

Tests

API and Format

Documentation

sunxiaojian • Jan 29 '24 12:01

@yuzelin @MonsterChenzhuo @JingsongLi PTAL

sunxiaojian • Jan 30 '24 07:01

Could you provide a demo of CDC to Kafka that automatically retrieves the table's primary key from MySQL and adds the pkNames node to the binlog?

jiangjin-f • Mar 27 '24 12:03

@jiangjin-f When Debezium writes data to Kafka, the primary key is automatically stored in the message key. When Paimon parses the Kafka messages, the data in the key is attached to the "pkNames" field in the value. There are some demos in the unit tests.
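
To make the idea concrete, here is a minimal sketch of attaching the key's field names to the value as "pkNames". It is an illustration only, not Paimon's actual implementation: the class name `PkNamesSketch`, the method `attachPkNames`, and the use of plain maps in place of parsed Debezium JSON are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: plain maps stand in for the parsed Kafka key and value.
final class PkNamesSketch {

    /**
     * Returns a copy of the value payload with the key's field names
     * attached under "pkNames". The inputs are not modified.
     */
    static Map<String, Object> attachPkNames(
            Map<String, Object> keyPayload, Map<String, Object> valuePayload) {
        if (keyPayload == null) {
            // Without a key there is no primary key information to attach.
            throw new IllegalArgumentException("record key is null");
        }
        Map<String, Object> merged = new LinkedHashMap<>(valuePayload);
        // The field names of the key are the primary key columns, in key order.
        List<String> pkNames = new ArrayList<>(keyPayload.keySet());
        // Keep an existing "pkNames" entry if the value already carries one.
        merged.putIfAbsent("pkNames", pkNames);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> key = new LinkedHashMap<>();
        key.put("id", 101);

        Map<String, Object> after = new LinkedHashMap<>();
        after.put("id", 101);
        after.put("name", "scooter");

        Map<String, Object> value = new LinkedHashMap<>();
        value.put("op", "c");
        value.put("after", after);

        System.out.println(attachPkNames(key, value));
    }
}
```

A composite primary key would simply produce several entries in "pkNames", in the order the fields appear in the key.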

sunxiaojian • Mar 27 '24 14:03

It seems this still has not been merged into the master branch. I hope it can get the primary key from the Kafka key rather than from the Kafka value, because the Debezium format does not contain pkNames.

medivh511 • May 28 '24 08:05

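I tried implementing DebeziumDeserializationSchema: the overridden deserialize method reads the fields of the record key and, following the Canal format, embeds their names in the payload as pkNames. With that, Paimon's kafka_sync_database can create tables with primary keys from Debezium JSON.

@Override
public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
    if (this.jsonConverter == null) {
        this.initializeJsonConverter();
    }
    byte[] bytes = this.jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
    // Decode explicitly as UTF-8 instead of relying on the platform default charset.
    out.collect(initJsonString(record.key(), new String(bytes, java.nio.charset.StandardCharsets.UTF_8)));
}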

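/**
 * Modeled on JsonDebeziumDeserializationSchema: extracts the field names of the key
 * and writes them into the value JSON as pkNames.
 * This rewrite assumes the CDC data has schemas enabled, so the value has a "payload" node.
 * @param key the record key, of type Struct
 * @param value the record value, which becomes part of the returned JSON string
 * @return a JSON string
 */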
private String initJsonString(Object key, String value) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    ObjectNode element = (ObjectNode) mapper.readTree(value);
    ObjectNode pNode = (ObjectNode) element.get("payload");
    if (key != null) {
        Struct struct = (Struct)key;
        ArrayNode arrayNode = mapper.createArrayNode();
        struct.schema().fields().forEach(field -> arrayNode.add(field.name()));
        pNode.putIfAbsent("pkNames", arrayNode);
    } else {
        // For now a null key is an error; with more experience this could fall back to a default value instead.
        throw new Exception("Conversion error-YSJ: key is null that is required and has no default value");
    }
    return element.toString();
}

lalalaYu • Jul 13 '24 06:07