
New data source IO implementation: Dameng

Open Candicepan opened this issue 7 months ago • 6 comments

This issue is one of the tasks in Phase 6 of the SecretFlow Open Source Contribution Plan (SF OSCP). Community developers are welcome to join us in building it together!

  • Before claiming a task, please make sure you have [signed up](https://studio.secretflow.com/activity/rkub4eryy7g3vmn/detail).

Task overview

  • Task name: New data source IO implementation: Dameng
  • Technical area: DataProxy
  • Difficulty: Challenging 🌟🌟🌟
  • Estimated time to complete: 5 weeks
  • Task reviewer: @YanZhuangz

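Detailed requirements

Kuscia currently supports the data source types localfs, OSS, MySQL, ODPS, and PostgreSQL. In practice, some data lives in Dameng databases. To avoid extra data-handling steps, this task adds a Dameng data source connector and broadens Kuscia's data source ecosystem.

  • Requirements:
    • Map data types correctly so that data is read and written correctly;
    • Handle NULL values correctly on both read and write;
    • Preserve row order when writing, so that rows are never written out of order;
    • Write in overwrite mode (for a partitioned table, overwrite the partition), so that the stored data always comes from the most recent task run;
    • Read rows back in the same order in which they were written, because some tasks sort the data before writing it.
  • Task notes: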

Required skills

  • Some understanding of how Kuscia datamesh works
  • Some understanding of Apache Arrow
  • Familiarity with Java

Instructions

Candicepan avatar May 22 '25 15:05 Candicepan

kongxiaoran Give it to me

kongxiaoran avatar Oct 30 '25 01:10 kongxiaoran

Congratulations 🏅 on successfully claiming the task, @kongxiaoran

Niko-Zeng avatar Oct 30 '25 06:10 Niko-Zeng

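Design proposal: Dameng data source support for Kuscia

1. Overview

1.1. Background

Kuscia already supports common databases such as MySQL and PostgreSQL, but it has no direct support for Dameng. This issue calls for a new DataProxy data source plugin, dataproxy-plugin-dameng, that reads from and writes to Dameng efficiently and reliably, connecting upper-layer compute applications to Dameng databases.

1.2. Goals

  • Develop a standalone, pluggable Dameng data source plugin.
  • Move data in both directions between DataProxy and Dameng in Apache Arrow format.
  • Map data types accurately, especially numeric, date, and text types.
  • Handle NULL values correctly on both read and write.
  • Write in overwrite mode (for a partitioned table, overwrite the partition), so that the stored data always comes from the most recent task run.
  • Keep row order consistent across reads and writes.
  • Provide complete unit and integration tests to ensure the plugin is stable and reliable.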
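One possible way to meet the overwrite-write goal is to clear the target before inserting the new rows. The sketch below only builds the SQL text. The class and method names are hypothetical and not part of DataProxy, and it assumes that Dameng accepts `TRUNCATE TABLE` and the Oracle-style `ALTER TABLE ... TRUNCATE PARTITION` clause; the thread does not say which statements the plugin actually uses.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical helper for illustration; not part of DataProxy. */
final class OverwriteSqlSketch {

    /** Wraps an identifier in double quotes, doubling any embedded quotes. */
    static String quote(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    /**
     * Builds the statement that clears the write target.
     * A null partition means the whole table is overwritten.
     */
    static List<String> buildOverwriteSql(String table, String partition) {
        if (partition == null) {
            return Collections.singletonList("TRUNCATE TABLE " + quote(table));
        }
        // Assumption: Dameng supports truncating a single partition this way.
        return Collections.singletonList(
                "ALTER TABLE " + quote(table) + " TRUNCATE PARTITION " + quote(partition));
    }

    public static void main(String[] args) {
        System.out.println(buildOverwriteSql("RESULT_TABLE", null));
        System.out.println(buildOverwriteSql("RESULT_TABLE", "P20240115"));
    }
}
```

Clearing and inserting on the same connection, in one task run, keeps the stored rows tied to the latest run; whether to wrap both steps in a transaction is a separate design choice.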

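2. Overall design

2.1 Technology choice

Preliminary research confirmed that Dameng officially supports Java JDBC, and that in ecosystem terms Java is a better fit than Go for connecting to Dameng.

After studying DataProxy's existing plugin system, the plan is to talk to the database through Dameng's official JDBC driver (DmJdbcDriver18.jar). This works the same way as the existing dataproxy-plugin-database module, so it can reuse the existing abstractions and logic as much as possible and keep development and maintenance costs low.

2.2 Architecture

A new Maven module, dataproxy-plugin-dameng, will be created. The plugin will implement DataProxy's core SPI interfaces and convert in both directions between DataProxy's internal data format (Apache Arrow) and JDBC's ResultSet/PreparedStatement.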

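3. Detailed design

3.1 New components

  1. DamengFlightProducer (entry class)

  2. DamengUtil (utility class) with 6 core functions.

| Function | Responsibility |
| --- | --- |
| initDameng() | Initialize the JDBC connection |
| buildQuerySql() | Build the SELECT statement |
| jdbcType2ArrowType() | Convert JDBC types to Arrow types |
| buildCreateTableSql() | Build the CREATE TABLE statement |
| buildMultiRowInsertSql() | Build the batch INSERT statement |
| checkTableExists() | Check whether a table exists |

This part can follow the Hive plugin's implementation.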
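As a rough illustration of two of the helpers listed above, the following sketch builds a SELECT statement and a multi-row parameterized INSERT. The signatures are assumptions made for this example (the thread only gives the function names), and quoting identifiers with double quotes is a choice of the sketch, not something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch; the real DamengUtil signatures are not shown in the thread. */
final class DamengSqlSketch {

    /** Wraps an identifier in double quotes, doubling any embedded quotes. */
    static String quote(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    /** Quotes every column name and joins them with ", ". */
    static String quotedColumnList(List<String> columns) {
        List<String> quoted = new ArrayList<>();
        for (String column : columns) {
            quoted.add(quote(column));
        }
        return String.join(", ", quoted);
    }

    /** SELECT over an explicit column list, so the Arrow schema order is fixed. */
    static String buildQuerySql(String table, List<String> columns) {
        return "SELECT " + quotedColumnList(columns) + " FROM " + quote(table);
    }

    /** One INSERT carrying rowCount placeholder tuples, bound later via PreparedStatement. */
    static String buildMultiRowInsertSql(String table, List<String> columns, int rowCount) {
        if (columns.isEmpty() || rowCount < 1) {
            throw new IllegalArgumentException("need at least one column and one row");
        }
        String tuple = "(" + String.join(", ", Collections.nCopies(columns.size(), "?")) + ")";
        return "INSERT INTO " + quote(table) + " (" + quotedColumnList(columns) + ") VALUES "
                + String.join(", ", Collections.nCopies(rowCount, tuple));
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("COL_INT", "COL_VARCHAR");
        System.out.println(buildQuerySql("COMPREHENSIVE_TYPE_TEST", columns));
        System.out.println(buildMultiRowInsertSql("COMPREHENSIVE_TYPE_TEST", columns, 2));
    }
}
```

Using placeholders rather than inlined literals lets NULLs be bound with `PreparedStatement.setNull`, and sending the tuples of one batch in a single statement keeps their relative order within that batch.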

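3.2 Key points

The most important part of the implementation is handling the data type mapping correctly.

The mapping below is based on the [official Dameng JDBC programming guide](https://eco.dameng.com/document/dm/zh-cn/pm/jdbc-rogramming-guide.html#4.4.1%20%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B%E6%89%A9%E5%B1%95) and the Apache Arrow type system:

| Dameng type | Dameng type code | JDBC type | Arrow TypeID | Arrow concrete type | Description |
| --- | --- | --- | --- | --- | --- |
| **Basic numeric types** | | | | | |
| TINYINT | 5 | TINYINT(-6) | Int(2) | ArrowType.Int(8, true) | 8-bit signed integer |
| SMALLINT | 6 | SMALLINT(5) | Int(2) | ArrowType.Int(16, true) | 16-bit signed integer |
| INT | 7 | INTEGER(4) | Int(2) | ArrowType.Int(32, true) | 32-bit signed integer |
| BIGINT | 8 | BIGINT(-5) | Int(2) | ArrowType.Int(64, true) | 64-bit signed integer |
| DECIMAL | 9 | DECIMAL(3) | Decimal(7) | ArrowType.Decimal(p,s,128) | Fixed-point number, variable precision |
| REAL | 10 | REAL(7) | FloatingPoint(3) | ArrowType.FloatingPoint(SINGLE) | 32-bit floating point |
| DOUBLE | 11 | DOUBLE(8) | FloatingPoint(3) | ArrowType.FloatingPoint(DOUBLE) | 64-bit floating point |
| **String types** | | | | | |
| CHAR | 0 | CHAR(1) | Utf8(5) | ArrowType.Utf8 | Fixed-length string |
| VARCHAR | 2 | VARCHAR(12) | Utf8(5) | ArrowType.Utf8 | Variable-length string |
| VARCHAR2 | 1 | VARCHAR(12) | Utf8(5) | ArrowType.Utf8 | Oracle-compatible string |
| CLOB | 19 | CLOB(2005) | LargeUtf8(20) | ArrowType.LargeUtf8 | Large character object |
| **Date and time types** | | | | | |
| DATE | 14 | DATE(91) | Date(8) | ArrowType.Date(DateUnit.DAY) | Date |
| TIME | 15 | TIME(92) | Time(9) | ArrowType.Time(TimeUnit.MILLISECOND, 32) | Time |
| DATETIME | 16 | TIMESTAMP(93) | Timestamp(10) | ArrowType.Timestamp(TimeUnit.MILLISECOND, null) | Timestamp |
| DATETIME2 | 26 | TIMESTAMP(93) | Timestamp(10) | ArrowType.Timestamp(TimeUnit.NANOSECOND, null) | Nanosecond-precision timestamp |
| TIME_TZ | 22 | TIME_WITH_TIMEZONE(2013) | Time(9) | ArrowType.Time(TimeUnit.NANOSECOND, 64) | Time with time zone |
| DATETIME_TZ | 23 | TIMESTAMP_WITH_TIMEZONE(2014) | Timestamp(10) | ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC") | Timestamp with time zone |
| DATETIME2_TZ | 27 | TIMESTAMP_WITH_TIMEZONE(2014) | Timestamp(10) | ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC") | Nanosecond-precision timestamp with time zone |
| **Binary types** | | | | | |
| BIT | 3 | BIT(-7) | Binary(4) | ArrowType.Binary | Bit string |
| BINARY | 17 | BINARY(-2) | FixedSizeBinary(15) | ArrowType.FixedSizeBinary(n) | Fixed-length binary |
| VARBINARY | 18 | VARBINARY(-3) | Binary(4) | ArrowType.Binary | Variable-length binary |
| BLOB | 12 | BLOB(2004) | LargeBinary(19) | ArrowType.LargeBinary | Large binary object |
| **Boolean type** | | | | | |
| BOOLEAN | 13 | BOOLEAN(16) | Bool(6) | ArrowType.Bool.INSTANCE | Boolean |
| **Dameng-specific types** | | | | | |
| INTERVAL_YM | 20 | OTHER(1111) | Interval(11) | ArrowType.Interval(IntervalUnit.YEAR_MONTH) | Year-month interval |
| INTERVAL_DT | 21 | OTHER(1111) | Interval(11) | ArrowType.Interval(IntervalUnit.DAY_TIME) | Day-time interval |
| ROWID | 28 | ROWID(-8) | Utf8(5) | ArrowType.Utf8 | Row identifier (converted to a string) |
| NULL | 29 | NULL(0) | Null(1) | ArrowType.Null | Null type |

If anything here turns out to be inaccurate, I will adjust it during implementation.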
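To show the shape of such a mapping without pulling in the Arrow library, the sketch below maps a subset of `java.sql.Types` codes to the names of the Arrow types proposed above. Returning a string instead of an `ArrowType` object is a simplification made for this example, and the class name is hypothetical; a real `jdbcType2ArrowType()` would return the corresponding `ArrowType` instances.

```java
import java.sql.Types;

/** Dependency-free illustration of a JDBC-to-Arrow type mapping (subset only). */
final class JdbcToArrowSketch {

    /** Returns a readable name for the Arrow type proposed for the given JDBC type code. */
    static String jdbcType2ArrowName(int jdbcType) {
        switch (jdbcType) {
            case Types.TINYINT:   return "Int(8, signed)";
            case Types.SMALLINT:  return "Int(16, signed)";
            case Types.INTEGER:   return "Int(32, signed)";
            case Types.BIGINT:    return "Int(64, signed)";
            case Types.REAL:      return "FloatingPoint(SINGLE)";
            case Types.DOUBLE:    return "FloatingPoint(DOUBLE)";
            case Types.CHAR:
            case Types.VARCHAR:   return "Utf8";
            case Types.CLOB:      return "LargeUtf8";
            case Types.DATE:      return "Date(DAY)";
            case Types.TIME:      return "Time(MILLISECOND, 32)";
            case Types.TIMESTAMP: return "Timestamp(MILLISECOND)";
            case Types.VARBINARY: return "Binary";
            case Types.BLOB:      return "LargeBinary";
            case Types.BOOLEAN:   return "Bool";
            default:
                // Failing loudly is safer than silently guessing a type.
                throw new IllegalArgumentException("Unsupported JDBC type: " + jdbcType);
        }
    }

    public static void main(String[] args) {
        System.out.println(jdbcType2ArrowName(Types.INTEGER));
        System.out.println(jdbcType2ArrowName(Types.CLOB));
    }
}
```

Types whose Arrow form depends on column metadata (DECIMAL precision and scale, BINARY length) are left out here because a type code alone does not carry that information.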

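4. Testing

Unit tests

  • DamengUtilTest: unit tests for the 6 utility functions
  • Focus: type conversion and SQL construction

Integration tests

  • Tests against a real Dameng database environment
  • Data volume tests: 10,000 → 1,000,000 → 10,000,000 rows
  • Data type completeness tests
  • Dameng-specific type tests: INTERVAL_YM, INTERVAL_DT, DATETIME2
  • Partitioned table read/write tests

@YanZhuangz Hello reviewer, could you please confirm whether this design approach is acceptable?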

kongxiaoran avatar Nov 01 '25 06:11 kongxiaoran

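While converting JDBC types to Arrow types, I found that some data types can be mapped to Arrow more accurately when length and precision information is available, for example DECIMAL, TIMESTAMP, and BINARY.

Should the framework code be changed to pass the COLUMN_SIZE and DECIMAL_DIGITS information, as in ArrowType arrowType = this.jdbcType2ArrowType.apply(columnType, columnSize, decimalDigits);, so that the mapping can be refined?

Or should I leave the framework code unchanged and start with a simple implementation?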
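For illustration, a three-argument mapping along the lines asked about here might look like the sketch below. It returns Arrow type names as strings so that it runs without the Arrow library, and the class name, the clamping rules, and the fractional-digit thresholds are assumptions of this example rather than anything in DataProxy. It relies on the JDBC convention that DECIMAL_DIGITS reports the number of fractional digits.

```java
import java.sql.Types;

/** Hypothetical precision-aware mapping; illustrates the question, not the framework. */
final class PrecisionAwareMappingSketch {

    /** Arrow's 128-bit decimal holds at most 38 digits of precision. */
    static final int MAX_DECIMAL128_PRECISION = 38;

    static String jdbcType2ArrowName(int jdbcType, int columnSize, int decimalDigits) {
        switch (jdbcType) {
            case Types.DECIMAL:
            case Types.NUMERIC: {
                // Fall back to the widest precision when the driver reports none or too much.
                int precision = (columnSize <= 0 || columnSize > MAX_DECIMAL128_PRECISION)
                        ? MAX_DECIMAL128_PRECISION : columnSize;
                int scale = Math.max(0, Math.min(decimalDigits, precision));
                return "Decimal(" + precision + ", " + scale + ", 128)";
            }
            case Types.BINARY:
                // A known fixed length allows FixedSizeBinary; otherwise use plain Binary.
                return columnSize > 0 ? "FixedSizeBinary(" + columnSize + ")" : "Binary";
            case Types.TIMESTAMP:
                // Pick the coarsest unit that still keeps every fractional digit.
                if (decimalDigits <= 3) {
                    return "Timestamp(MILLISECOND)";
                }
                if (decimalDigits <= 6) {
                    return "Timestamp(MICROSECOND)";
                }
                return "Timestamp(NANOSECOND)";
            default:
                throw new IllegalArgumentException("Unsupported JDBC type: " + jdbcType);
        }
    }

    public static void main(String[] args) {
        System.out.println(jdbcType2ArrowName(Types.DECIMAL, 18, 4));
        System.out.println(jdbcType2ArrowName(Types.TIMESTAMP, 26, 6));
    }
}
```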

kongxiaoran avatar Nov 02 '25 02:11 kongxiaoran

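> While converting JDBC types to Arrow types, I found that some data types can be mapped to Arrow more accurately when length and precision information is available, for example DECIMAL, TIMESTAMP, and BINARY.
>
> Should the framework code be changed to pass the COLUMN_SIZE and DECIMAL_DIGITS information, as in ArrowType arrowType = this.jdbcType2ArrowType.apply(columnType, columnSize, decimalDigits);, so that the mapping can be refined?
>
> Or should I leave the framework code unchanged and start with a simple implementation?

For the Dameng-to-Arrow field mapping, you can follow the existing implementations in dataproxy such as hive and odps, and support the fields they already handle. Do not modify the framework code for now. Also note two further requirements: unit-test coverage of newly added lines must reach 70%, and partitioned tables must be supported.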

gaoyonglong avatar Nov 04 '25 02:11 gaoyonglong

Also Arrow's large-utf8 and date (the several variants).

gaoyonglong avatar Nov 04 '25 11:11 gaoyonglong

Test record: Dameng database support in Kuscia

I. Prepare test data

  1. Create the schema

    create user KUSCIA identified by 1234567890;
    
  2. Create the test table

    -- Drop the table if it already exists
    DROP TABLE IF EXISTS COMPREHENSIVE_TYPE_TEST;
    
    -- Create the test table
    CREATE TABLE COMPREHENSIVE_TYPE_TEST (
        -- Integer types
        COL_TINYINT TINYINT,                    -- 8-bit signed integer
        COL_SMALLINT SMALLINT,                  -- 16-bit signed integer
        COL_INT INT,                            -- 32-bit signed integer
        COL_BIGINT BIGINT,                      -- 64-bit signed integer

        -- Floating-point types
        COL_FLOAT FLOAT,                        -- single-precision floating point
        COL_DOUBLE DOUBLE,                      -- double-precision floating point

        -- String types
        COL_CHAR CHAR(50),                      -- fixed-length string
        COL_VARCHAR VARCHAR(200),               -- variable-length string
        COL_VARCHAR2 VARCHAR2(200),            -- VARCHAR2 (Dameng-specific)
        COL_CLOB CLOB,                          -- large text (LargeUtf8)

        -- Numeric types
        COL_DECIMAL DECIMAL(38, 2),            -- high-precision number (precision=38, scale=2)
        COL_NUMERIC NUMERIC(18, 4),            -- numeric (precision=18, scale=4)
        COL_NUMBER NUMBER(10, 0),              -- NUMBER (integer)

        -- Boolean types
        COL_BIT BIT,                            -- boolean value (1 = true, 0 = false)
        COL_BOOLEAN BIT,                        -- boolean (Dameng uses BIT to represent booleans)

        -- Date and time types
        COL_DATE DATE,                          -- date
        COL_TIME TIME,                          -- time
        COL_TIMESTAMP TIMESTAMP,                -- timestamp (microsecond precision)
        COL_DATETIME DATETIME,                  -- date and time

        -- Binary types
        COL_BINARY BINARY(100),                 -- fixed-length binary
        COL_VARBINARY VARBINARY(500),           -- variable-length binary
        COL_BLOB BLOB,                          -- large binary object (LargeBinary)

        -- Interval types
        COL_INTERVAL_YM INTERVAL YEAR TO MONTH, -- year-month interval
        COL_INTERVAL_DT INTERVAL DAY TO SECOND  -- day-time interval
    );
    
    -- Insert test data
    INSERT INTO COMPREHENSIVE_TYPE_TEST VALUES (
        -- Integer types
        127,                                    -- COL_TINYINT: maximum value
        -32768,                                 -- COL_SMALLINT: minimum value
        2147483647,                             -- COL_INT: maximum value
        9223372036854775807,                    -- COL_BIGINT: maximum value

        -- Floating-point types
        3.14159,                                -- COL_FLOAT: pi
        2.718281828459045,                      -- COL_DOUBLE: Euler's number

        -- String types
        'Hello World',                          -- COL_CHAR: fixed-length string
        'This is a VARCHAR string',             -- COL_VARCHAR: variable-length string
        'This is a VARCHAR2 string',           -- COL_VARCHAR2: VARCHAR2 type
        'This is a CLOB field with very long content. ' || 
        RPAD('A', 1000, 'A'),                  -- COL_CLOB: large text (1000+ characters, generated with RPAD)

        -- Numeric types
        123456789012345678901234567890.12,      -- COL_DECIMAL: large value (scale=2)
        12345678901234.5678,                    -- COL_NUMERIC: numeric (scale=4)
        9876543210,                             -- COL_NUMBER: integer

        -- Boolean types
        1,                                      -- COL_BIT: true
        1,                                      -- COL_BOOLEAN: true (BIT type, 1 = true)

        -- Date and time types
        TO_DATE('2024-01-15', 'YYYY-MM-DD'),                      -- COL_DATE: date
        CAST('14:30:45' AS TIME),                                  -- COL_TIME: time
        TO_TIMESTAMP('2024-01-15 14:30:45.123456', 'YYYY-MM-DD HH24:MI:SS.FF6'), -- COL_TIMESTAMP: timestamp (microseconds)
        TO_TIMESTAMP('2024-01-15 14:30:45', 'YYYY-MM-DD HH24:MI:SS'),        -- COL_DATETIME: date and time

        -- Binary types
        HEXTORAW('48656C6C6F'),                 -- COL_BINARY: hex encoding of "Hello"
        HEXTORAW('576F726C64'),                 -- COL_VARBINARY: hex encoding of "World"
        HEXTORAW('48656C6C6F20576F726C6421'),  -- COL_BLOB: hex encoding of "Hello World!"

        -- Interval types
        INTERVAL '2-3' YEAR TO MONTH,          -- COL_INTERVAL_YM: 2 years 3 months
        INTERVAL '1 12:30:45.123456' DAY(1) TO SECOND(6) -- COL_INTERVAL_DT: 1 day 12 hours 30 minutes 45.123456 seconds
    );
    
    -- Insert the second test row (all NULL values)
    INSERT INTO COMPREHENSIVE_TYPE_TEST VALUES (
        NULL,                                   -- COL_TINYINT: NULL
        NULL,                                   -- COL_SMALLINT: NULL
        NULL,                                   -- COL_INT: NULL
        NULL,                                   -- COL_BIGINT: NULL
        NULL,                                   -- COL_FLOAT: NULL
        NULL,                                   -- COL_DOUBLE: NULL
        NULL,                                   -- COL_CHAR: NULL
        NULL,                                   -- COL_VARCHAR: NULL
        NULL,                                   -- COL_VARCHAR2: NULL
        NULL,                                   -- COL_CLOB: NULL
        NULL,                                   -- COL_DECIMAL: NULL
        NULL,                                   -- COL_NUMERIC: NULL
        NULL,                                   -- COL_NUMBER: NULL
        NULL,                                   -- COL_BIT: NULL
        NULL,                                   -- COL_BOOLEAN: NULL
        NULL,                                   -- COL_DATE: NULL
        NULL,                                   -- COL_TIME: NULL
        NULL,                                   -- COL_TIMESTAMP: NULL
        NULL,                                   -- COL_DATETIME: NULL
        NULL,                                   -- COL_BINARY: NULL
        NULL,                                   -- COL_VARBINARY: NULL
        NULL,                                   -- COL_BLOB: NULL
        NULL,                                   -- COL_INTERVAL_YM: NULL
        NULL                                    -- COL_INTERVAL_DT: NULL
    );
    
    -- Insert the third test row (boundary values)
    INSERT INTO COMPREHENSIVE_TYPE_TEST VALUES (
        -128,                                   -- COL_TINYINT: minimum value
        32767,                                  -- COL_SMALLINT: maximum value
        -2147483648,                           -- COL_INT: minimum value
        -9223372036854775808,                  -- COL_BIGINT: minimum value
        -3.14159,                              -- COL_FLOAT: negative float
        -2.718281828459045,                    -- COL_DOUBLE: negative double
        'A',                                   -- COL_CHAR: single character
        'Short',                               -- COL_VARCHAR: short string
        'Short2',                              -- COL_VARCHAR2: short string
        'Short CLOB',                          -- COL_CLOB: short text
        0.01,                                  -- COL_DECIMAL: smallest positive value
        0.0001,                                -- COL_NUMERIC: smallest positive value
        0,                                     -- COL_NUMBER: zero
        0,                                     -- COL_BIT: false
        0,                                     -- COL_BOOLEAN: false (BIT type, 0 = false)
        TO_DATE('1970-01-01', 'YYYY-MM-DD'),                     -- COL_DATE: Unix epoch
        CAST('00:00:00' AS TIME),                                -- COL_TIME: midnight
        TO_TIMESTAMP('1970-01-01 00:00:00.000000', 'YYYY-MM-DD HH24:MI:SS.FF6'), -- COL_TIMESTAMP: Unix epoch
        TO_TIMESTAMP('1970-01-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS'),        -- COL_DATETIME: Unix epoch
        HEXTORAW('00'),                        -- COL_BINARY: single byte
        HEXTORAW('FF'),                        -- COL_VARBINARY: single byte
        HEXTORAW('000102030405'),              -- COL_BLOB: multiple bytes
        INTERVAL '0-0' YEAR TO MONTH,          -- COL_INTERVAL_YM: zero interval
        INTERVAL '0 00:00:00.000000' DAY(1) TO SECOND(6) -- COL_INTERVAL_DT: zero interval
    );
    
    

II. Build DataProxy and Kuscia

I won't go through the build process in detail here. The build produced these images:

export KUSCIA_IMAGE=secretflow/kuscia:1b08d28-20251209225356
export DATAPROXY_IMAGE=dataproxy:v0.0.1-20251209225425-74b49d

Quick deployment in P2P mode:

./scripts/deploy/kuscia.sh p2p --data-proxy

III. Configure the data source and DomainData

3.1 Create the data source

docker exec -it ${USER}-kuscia-autonomy-alice bash
export CTR_CERTS_ROOT=/home/kuscia/var/certs

# Create the Dameng data source (replace the IP, port, user, and password with your own)
curl -X POST 'https://localhost:8082/api/v1/domaindatasource/create' \
  --header "Token: $(cat ${CTR_CERTS_ROOT}/token)" \
  --header 'Content-Type: application/json' \
  --cert ${CTR_CERTS_ROOT}/kusciaapi-server.crt \
  --key ${CTR_CERTS_ROOT}/kusciaapi-server.key \
  --cacert ${CTR_CERTS_ROOT}/ca.crt \
  -k \
  -d '{
    "domain_id": "alice",
    "datasource_id": "kuscia-ds-01",
    "name": "Dameng test data source (kuscia)",
    "type": "dameng",
    "info": {
      "database": {
        "endpoint": "110.40.xx.xx:5237",
        "user": "KUSCIA",
        "password": "1234567890",
        "database": "KUSCIA"
      }
    }
  }'

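3.2 Create the DomainData (pointing at the test table in Dameng)

I created two DomainData objects: one for reading data from the Dameng database and one for writing data to it. Both define exactly the same columns.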

export CTR_CERTS_ROOT=/home/kuscia/var/certs

curl -X POST 'https://localhost:8082/api/v1/domaindata/create' \
  --header "Token: $(cat ${CTR_CERTS_ROOT}/token)" \
  --header 'Content-Type: application/json' \
  --cert ${CTR_CERTS_ROOT}/kusciaapi-server.crt \
  --key ${CTR_CERTS_ROOT}/kusciaapi-server.key \
  --cacert ${CTR_CERTS_ROOT}/ca.crt \
  -k \
  -d '{
    "domain_id": "alice",
    "domaindata_id": "comprehensive-type-test",
    "name": "Comprehensive type test table",
    "type": "table",
    "datasource_id": "kuscia-ds-01",
    "relative_uri": "COMPREHENSIVE_TYPE_TEST",
    "attributes": {},
    "columns": [
      {
        "name": "COL_TINYINT",
        "type": "int8",
        "comment": "8-bit signed integer (TINYINT → Int(8) → int8)"
      },
      {
        "name": "COL_SMALLINT",
        "type": "int16",
        "comment": "16-bit signed integer (SMALLINT → Int(16) → int16)"
      },
      {
        "name": "COL_INT",
        "type": "int32",
        "comment": "32-bit signed integer (INT → Int(32) → int32)"
      },
      {
        "name": "COL_BIGINT",
        "type": "int64",
        "comment": "64-bit signed integer (BIGINT → Int(64) → int64)"
      },
      {
        "name": "COL_FLOAT",
        "type": "float32",
        "comment": "Single-precision floating point (FLOAT → FloatingPoint(SINGLE) → float32)"
      },
      {
        "name": "COL_DOUBLE",
        "type": "float64",
        "comment": "Double-precision floating point (DOUBLE → FloatingPoint(DOUBLE) → float64)"
      },
      {
        "name": "COL_CHAR",
        "type": "string",
        "comment": "Fixed-length string (CHAR → Utf8 → string)"
      },
      {
        "name": "COL_VARCHAR",
        "type": "string",
        "comment": "Variable-length string (VARCHAR → Utf8 → string)"
      },
      {
        "name": "COL_VARCHAR2",
        "type": "string",
        "comment": "VARCHAR2 type (VARCHAR2 → Utf8 → string)"
      },
      {
        "name": "COL_CLOB",
        "type": "large_string",
        "comment": "Large text (CLOB → LargeUtf8 → large_string)"
      },
      {
        "name": "COL_DECIMAL",
        "type": "decimal",
        "comment": "High-precision number (38,2) (DECIMAL → Decimal(38,2) → float64, precision may be lost)"
      },
      {
        "name": "COL_NUMERIC",
        "type": "float64",
        "comment": "Numeric type (18,4) (NUMERIC → Decimal(18,4) → float64, precision may be lost)"
      },
      {
        "name": "COL_NUMBER",
        "type": "float64",
        "comment": "NUMBER type (10,0) (NUMBER → Decimal(10,0) → float64, precision may be lost)"
      },
      {
        "name": "COL_BIT",
        "type": "bool",
        "comment": "Boolean value (BIT → Bool → bool)"
      },
      {
        "name": "COL_BOOLEAN",
        "type": "bool",
        "comment": "Boolean type (BOOLEAN → Bool → bool)"
      },
      {
        "name": "COL_DATE",
        "type": "date32",
        "comment": "Date (DATE → Date(DAY) → date32)"
      },
      {
        "name": "COL_TIME",
        "type": "time32",
        "comment": "Time (TIME → Time(MILLISECOND) → time32)"
      },
      {
        "name": "COL_TIMESTAMP",
        "type": "timestamp_us",
        "comment": "Timestamp (TIMESTAMP → Timestamp(MICROSECOND) → timestamp_us)"
      },
      {
        "name": "COL_DATETIME",
        "type": "timestamp_us",
        "comment": "Date and time (DATETIME → Timestamp(MICROSECOND) → timestamp_us)"
      },
      {
        "name": "COL_BINARY",
        "type": "binary",
        "comment": "Fixed-length binary (BINARY → Binary → binary)"
      },
      {
        "name": "COL_VARBINARY",
        "type": "binary",
        "comment": "Variable-length binary (VARBINARY → Binary → binary)"
      },
      {
        "name": "COL_BLOB",
        "type": "large_binary",
        "comment": "Large binary object (BLOB → LargeBinary → large_binary)"
      },
      {
        "name": "COL_INTERVAL_YM",
        "type": "interval_year_month",
        "comment": "Year-month interval (INTERVAL YEAR TO MONTH → Interval(YEAR_MONTH) → interval_year_month)"
      },
      {
        "name": "COL_INTERVAL_DT",
        "type": "interval_day_time",
        "comment": "Day-time interval (INTERVAL DAY TO SECOND → Interval(DAY_TIME) → interval_day_time)"
      }
    ]
  }'

export CTR_CERTS_ROOT=/home/kuscia/var/certs

curl -X POST 'https://localhost:8082/api/v1/domaindata/create' \
  --header "Token: $(cat ${CTR_CERTS_ROOT}/token)" \
  --header 'Content-Type: application/json' \
  --cert ${CTR_CERTS_ROOT}/kusciaapi-server.crt \
  --key ${CTR_CERTS_ROOT}/kusciaapi-server.key \
  --cacert ${CTR_CERTS_ROOT}/ca.crt \
  -k \
  -d '{
    "domain_id": "alice",
    "domaindata_id": "comprehensive-type-test1",
    "name": "Comprehensive type test table",
    "type": "table",
    "datasource_id": "kuscia-ds-01",
    "relative_uri": "COMPREHENSIVE_TYPE_TEST1",
    "attributes": {},
    "columns": [
      {
        "name": "COL_TINYINT",
        "type": "int8",
        "comment": "8-bit signed integer (TINYINT → Int(8) → int8)"
      },
      {
        "name": "COL_SMALLINT",
        "type": "int16",
        "comment": "16-bit signed integer (SMALLINT → Int(16) → int16)"
      },
      {
        "name": "COL_INT",
        "type": "int32",
        "comment": "32-bit signed integer (INT → Int(32) → int32)"
      },
      {
        "name": "COL_BIGINT",
        "type": "int64",
        "comment": "64-bit signed integer (BIGINT → Int(64) → int64)"
      },
      {
        "name": "COL_FLOAT",
        "type": "float32",
        "comment": "Single-precision floating point (FLOAT → FloatingPoint(SINGLE) → float32)"
      },
      {
        "name": "COL_DOUBLE",
        "type": "float64",
        "comment": "Double-precision floating point (DOUBLE → FloatingPoint(DOUBLE) → float64)"
      },
      {
        "name": "COL_CHAR",
        "type": "string",
        "comment": "Fixed-length string (CHAR → Utf8 → string)"
      },
      {
        "name": "COL_VARCHAR",
        "type": "string",
        "comment": "Variable-length string (VARCHAR → Utf8 → string)"
      },
      {
        "name": "COL_VARCHAR2",
        "type": "string",
        "comment": "VARCHAR2 type (VARCHAR2 → Utf8 → string)"
      },
      {
        "name": "COL_CLOB",
        "type": "large_string",
        "comment": "Large text (CLOB → LargeUtf8 → large_string)"
      },
      {
        "name": "COL_DECIMAL",
        "type": "decimal",
        "comment": "High-precision number (38,2) (DECIMAL → Decimal(38,2) → float64, precision may be lost)"
      },
      {
        "name": "COL_NUMERIC",
        "type": "float64",
        "comment": "Numeric type (18,4) (NUMERIC → Decimal(18,4) → float64, precision may be lost)"
      },
      {
        "name": "COL_NUMBER",
        "type": "float64",
        "comment": "NUMBER type (10,0) (NUMBER → Decimal(10,0) → float64, precision may be lost)"
      },
      {
        "name": "COL_BIT",
        "type": "bool",
        "comment": "Boolean value (BIT → Bool → bool)"
      },
      {
        "name": "COL_BOOLEAN",
        "type": "bool",
        "comment": "Boolean type (BOOLEAN → Bool → bool)"
      },
      {
        "name": "COL_DATE",
        "type": "date32",
        "comment": "Date (DATE → Date(DAY) → date32)"
      },
      {
        "name": "COL_TIME",
        "type": "time32",
        "comment": "Time (TIME → Time(MILLISECOND) → time32)"
      },
      {
        "name": "COL_TIMESTAMP",
        "type": "timestamp_us",
        "comment": "Timestamp (TIMESTAMP → Timestamp(MICROSECOND) → timestamp_us)"
      },
      {
        "name": "COL_DATETIME",
        "type": "timestamp_us",
        "comment": "Date and time (DATETIME → Timestamp(MICROSECOND) → timestamp_us)"
      },
      {
        "name": "COL_BINARY",
        "type": "binary",
        "comment": "Fixed-length binary (BINARY → Binary → binary)"
      },
      {
        "name": "COL_VARBINARY",
        "type": "binary",
        "comment": "Variable-length binary (VARBINARY → Binary → binary)"
      },
      {
        "name": "COL_BLOB",
        "type": "large_binary",
        "comment": "Large binary object (BLOB → LargeBinary → large_binary)"
      },
      {
        "name": "COL_INTERVAL_YM",
        "type": "interval_year_month",
        "comment": "Year-month interval (INTERVAL YEAR TO MONTH → Interval(YEAR_MONTH) → interval_year_month)"
      },
      {
        "name": "COL_INTERVAL_DT",
        "type": "interval_day_time",
        "comment": "Day-time interval (INTERVAL DAY TO SECOND → Interval(DAY_TIME) → interval_day_time)"
      }
    ]
  }'

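IV. Integration testing of Kuscia, DataProxy, and the Dameng database

4.1 Integration tests in the dataproxy code

I wrote integration tests for the Dameng plugin in dataproxy-integration-tests/src/test/java/org/secretflow/dataproxy/integration/tests/DamengIntegrationTest.java. They run against a real Dameng database inside a container.

Dameng no longer publishes an official database Docker image, so I took an image that Dameng had released earlier and modified it by adding a case-insensitive database instance. The tests start this custom Dameng Docker container with Testcontainers.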

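import java.time.Duration;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;

// DOCKER_CONTAINER_PORT is a constant of the test class that is not shown in this snippet.
GenericContainer<?> container = new GenericContainer<>(DockerImageName.parse("kongxr7/dameng:8.1"))
        .withExposedPorts(DOCKER_CONTAINER_PORT, 5237)
        .withPrivilegedMode(true)
        .withEnv("PAGE_SIZE", "16")
        .withEnv("LD_LIBRARY_PATH", "/opt/dmdbms/bin")
        .withEnv("INSTANCE_NAME", "dm8db")
        .withStartupTimeout(Duration.ofSeconds(600))   // allow up to 10 minutes for startup
        .waitingFor(Wait.forListeningPort());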

Test result: image-20251209234105972

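4.2 Testing with a Flight client tool

I wrote a test client, dameng_client (a Flight client tool for the Dameng database), to exercise Kuscia DataProxy with read, write, and copy operations. I placed this client inside the alice node.

/tmp/dameng_client copy comprehensive-type-test comprehensive-type-test1

This command uses dataproxy to read data from the Dameng table that comprehensive-type-test points to, parses it into Arrow records, and prints them. It then writes those Arrow records through dataproxy into the Dameng table that comprehensive-type-test1 points to. This tests reads and writes between Arrow and Dameng in both directions.

Test result: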
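A copy test like this is only meaningful if the rows that land in the target are compared with the source in order, including NULLs and binary values. The sketch below shows one way to do that comparison on rows already fetched into memory. The class name and the `List<Object[]>` row representation are assumptions for the example; the thread does not show how dameng_client verifies its output.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical order-sensitive, NULL-safe comparison of two fetched row sets. */
final class RoundTripCheckSketch {

    /**
     * Returns true only if both lists hold equal rows in the same order.
     * Arrays.deepEquals treats null cells as equal to each other and
     * compares nested byte[] cells by content rather than by reference.
     */
    static boolean sameRowsInOrder(List<Object[]> source, List<Object[]> target) {
        if (source.size() != target.size()) {
            return false;
        }
        for (int i = 0; i < source.size(); i++) {
            if (!Arrays.deepEquals(source.get(i), target.get(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Object[]> source = Arrays.<Object[]>asList(
                new Object[] {127, "Hello", new byte[] {0x48, 0x65}},
                new Object[] {null, null, null});
        List<Object[]> target = Arrays.<Object[]>asList(
                new Object[] {127, "Hello", new byte[] {0x48, 0x65}},
                new Object[] {null, null, null});
        System.out.println(sameRowsInOrder(source, target));
    }
}
```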

bash-5.2# /tmp/dameng_client copy comprehensive-type-test comprehensive-type-test1
Copying data from comprehensive-type-test to comprehensive-type-test1

=== Step 1: read source data ===
Fetching source FlightInfo...
Source DataProxy URI: grpc+tcp://10.88.0.2:8023
Reading data from source DataProxy...
Schema: schema:
  fields: 24
    - COL_TINYINT: type=int8, nullable
    - COL_SMALLINT: type=int16, nullable
    - COL_INT: type=int32, nullable
    - COL_BIGINT: type=int64, nullable
    - COL_FLOAT: type=float32, nullable
    - COL_DOUBLE: type=float64, nullable
    - COL_CHAR: type=utf8, nullable
    - COL_VARCHAR: type=utf8, nullable
    - COL_VARCHAR2: type=utf8, nullable
    - COL_CLOB: type=large_utf8, nullable
    - COL_DECIMAL: type=decimal(38, 10), nullable
    - COL_NUMERIC: type=decimal(18, 4), nullable
    - COL_NUMBER: type=decimal(10, 0), nullable
    - COL_BIT: type=bool, nullable
    - COL_BOOLEAN: type=bool, nullable
    - COL_DATE: type=date32, nullable
    - COL_TIME: type=time32[ms], nullable
    - COL_TIMESTAMP: type=timestamp[us], nullable
    - COL_DATETIME: type=timestamp[us], nullable
    - COL_BINARY: type=binary, nullable
    - COL_VARBINARY: type=binary, nullable
    - COL_BLOB: type=large_binary, nullable
    - COL_INTERVAL_YM: type=month_interval, nullable
    - COL_INTERVAL_DT: type=day_time_interval, nullable

--- Record #1 read ---
Rows: 3, columns: 24

  === Details of Record #1 read from source ===
  Row 1:
    COL_TINYINT: 127 (type: *array.Int8)
    COL_SMALLINT: -32768 (type: *array.Int16)
    COL_INT: 2147483647 (type: *array.Int32)
    COL_BIGINT: 9223372036854775807 (type: *array.Int64)
    COL_FLOAT: 3.141590 (type: *array.Float32)
    COL_DOUBLE: 2.718282 (type: *array.Float64)
    COL_CHAR: Hello World                                        (type: *array.String)
    COL_VARCHAR: This is a VARCHAR string (type: *array.String)
    COL_VARCHAR2: This is a VARCHAR2 string (type: *array.String)
    COL_CLOB: This is a CLOB field with very long content. AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA (type: *array.LargeString)
    COL_DECIMAL: 12345678901234567890.1200000000 (type: *array.Decimal128)
    COL_NUMERIC: 12345678901234.5678 (type: *array.Decimal128)
    COL_NUMBER: 9876543210 (type: *array.Decimal128)
    COL_BIT: true (type: *array.Boolean)
    COL_BOOLEAN: true (type: *array.Boolean)
    COL_DATE: 2024-01-15 (type: *array.Date32)
    COL_TIME: 14:30:45.000 (type: *array.Time32)
    COL_TIMESTAMP: 2024-01-15T06:30:45Z (type: *array.Timestamp)
    COL_DATETIME: 2024-01-15T06:30:45Z (type: *array.Timestamp)
    COL_BINARY: 48656C6C6F0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 (type: *array.Binary)
    COL_VARBINARY: 576F726C64 (type: *array.Binary)
    COL_BLOB: 48656C6C6F20576F726C6421 (type: *array.LargeBinary)
    COL_INTERVAL_YM: 2-3 (type: *array.MonthInterval)
    COL_INTERVAL_DT: 1 days 45045123 ms (type: *array.DayTimeInterval)
  Row 2:
    COL_TINYINT: NULL (type: *array.Int8)
    COL_SMALLINT: NULL (type: *array.Int16)
    COL_INT: NULL (type: *array.Int32)
    COL_BIGINT: NULL (type: *array.Int64)
    COL_FLOAT: NULL (type: *array.Float32)
    COL_DOUBLE: NULL (type: *array.Float64)
    COL_CHAR: NULL (type: *array.String)
    COL_VARCHAR: NULL (type: *array.String)
    COL_VARCHAR2: NULL (type: *array.String)
    COL_CLOB: NULL (type: *array.LargeString)
    COL_DECIMAL: NULL (type: *array.Decimal128)
    COL_NUMERIC: NULL (type: *array.Decimal128)
    COL_NUMBER: NULL (type: *array.Decimal128)
    COL_BIT: NULL (type: *array.Boolean)
    COL_BOOLEAN: NULL (type: *array.Boolean)
    COL_DATE: NULL (type: *array.Date32)
    COL_TIME: NULL (type: *array.Time32)
    COL_TIMESTAMP: NULL (type: *array.Timestamp)
    COL_DATETIME: NULL (type: *array.Timestamp)
    COL_BINARY: NULL (type: *array.Binary)
    COL_VARBINARY: NULL (type: *array.Binary)
    COL_BLOB: NULL (type: *array.LargeBinary)
    COL_INTERVAL_YM: NULL (type: *array.MonthInterval)
    COL_INTERVAL_DT: NULL (type: *array.DayTimeInterval)
  Row 3:
    COL_TINYINT: -128 (type: *array.Int8)
    COL_SMALLINT: 32767 (type: *array.Int16)
    COL_INT: -2147483648 (type: *array.Int32)
    COL_BIGINT: -9223372036854775808 (type: *array.Int64)
    COL_FLOAT: -3.141590 (type: *array.Float32)
    COL_DOUBLE: -2.718282 (type: *array.Float64)
    COL_CHAR: A                                                  (type: *array.String)
    COL_VARCHAR: Short (type: *array.String)
    COL_VARCHAR2: Short2 (type: *array.String)
    COL_CLOB: Short CLOB (type: *array.LargeString)
    COL_DECIMAL: 0.0100000000 (type: *array.Decimal128)
    COL_NUMERIC: 0.0001 (type: *array.Decimal128)
    COL_NUMBER: 0 (type: *array.Decimal128)
    COL_BIT: false (type: *array.Boolean)
    COL_BOOLEAN: false (type: *array.Boolean)
    COL_DATE: 1970-01-01 (type: *array.Date32)
    COL_TIME: 00:00:00.000 (type: *array.Time32)
    COL_TIMESTAMP: 1969-12-31T16:00:00Z (type: *array.Timestamp)
    COL_DATETIME: 1969-12-31T16:00:00Z (type: *array.Timestamp)
    COL_BINARY: 00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 (type: *array.Binary)
    COL_VARBINARY: FF (type: *array.Binary)
    COL_BLOB: 000102030405 (type: *array.LargeBinary)
    COL_INTERVAL_YM: 0-0 (type: *array.MonthInterval)
    COL_INTERVAL_DT: 0 days 0 ms (type: *array.DayTimeInterval)
  ========================

Successfully read 1 Record, 3 rows in total

=== Step 2: write target data ===
Fetching target FlightInfo...
Target DataProxy URI: grpc+tcp://10.88.0.2:8023
Writing data to target DataProxy...

--- Writing Record #1 ---
Writing Record 1/1 (3 rows)...

  === Details of Record #1 about to be written ===
  Row 1:
    COL_TINYINT: 127 (type: *array.Int8)
    COL_SMALLINT: -32768 (type: *array.Int16)
    COL_INT: 2147483647 (type: *array.Int32)
    COL_BIGINT: 9223372036854775807 (type: *array.Int64)
    COL_FLOAT: 3.141590 (type: *array.Float32)
    COL_DOUBLE: 2.718282 (type: *array.Float64)
    COL_CHAR: Hello World                                        (type: *array.String)
    COL_VARCHAR: This is a VARCHAR string (type: *array.String)
    COL_VARCHAR2: This is a VARCHAR2 string (type: *array.String)
    COL_CLOB: This is a CLOB field with very long content. AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA (type: *array.LargeString)
    COL_DECIMAL: 12345678901234567890.1200000000 (type: *array.Decimal128)
    COL_NUMERIC: 12345678901234.5678 (type: *array.Decimal128)
    COL_NUMBER: 9876543210 (type: *array.Decimal128)
    COL_BIT: true (type: *array.Boolean)
    COL_BOOLEAN: true (type: *array.Boolean)
    COL_DATE: 2024-01-15 (type: *array.Date32)
    COL_TIME: 14:30:45.000 (type: *array.Time32)
    COL_TIMESTAMP: 2024-01-15T06:30:45Z (type: *array.Timestamp)
    COL_DATETIME: 2024-01-15T06:30:45Z (type: *array.Timestamp)
    COL_BINARY: 48656C6C6F0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 (type: *array.Binary)
    COL_VARBINARY: 576F726C64 (type: *array.Binary)
    COL_BLOB: 48656C6C6F20576F726C6421 (type: *array.LargeBinary)
    COL_INTERVAL_YM: 2-3 (type: *array.MonthInterval)
    COL_INTERVAL_DT: 1 days 45045123 ms (type: *array.DayTimeInterval)
  Row 2:
    COL_TINYINT: NULL (type: *array.Int8)
    COL_SMALLINT: NULL (type: *array.Int16)
    COL_INT: NULL (type: *array.Int32)
    COL_BIGINT: NULL (type: *array.Int64)
    COL_FLOAT: NULL (type: *array.Float32)
    COL_DOUBLE: NULL (type: *array.Float64)
    COL_CHAR: NULL (type: *array.String)
    COL_VARCHAR: NULL (type: *array.String)
    COL_VARCHAR2: NULL (type: *array.String)
    COL_CLOB: NULL (type: *array.LargeString)
    COL_DECIMAL: NULL (type: *array.Decimal128)
    COL_NUMERIC: NULL (type: *array.Decimal128)
    COL_NUMBER: NULL (type: *array.Decimal128)
    COL_BIT: NULL (type: *array.Boolean)
    COL_BOOLEAN: NULL (type: *array.Boolean)
    COL_DATE: NULL (type: *array.Date32)
    COL_TIME: NULL (type: *array.Time32)
    COL_TIMESTAMP: NULL (type: *array.Timestamp)
    COL_DATETIME: NULL (type: *array.Timestamp)
    COL_BINARY: NULL (type: *array.Binary)
    COL_VARBINARY: NULL (type: *array.Binary)
    COL_BLOB: NULL (type: *array.LargeBinary)
    COL_INTERVAL_YM: NULL (type: *array.MonthInterval)
    COL_INTERVAL_DT: NULL (type: *array.DayTimeInterval)
  Row 3:
    COL_TINYINT: -128 (type: *array.Int8)
    COL_SMALLINT: 32767 (type: *array.Int16)
    COL_INT: -2147483648 (type: *array.Int32)
    COL_BIGINT: -9223372036854775808 (type: *array.Int64)
    COL_FLOAT: -3.141590 (type: *array.Float32)
    COL_DOUBLE: -2.718282 (type: *array.Float64)
    COL_CHAR: A                                                  (type: *array.String)
    COL_VARCHAR: Short (type: *array.String)
    COL_VARCHAR2: Short2 (type: *array.String)
    COL_CLOB: Short CLOB (type: *array.LargeString)
    COL_DECIMAL: 0.0100000000 (type: *array.Decimal128)
    COL_NUMERIC: 0.0001 (type: *array.Decimal128)
    COL_NUMBER: 0 (type: *array.Decimal128)
    COL_BIT: false (type: *array.Boolean)
    COL_BOOLEAN: false (type: *array.Boolean)
    COL_DATE: 1970-01-01 (type: *array.Date32)
    COL_TIME: 00:00:00.000 (type: *array.Time32)
    COL_TIMESTAMP: 1969-12-31T16:00:00Z (type: *array.Timestamp)
    COL_DATETIME: 1969-12-31T16:00:00Z (type: *array.Timestamp)
    COL_BINARY: 00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 (type: *array.Binary)
    COL_VARBINARY: FF (type: *array.Binary)
    COL_BLOB: 000102030405 (type: *array.LargeBinary)
    COL_INTERVAL_YM: 0-0 (type: *array.MonthInterval)
    COL_INTERVAL_DT: 0 days 0 ms (type: *array.DayTimeInterval)
  ========================

Record #1 written successfully
Target server closed the stream; write complete.

Successfully copied 3 rows from comprehensive-type-test to comprehensive-type-test1
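Two values in the log above can be checked by hand. COL_INTERVAL_DT prints as "1 days 45045123 ms" for the inserted interval of 1 day 12:30:45.123456, and the timestamps print eight hours earlier than the inserted wall-clock values. The sketch below reproduces both numbers. The eight-hour shift is explained here under the assumption that the stored wall-clock values were interpreted in a UTC+8 zone and printed in UTC; the thread does not state the session time zone, so treat that as one plausible reading. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/** Hypothetical arithmetic check for two values printed in the test log. */
final class LogValueCheckSketch {

    /** Milliseconds in the time-of-day part of a DAY TO SECOND interval (sub-millisecond digits are dropped). */
    static long dayTimeMillis(int hours, int minutes, int seconds, int micros) {
        return Duration.ofHours(hours)
                .plusMinutes(minutes)
                .plusSeconds(seconds)
                .plusNanos(micros * 1000L)
                .toMillis();
    }

    /** Interprets a wall-clock value at the given UTC offset and renders it as a UTC instant. */
    static String toUtcString(LocalDateTime wallClock, ZoneOffset offset) {
        return wallClock.atOffset(offset).toInstant().toString();
    }

    public static void main(String[] args) {
        // 12:30:45.123456 is 45045 s plus 123 ms once microseconds are truncated.
        System.out.println(dayTimeMillis(12, 30, 45, 123456));
        // Assumption: the session zone is UTC+8.
        ZoneOffset utcPlus8 = ZoneOffset.ofHours(8);
        System.out.println(toUtcString(LocalDateTime.of(2024, 1, 15, 14, 30, 45), utcPlus8));
        System.out.println(toUtcString(LocalDateTime.of(1970, 1, 1, 0, 0, 0), utcPlus8));
    }
}
```

The millisecond truncation matters when comparing round-tripped intervals: Arrow's DAY_TIME interval stores days and milliseconds, so microsecond digits in the source value cannot survive the trip.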

kongxiaoran avatar Dec 09 '25 15:12 kongxiaoran