Add a new data source IO implementation: Dameng
This ISSUE is one of the tasks of the SecretFlow Open Source Contribution Plan (SF OSCP) Phase 6. Community developers are welcome to join us in building it together!
- Before claiming a task, please make sure you have [signed up](https://studio.secretflow.com/activity/rkub4eryy7g3vmn/detail).
- For more tasks, see the "OSCP Phase6 Season of Dev" Project.
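Task Introduction
- Task name: Add a new data source IO implementation: Dameng
- Technical area: DataProxy
- Difficulty: Challenging 🌟🌟🌟
- Estimated completion time: 5 weeks
- Task Reviewer: @YanZhuangz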
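Detailed Requirements
Kuscia currently supports the data source types localfs, OSS, MySQL, ODPS, and PostgreSQL. In practice, some data comes from Dameng databases, so to avoid extra data movement this task adds a Dameng data source connector and enriches Kuscia's data source ecosystem.
- Requirements:
  - Map data types correctly so that data is read and written accurately;
  - Handle NULL values correctly (on both read and write);
  - Preserve row order when writing, so that rows are never written out of order;
  - Write in overwrite mode (for a partitioned table, overwrite the partition), so that after a task runs, the target holds exactly the data written by the most recent task;
  - When reading, return rows in the same order in which they were written, because some tasks sort the data before writing it.
- Task notes:
  - Design: first prepare your design proposal and post it as a comment on this ISSUE. Start development only after the Reviewer has approved it, and follow the approved proposal. Preliminary research shows that Dameng officially supports JDBC, so Java is the more suitable language for development.
  - Development: code must follow the code style guide at https://github.com/spring-projects/spring-framework/wiki/Code-Style
  - Submission: once development is complete, link this ISSUE and submit the code to https://github.com/secretflow/kuscia and https://github.com/secretflow/dataproxy. If both repositories are involved, link this ISSUE from the Kuscia repository; the PRs in both repositories will be reviewed together.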
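Required Skills
- Some understanding of how Kuscia DataMesh works
- Some familiarity with Apache Arrow
- Proficiency in Java
Instructions
- Development guide: https://www.secretflow.org.cn/zh-CN/docs/kuscia/v0.15.0b0/development/add_datamesh_io
- If anything in the guide is unclear or you would like to add to it, comment on this ISSUE or submit a change to the document
- JDBC reference: https://eco.dameng.com/document/dm/zh-cn/pm/jdbc-rogramming-guide.html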
kongxiaoran Give it to me
Congratulations 🏅 @kongxiaoran, you have successfully claimed this task!
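Design Proposal: Integrating the Dameng Data Source into Kuscia
1. Overview
1.1 Background
Kuscia already supports common databases such as MySQL and PostgreSQL but has no direct support for Dameng. This issue calls for a new DataProxy data source plugin, dataproxy-plugin-dameng, that reads from and writes to Dameng efficiently and reliably, connecting upper-layer computing applications to Dameng databases.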
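1.2 Goals
- Develop a standalone, pluggable Dameng data source plugin.
- Support bidirectional data flow between DataProxy and Dameng based on the Apache Arrow format.
- Ensure accurate data type mapping, especially for numeric, date, and text types.
- Handle NULL values correctly (on both read and write).
- Write in overwrite mode (for a partitioned table, overwrite the partition), so that after a task runs, the target holds exactly the data written by the most recent task.
- Keep row order consistent across reads and writes.
- Provide complete unit and integration tests to ensure the plugin is stable and reliable.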
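The overwrite goal above can be met by clearing the target and inserting the new rows in one transaction, so a failed write rolls back instead of leaving partial data. The sketch below only builds the statements. `OverwritePlan` and its methods are hypothetical names, and emulating a partition overwrite with `DELETE ... WHERE <partition column> = ?` is an assumption: a partition-level truncate may be faster where Dameng supports it, but DDL may commit implicitly and escape the transaction.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: the statements needed to overwrite a table or one partition. */
final class OverwritePlan {

    /** Double-quotes an identifier, escaping embedded quotes. */
    static String quote(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    /**
     * Statements to run, in order, inside a single transaction (autoCommit = false).
     * partitionColumn == null overwrites the whole table; otherwise only rows whose
     * partition column equals the bound value (?) are removed before the insert.
     */
    static List<String> overwriteStatements(String table, String partitionColumn, String insertSql) {
        List<String> statements = new ArrayList<>();
        String delete = "DELETE FROM " + quote(table);
        if (partitionColumn != null) {
            delete += " WHERE " + quote(partitionColumn) + " = ?";
        }
        statements.add(delete);
        statements.add(insertSql);
        return statements;
    }
}
```

A caller would call `connection.setAutoCommit(false)`, execute the statements in the returned order, and then `commit()`, or `rollback()` on failure.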
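2. Overall Design
2.1 Technology Selection
Preliminary research confirmed that Dameng officially supports Java JDBC, and in terms of ecosystem, Java is a better choice than Go for connecting to Dameng.
After studying DataProxy's existing plugin system, the plan is to talk to the database through Dameng's official JDBC driver (DmJdbcDriver18.jar). This follows the same approach as the existing dataproxy-plugin-database module, so existing abstractions and logic can be reused as much as possible, lowering development and maintenance costs.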
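2.2 Architecture
A new Maven module, dataproxy-plugin-dameng, will be created. The plugin implements DataProxy's core SPI interfaces and converts in both directions between DataProxy's internal data format (Apache Arrow) and JDBC's ResultSet (read) and PreparedStatement (write).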
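The NULL requirement applies to both directions of that conversion. With plain JDBC, a null Arrow cell is written with `PreparedStatement.setNull`, and on read the primitive getters return 0 for SQL NULL, so `ResultSet.wasNull()` has to be checked after each call. `NullHandling` is a hypothetical helper for illustration, not part of DataProxy:

```java
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/** Hypothetical helper showing NULL-safe JDBC writes and reads. */
final class NullHandling {

    /** Binds a value at the 1-based index, using setNull when the Arrow cell is null. */
    static void bind(PreparedStatement ps, int index, Object value, int sqlType) throws SQLException {
        if (value == null) {
            ps.setNull(index, sqlType);
        } else {
            ps.setObject(index, value, sqlType);
        }
    }

    /** Reads an INT column; getInt returns 0 for SQL NULL, so wasNull() disambiguates. */
    static Integer readNullableInt(ResultSet rs, int column) throws SQLException {
        int v = rs.getInt(column);
        return rs.wasNull() ? null : v;
    }
}
```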
3. Detailed Design
3.1 New Components
- DamengFlightProducer (entry class)
- DamengUtil (utility class), with 6 core functions:
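| Function | Responsibility |
|---|---|
| initDameng() | Initialize the JDBC connection |
| buildQuerySql() | Build the SELECT statement |
| jdbcType2ArrowType() | Convert JDBC types to Arrow types |
| buildCreateTableSql() | Build the CREATE TABLE statement |
| buildMultiRowInsertSql() | Build the batched (multi-row) INSERT statement |
| checkTableExists() | Check whether the table exists |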
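Two of the `DamengUtil` functions in the table above are plain string builders, so they can be sketched without a database. This assumes identifiers are double-quoted (which makes them case-sensitive in Dameng) and that batching uses a multi-row `INSERT ... VALUES (...), (...)` with `?` placeholders; the signatures are illustrative, not the plugin's actual API. Emitting one value group per row in Arrow row order is the starting point for order-preserving writes, and large writes would be split into several such statements executed in order.

```java
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;

/** Illustrative versions of two DamengUtil string builders; the signatures are assumptions. */
final class DamengSqlSketch {

    /** Double-quotes an identifier, escaping embedded quotes. */
    static String quote(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    /** Joins quoted column names with ", " in the given order. */
    static String columnList(List<String> columns) {
        StringJoiner joiner = new StringJoiner(", ");
        columns.forEach(c -> joiner.add(quote(c)));
        return joiner.toString();
    }

    /** SELECT "C1", "C2" FROM "T": columns are listed explicitly so their order is fixed. */
    static String buildQuerySql(String table, List<String> columns) {
        return "SELECT " + columnList(columns) + " FROM " + quote(table);
    }

    /** INSERT INTO "T" ("C1", "C2") VALUES (?, ?), (?, ?): one group per row, in row order. */
    static String buildMultiRowInsertSql(String table, List<String> columns, int rowCount) {
        if (columns.isEmpty() || rowCount <= 0) {
            throw new IllegalArgumentException("need at least one column and one row");
        }
        String group = "(" + String.join(", ", Collections.nCopies(columns.size(), "?")) + ")";
        return "INSERT INTO " + quote(table) + " (" + columnList(columns) + ") VALUES "
                + String.join(", ", Collections.nCopies(rowCount, group));
    }
}
```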
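This part can follow the implementation of the Hive plugin.
3.2 Key Points
The most important part of the implementation is mapping data types correctly.
The precise mapping below between Dameng types and the Apache Arrow type system is based on the [Dameng official JDBC programming guide](https://eco.dameng.com/document/dm/zh-cn/pm/jdbc-rogramming-guide.html#4.4.1%20%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B%E6%89%A9%E5%B1%95):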
| Dameng type | Dameng type code | JDBC type | Arrow TypeID | Arrow type | Notes |
|---|---|---|---|---|---|
| Basic numeric types | | | | | |
| TINYINT | 5 | TINYINT(-6) | Int(2) | ArrowType.Int(8, true) | 8-bit signed integer |
| SMALLINT | 6 | SMALLINT(5) | Int(2) | ArrowType.Int(16, true) | 16-bit signed integer |
| INT | 7 | INTEGER(4) | Int(2) | ArrowType.Int(32, true) | 32-bit signed integer |
| BIGINT | 8 | BIGINT(-5) | Int(2) | ArrowType.Int(64, true) | 64-bit signed integer |
| DECIMAL | 9 | DECIMAL(3) | Decimal(7) | ArrowType.Decimal(p, s, 128) | Fixed-point number with variable precision |
| REAL | 10 | REAL(7) | FloatingPoint(3) | ArrowType.FloatingPoint(SINGLE) | 32-bit floating point |
| DOUBLE | 11 | DOUBLE(8) | FloatingPoint(3) | ArrowType.FloatingPoint(DOUBLE) | 64-bit floating point |
| String types | | | | | |
| CHAR | 0 | CHAR(1) | Utf8(5) | ArrowType.Utf8 | Fixed-length string |
| VARCHAR | 2 | VARCHAR(12) | Utf8(5) | ArrowType.Utf8 | Variable-length string |
| VARCHAR2 | 1 | VARCHAR(12) | Utf8(5) | ArrowType.Utf8 | Oracle-compatible string |
| CLOB | 19 | CLOB(2005) | LargeUtf8(20) | ArrowType.LargeUtf8 | Large character object |
| Date/time types | | | | | |
| DATE | 14 | DATE(91) | Date(8) | ArrowType.Date(DateUnit.DAY) | Date |
| TIME | 15 | TIME(92) | Time(9) | ArrowType.Time(TimeUnit.MILLISECOND, 32) | Time of day |
| DATETIME | 16 | TIMESTAMP(93) | Timestamp(10) | ArrowType.Timestamp(TimeUnit.MILLISECOND, null) | Timestamp |
| DATETIME2 | 26 | TIMESTAMP(93) | Timestamp(10) | ArrowType.Timestamp(TimeUnit.NANOSECOND, null) | Nanosecond-precision timestamp |
| TIME_TZ | 22 | TIME_WITH_TIMEZONE(2013) | Time(9) | ArrowType.Time(TimeUnit.NANOSECOND, 64) | Time with time zone |
| DATETIME_TZ | 23 | TIMESTAMP_WITH_TIMEZONE(2014) | Timestamp(10) | ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC") | Timestamp with time zone |
| DATETIME2_TZ | 27 | TIMESTAMP_WITH_TIMEZONE(2014) | Timestamp(10) | ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC") | Nanosecond-precision timestamp with time zone |
| Binary types | | | | | |
| BINARY | 17 | BINARY(-2) | FixedSizeBinary(15) | ArrowType.FixedSizeBinary(n) | Fixed-length binary |
| VARBINARY | 18 | VARBINARY(-3) | Binary(4) | ArrowType.Binary | Variable-length binary |
| BLOB | 12 | BLOB(2004) | LargeBinary(19) | ArrowType.LargeBinary | Large binary object |
| Boolean types | | | | | |
| BIT | 3 | BIT(-7) | Bool(6) | ArrowType.Bool.INSTANCE | Stores 1, 0, or NULL; used for boolean values |
| BOOLEAN | 13 | BOOLEAN(16) | Bool(6) | ArrowType.Bool.INSTANCE | Boolean |
| Dameng-specific types | | | | | |
| INTERVAL_YM | 20 | OTHER(1111) | Interval(11) | ArrowType.Interval(IntervalUnit.YEAR_MONTH) | Year-month interval |
| INTERVAL_DT | 21 | OTHER(1111) | Interval(11) | ArrowType.Interval(IntervalUnit.DAY_TIME) | Day-time interval |
| ROWID | 28 | ROWID(-8) | Utf8(5) | ArrowType.Utf8 | Row identifier (converted to string) |
| NULL | 29 | NULL(0) | Null(1) | ArrowType.Null | Null type |
Any inaccuracies will be adjusted during implementation.
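A dependency-free sketch of `jdbcType2ArrowType` for the standard JDBC codes in the mapping table. To stay runnable without Arrow on the classpath, each Arrow type is returned as a descriptive string; in the plugin these would be `org.apache.arrow.vector.types.pojo.ArrowType` instances such as `new ArrowType.Int(8, true)`. The `(jdbcType, precision, scale)` signature is an assumption. BIT is mapped to Bool here, matching the boolean values in the test record later in this thread, and `NUMERIC` is included because the test table's NUMERIC column reads back as a decimal. Dameng-specific codes reported as `OTHER` (the interval types) need the column's type name to disambiguate and are rejected.

```java
import java.sql.Types;

/** Dependency-free sketch of jdbcType2ArrowType; Arrow types are described as strings. */
final class JdbcToArrowSketch {

    static String jdbcType2ArrowType(int jdbcType, int precision, int scale) {
        switch (jdbcType) {
            case Types.TINYINT:   return "Int(8, signed)";
            case Types.SMALLINT:  return "Int(16, signed)";
            case Types.INTEGER:   return "Int(32, signed)";
            case Types.BIGINT:    return "Int(64, signed)";
            case Types.DECIMAL:
            case Types.NUMERIC:   return "Decimal(" + precision + ", " + scale + ", 128)";
            case Types.REAL:      return "FloatingPoint(SINGLE)";
            case Types.DOUBLE:    return "FloatingPoint(DOUBLE)";
            case Types.CHAR:
            case Types.VARCHAR:
            case Types.ROWID:     return "Utf8";
            case Types.CLOB:      return "LargeUtf8";
            case Types.DATE:      return "Date(DAY)";
            case Types.TIME:      return "Time(MILLISECOND, 32)";
            case Types.TIMESTAMP: return "Timestamp(MILLISECOND, null)";
            case Types.TIME_WITH_TIMEZONE:      return "Time(NANOSECOND, 64)";
            case Types.TIMESTAMP_WITH_TIMEZONE: return "Timestamp(NANOSECOND, UTC)";
            case Types.BIT:
            case Types.BOOLEAN:   return "Bool";
            case Types.BINARY:    return "FixedSizeBinary(" + precision + ")";
            case Types.VARBINARY: return "Binary";
            case Types.BLOB:      return "LargeBinary";
            case Types.NULL:      return "Null";
            default:
                // e.g. Types.OTHER for INTERVAL columns: needs the DB type name to resolve
                throw new IllegalArgumentException("unsupported JDBC type: " + jdbcType);
        }
    }
}
```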
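4. Testing
Unit tests
DamengUtilTest: unit tests for the 6 utility functions, focusing on type conversion and SQL construction
Integration tests
- Tests against a real Dameng database
- Data volume tests: 10K → 1M → 10M rows
- Data type completeness tests
- Dameng-specific type tests: INTERVAL_YM, INTERVAL_DT, DATETIME2
- Partitioned table read/write tests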
@YanZhuangz Hi Reviewer, could you please confirm whether this design proposal is acceptable?
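When converting JDBC types to Arrow types, I found that some types, such as DECIMAL, TIMESTAMP, and BINARY, could be mapped to Arrow more accurately if their length and precision information were available.
Should the framework code be changed to pass COLUMN_SIZE and DECIMAL_DIGITS, e.g.
ArrowType arrowType = this.jdbcType2ArrowType.apply(columnType, columnSize, decimalDigits);
and then handle these types in a more fine-grained way?
Or should the framework code be left unchanged, with a simple implementation for now?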
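To illustrate the question above: with `DECIMAL_DIGITS` available, a TIMESTAMP column's fractional-second digits could select the Arrow time unit instead of a fixed one. The `JdbcColumnTypeMapper` interface is a hypothetical shape for the proposed three-argument `apply`, Arrow types are again shown as strings, and the sketch assumes Dameng's `DatabaseMetaData.getColumns` reports the fractional-second precision of TIMESTAMP columns in `DECIMAL_DIGITS`, which would need to be verified.

```java
import java.sql.Types;

/** Hypothetical shape of the proposed apply(columnType, columnSize, decimalDigits) mapper. */
@FunctionalInterface
interface JdbcColumnTypeMapper {
    String apply(int columnType, int columnSize, int decimalDigits);
}

final class TimestampUnitSketch {

    /** Picks the coarsest Arrow TimeUnit that still holds the given fractional-second digits. */
    static String timestampUnit(int decimalDigits) {
        if (decimalDigits <= 0) {
            return "SECOND";
        }
        if (decimalDigits <= 3) {
            return "MILLISECOND";
        }
        if (decimalDigits <= 6) {
            return "MICROSECOND";
        }
        return "NANOSECOND";
    }

    /** Refines TIMESTAMP columns using DECIMAL_DIGITS and delegates every other type. */
    static JdbcColumnTypeMapper withTimestampPrecision(JdbcColumnTypeMapper fallback) {
        return (type, size, digits) -> type == Types.TIMESTAMP
                ? "Timestamp(" + timestampUnit(digits) + ", null)"
                : fallback.apply(type, size, digits);
    }
}
```

With this shape, a TIMESTAMP column for which the driver reports 6 fractional digits would map to `Timestamp(MICROSECOND, null)`.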
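Regarding the field mapping between Dameng and Arrow: for now, refer to the existing DataProxy implementations for Hive, ODPS, etc., and support the existing field types. Do not modify the framework code for now. Also note these requirements: unit test coverage of new lines must reach 70%, partitioned tables must be supported, and Arrow's large-utf8 and date (multiple variants) types must be supported.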
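The Arrow date variants mentioned above come down to two encodings: `Date(DAY)` (date32) stores days since the Unix epoch, while `Date(MILLISECOND)` (date64) stores milliseconds since the epoch at midnight UTC. A minimal conversion sketch using only `java.time`; the class name is hypothetical.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;

/** Hypothetical helper: encode a calendar date for Arrow's two Date variants. */
final class ArrowDateSketch {

    /** date32 / Date(DAY): days since 1970-01-01. */
    static int toDate32(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }

    /** date64 / Date(MILLISECOND): milliseconds since the epoch, at midnight UTC. */
    static long toDate64(LocalDate date) {
        return date.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
    }

    /** Reverse of toDate32, e.g. when writing date32 values back through JDBC. */
    static LocalDate fromDate32(int days) {
        return LocalDate.ofEpochDay(days);
    }
}
```

For 2024-01-15 (a value in the test data), `toDate32` gives 19737 and `toDate64` gives 1705276800000.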
Test Record: Kuscia Support for the Dameng Database
1. Prepare test data
- Create the schema (creating a user in Dameng also creates a schema with the same name)
create user KUSCIA identified by 1234567890;
- Create the test table
-- Drop the table if it already exists
DROP TABLE IF EXISTS COMPREHENSIVE_TYPE_TEST;
-- Create the test table
CREATE TABLE COMPREHENSIVE_TYPE_TEST (
    -- Integer types
    COL_TINYINT     TINYINT,          -- 8-bit signed integer
    COL_SMALLINT    SMALLINT,         -- 16-bit signed integer
    COL_INT         INT,              -- 32-bit signed integer
    COL_BIGINT      BIGINT,           -- 64-bit signed integer
    -- Floating-point types
    COL_FLOAT       FLOAT,            -- single-precision float
    COL_DOUBLE      DOUBLE,           -- double-precision float
    -- String types
    COL_CHAR        CHAR(50),         -- fixed-length string
    COL_VARCHAR     VARCHAR(200),     -- variable-length string
    COL_VARCHAR2    VARCHAR2(200),    -- VARCHAR2 (Oracle-compatible)
    COL_CLOB        CLOB,             -- large text (LargeUtf8)
    -- Numeric types
    COL_DECIMAL     DECIMAL(38, 2),   -- high-precision number (precision=38, scale=2)
    COL_NUMERIC     NUMERIC(18, 4),   -- numeric (precision=18, scale=4)
    COL_NUMBER      NUMBER(10, 0),    -- NUMBER (integer)
    -- Boolean types
    COL_BIT         BIT,              -- boolean value (1 = true, 0 = false)
    COL_BOOLEAN     BIT,              -- boolean (Dameng uses BIT for booleans)
    -- Date/time types
    COL_DATE        DATE,             -- date
    COL_TIME        TIME,             -- time of day
    COL_TIMESTAMP   TIMESTAMP,        -- timestamp (microsecond precision)
    COL_DATETIME    DATETIME,         -- date and time
    -- Binary types
    COL_BINARY      BINARY(100),      -- fixed-length binary
    COL_VARBINARY   VARBINARY(500),   -- variable-length binary
    COL_BLOB        BLOB,             -- large binary object (LargeBinary)
    -- Interval types
    COL_INTERVAL_YM INTERVAL YEAR TO MONTH,  -- year-month interval
    COL_INTERVAL_DT INTERVAL DAY TO SECOND   -- day-time interval
);
-- Insert the first row of test data
INSERT INTO COMPREHENSIVE_TYPE_TEST VALUES (
    -- Integer types
    127,                          -- COL_TINYINT: maximum
    -32768,                       -- COL_SMALLINT: minimum
    2147483647,                   -- COL_INT: maximum
    9223372036854775807,          -- COL_BIGINT: maximum
    -- Floating-point types
    3.14159,                      -- COL_FLOAT: pi
    2.718281828459045,            -- COL_DOUBLE: Euler's number
    -- String types
    'Hello World',                -- COL_CHAR: fixed-length string
    'This is a VARCHAR string',   -- COL_VARCHAR: variable-length string
    'This is a VARCHAR2 string',  -- COL_VARCHAR2: VARCHAR2 type
    'This is a CLOB field with very long content. ' || RPAD('A', 1000, 'A'),  -- COL_CLOB: large text (1000+ characters, generated with RPAD)
    -- Numeric types
    123456789012345678901234567890.12,  -- COL_DECIMAL: large number (scale=2)
    12345678901234.5678,          -- COL_NUMERIC: number (scale=4)
    9876543210,                   -- COL_NUMBER: integer
    -- Boolean types
    1,                            -- COL_BIT: true
    1,                            -- COL_BOOLEAN: true (BIT type, 1 = true)
    -- Date/time types
    TO_DATE('2024-01-15', 'YYYY-MM-DD'),                                      -- COL_DATE: date
    CAST('14:30:45' AS TIME),                                                 -- COL_TIME: time
    TO_TIMESTAMP('2024-01-15 14:30:45.123456', 'YYYY-MM-DD HH24:MI:SS.FF6'),  -- COL_TIMESTAMP: timestamp (microseconds)
    TO_TIMESTAMP('2024-01-15 14:30:45', 'YYYY-MM-DD HH24:MI:SS'),             -- COL_DATETIME: date and time
    -- Binary types
    HEXTORAW('48656C6C6F'),                -- COL_BINARY: hex for "Hello"
    HEXTORAW('576F726C64'),                -- COL_VARBINARY: hex for "World"
    HEXTORAW('48656C6C6F20576F726C6421'),  -- COL_BLOB: hex for "Hello World!"
    -- Interval types
    INTERVAL '2-3' YEAR TO MONTH,                      -- COL_INTERVAL_YM: 2 years 3 months
    INTERVAL '1 12:30:45.123456' DAY(1) TO SECOND(6)   -- COL_INTERVAL_DT: 1 day 12 h 30 min 45.123456 s
);
-- Insert a second row (all NULL values)
INSERT INTO COMPREHENSIVE_TYPE_TEST VALUES (
    NULL, NULL, NULL, NULL,   -- COL_TINYINT, COL_SMALLINT, COL_INT, COL_BIGINT
    NULL, NULL,               -- COL_FLOAT, COL_DOUBLE
    NULL, NULL, NULL, NULL,   -- COL_CHAR, COL_VARCHAR, COL_VARCHAR2, COL_CLOB
    NULL, NULL, NULL,         -- COL_DECIMAL, COL_NUMERIC, COL_NUMBER
    NULL, NULL,               -- COL_BIT, COL_BOOLEAN
    NULL, NULL, NULL, NULL,   -- COL_DATE, COL_TIME, COL_TIMESTAMP, COL_DATETIME
    NULL, NULL, NULL,         -- COL_BINARY, COL_VARBINARY, COL_BLOB
    NULL, NULL                -- COL_INTERVAL_YM, COL_INTERVAL_DT
);
-- Insert a third row (boundary values)
INSERT INTO COMPREHENSIVE_TYPE_TEST VALUES (
    -128,                     -- COL_TINYINT: minimum
    32767,                    -- COL_SMALLINT: maximum
    -2147483648,              -- COL_INT: minimum
    -9223372036854775808,     -- COL_BIGINT: minimum
    -3.14159,                 -- COL_FLOAT: negative float
    -2.718281828459045,       -- COL_DOUBLE: negative double
    'A',                      -- COL_CHAR: single character
    'Short',                  -- COL_VARCHAR: short string
    'Short2',                 -- COL_VARCHAR2: short string
    'Short CLOB',             -- COL_CLOB: short text
    0.01,                     -- COL_DECIMAL: smallest positive value at scale 2
    0.0001,                   -- COL_NUMERIC: smallest positive value at scale 4
    0,                        -- COL_NUMBER: zero
    0,                        -- COL_BIT: false
    0,                        -- COL_BOOLEAN: false (BIT type, 0 = false)
    TO_DATE('1970-01-01', 'YYYY-MM-DD'),                                      -- COL_DATE: Unix epoch
    CAST('00:00:00' AS TIME),                                                 -- COL_TIME: midnight
    TO_TIMESTAMP('1970-01-01 00:00:00.000000', 'YYYY-MM-DD HH24:MI:SS.FF6'),  -- COL_TIMESTAMP: Unix epoch
    TO_TIMESTAMP('1970-01-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS'),             -- COL_DATETIME: Unix epoch
    HEXTORAW('00'),           -- COL_BINARY: single byte
    HEXTORAW('FF'),           -- COL_VARBINARY: single byte
    HEXTORAW('000102030405'), -- COL_BLOB: multiple bytes
    INTERVAL '0-0' YEAR TO MONTH,                      -- COL_INTERVAL_YM: zero interval
    INTERVAL '0 00:00:00.000000' DAY(1) TO SECOND(6)   -- COL_INTERVAL_DT: zero interval
);
2. Build DataProxy and Kuscia
The build process is not covered in detail here. The resulting images are:
export KUSCIA_IMAGE=secretflow/kuscia:1b08d28-20251209225356
export DATAPROXY_IMAGE=dataproxy:v0.0.1-20251209225425-74b49d
Quick deployment in P2P mode:
./scripts/deploy/kuscia.sh p2p --data-proxy
3. Configure the data source and DomainData
3.1 Create the data source
docker exec -it ${USER}-kuscia-autonomy-alice bash
export CTR_CERTS_ROOT=/home/kuscia/var/certs
# Create the Dameng data source (change the IP, port, user, and password to match your environment)
curl -X POST 'https://localhost:8082/api/v1/domaindatasource/create' \
--header "Token: $(cat ${CTR_CERTS_ROOT}/token)" \
--header 'Content-Type: application/json' \
--cert ${CTR_CERTS_ROOT}/kusciaapi-server.crt \
--key ${CTR_CERTS_ROOT}/kusciaapi-server.key \
--cacert ${CTR_CERTS_ROOT}/ca.crt \
-k \
-d '{
"domain_id": "alice",
"datasource_id": "kuscia-ds-01",
"name": "Dameng test data source (kuscia)",
"type": "dameng",
"info": {
"database": {
"endpoint": "110.40.xx.xx:5237",
"user": "KUSCIA",
"password": "1234567890",
"database": "KUSCIA"
}
}
}'
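The `info.database` fields above are what a connection-initialization step such as `initDameng()` would consume. A minimal sketch, assuming the Dameng JDBC URL form `jdbc:dm://host:port`, the driver class `dm.jdbc.driver.DmDriver`, and that the `database` field is passed as the `schema` connection property; verify these names against the Dameng JDBC programming guide linked in the task description. `DamengConnectionSketch` is hypothetical.

```java
import java.util.Properties;

/** Hypothetical translation of a Kuscia "database" datasource block into JDBC settings. */
final class DamengConnectionSketch {

    /** Assumed driver class in DmJdbcDriver18.jar. */
    static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";

    /** Builds jdbc:dm://host:port from an endpoint such as "127.0.0.1:5237". */
    static String buildUrl(String endpoint) {
        int colon = endpoint.lastIndexOf(':');
        if (colon <= 0 || colon == endpoint.length() - 1) {
            throw new IllegalArgumentException("endpoint must be host:port, got " + endpoint);
        }
        int port = Integer.parseInt(endpoint.substring(colon + 1)); // rejects non-numeric ports
        return "jdbc:dm://" + endpoint.substring(0, colon) + ":" + port;
    }

    /** Credentials plus the target schema, for DriverManager.getConnection(url, props). */
    static Properties buildProperties(String user, String password, String database) {
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", password);
        props.setProperty("schema", database); // assumption: DM JDBC "schema" property
        return props;
    }
}
```

A caller would then run `Class.forName(DRIVER_CLASS)` followed by `DriverManager.getConnection(buildUrl(endpoint), buildProperties(user, password, database))`.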
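3.2 Create DomainData (pointing to the test tables in the Dameng database)
Two DomainData objects are created here: one for reading data from the Dameng database and one for writing data to it. Both define exactly the same columns.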
export CTR_CERTS_ROOT=/home/kuscia/var/certs
curl -X POST 'https://localhost:8082/api/v1/domaindata/create' \
--header "Token: $(cat ${CTR_CERTS_ROOT}/token)" \
--header 'Content-Type: application/json' \
--cert ${CTR_CERTS_ROOT}/kusciaapi-server.crt \
--key ${CTR_CERTS_ROOT}/kusciaapi-server.key \
--cacert ${CTR_CERTS_ROOT}/ca.crt \
-k \
-d '{
"domain_id": "alice",
"domaindata_id": "comprehensive-type-test",
"name": "Comprehensive type test table",
"type": "table",
"datasource_id": "kuscia-ds-01",
"relative_uri": "COMPREHENSIVE_TYPE_TEST",
"attributes": {},
"columns": [
{
"name": "COL_TINYINT",
"type": "int8",
"comment": "8-bit signed integer (TINYINT → Int(8) → int8)"
},
{
"name": "COL_SMALLINT",
"type": "int16",
"comment": "16-bit signed integer (SMALLINT → Int(16) → int16)"
},
{
"name": "COL_INT",
"type": "int32",
"comment": "32-bit signed integer (INT → Int(32) → int32)"
},
{
"name": "COL_BIGINT",
"type": "int64",
"comment": "64-bit signed integer (BIGINT → Int(64) → int64)"
},
{
"name": "COL_FLOAT",
"type": "float32",
"comment": "Single-precision float (FLOAT → FloatingPoint(SINGLE) → float32)"
},
{
"name": "COL_DOUBLE",
"type": "float64",
"comment": "Double-precision float (DOUBLE → FloatingPoint(DOUBLE) → float64)"
},
{
"name": "COL_CHAR",
"type": "string",
"comment": "Fixed-length string (CHAR → Utf8 → string)"
},
{
"name": "COL_VARCHAR",
"type": "string",
"comment": "Variable-length string (VARCHAR → Utf8 → string)"
},
{
"name": "COL_VARCHAR2",
"type": "string",
"comment": "VARCHAR2 type (VARCHAR2 → Utf8 → string)"
},
{
"name": "COL_CLOB",
"type": "large_string",
"comment": "Large text (CLOB → LargeUtf8 → large_string)"
},
{
"name": "COL_DECIMAL",
"type": "decimal",
"comment": "High-precision number (38,2) (DECIMAL → Decimal(38,2) → decimal)"
},
{
"name": "COL_NUMERIC",
"type": "float64",
"comment": "Numeric (18,4) (NUMERIC → Decimal(18,4) → float64, precision may be lost)"
},
{
"name": "COL_NUMBER",
"type": "float64",
"comment": "NUMBER (10,0) (NUMBER → Decimal(10,0) → float64, precision may be lost)"
},
{
"name": "COL_BIT",
"type": "bool",
"comment": "Boolean value (BIT → Bool → bool)"
},
{
"name": "COL_BOOLEAN",
"type": "bool",
"comment": "Boolean (BOOLEAN → Bool → bool)"
},
{
"name": "COL_DATE",
"type": "date32",
"comment": "Date (DATE → Date(DAY) → date32)"
},
{
"name": "COL_TIME",
"type": "time32",
"comment": "Time (TIME → Time(MILLISECOND) → time32)"
},
{
"name": "COL_TIMESTAMP",
"type": "timestamp_us",
"comment": "Timestamp (TIMESTAMP → Timestamp(MICROSECOND) → timestamp_us)"
},
{
"name": "COL_DATETIME",
"type": "timestamp_us",
"comment": "Date and time (DATETIME → Timestamp(MICROSECOND) → timestamp_us)"
},
{
"name": "COL_BINARY",
"type": "binary",
"comment": "Fixed-length binary (BINARY → Binary → binary)"
},
{
"name": "COL_VARBINARY",
"type": "binary",
"comment": "Variable-length binary (VARBINARY → Binary → binary)"
},
{
"name": "COL_BLOB",
"type": "large_binary",
"comment": "Large binary object (BLOB → LargeBinary → large_binary)"
},
{
"name": "COL_INTERVAL_YM",
"type": "interval_year_month",
"comment": "Year-month interval (INTERVAL YEAR TO MONTH → Interval(YEAR_MONTH) → interval_year_month)"
},
{
"name": "COL_INTERVAL_DT",
"type": "interval_day_time",
"comment": "Day-time interval (INTERVAL DAY TO SECOND → Interval(DAY_TIME) → interval_day_time)"
}
]
}'
export CTR_CERTS_ROOT=/home/kuscia/var/certs
curl -X POST 'https://localhost:8082/api/v1/domaindata/create' \
--header "Token: $(cat ${CTR_CERTS_ROOT}/token)" \
--header 'Content-Type: application/json' \
--cert ${CTR_CERTS_ROOT}/kusciaapi-server.crt \
--key ${CTR_CERTS_ROOT}/kusciaapi-server.key \
--cacert ${CTR_CERTS_ROOT}/ca.crt \
-k \
-d '{
"domain_id": "alice",
"domaindata_id": "comprehensive-type-test1",
"name": "Comprehensive type test table",
"type": "table",
"datasource_id": "kuscia-ds-01",
"relative_uri": "COMPREHENSIVE_TYPE_TEST1",
"attributes": {},
"columns": [
{
"name": "COL_TINYINT",
"type": "int8",
"comment": "8-bit signed integer (TINYINT → Int(8) → int8)"
},
{
"name": "COL_SMALLINT",
"type": "int16",
"comment": "16-bit signed integer (SMALLINT → Int(16) → int16)"
},
{
"name": "COL_INT",
"type": "int32",
"comment": "32-bit signed integer (INT → Int(32) → int32)"
},
{
"name": "COL_BIGINT",
"type": "int64",
"comment": "64-bit signed integer (BIGINT → Int(64) → int64)"
},
{
"name": "COL_FLOAT",
"type": "float32",
"comment": "Single-precision float (FLOAT → FloatingPoint(SINGLE) → float32)"
},
{
"name": "COL_DOUBLE",
"type": "float64",
"comment": "Double-precision float (DOUBLE → FloatingPoint(DOUBLE) → float64)"
},
{
"name": "COL_CHAR",
"type": "string",
"comment": "Fixed-length string (CHAR → Utf8 → string)"
},
{
"name": "COL_VARCHAR",
"type": "string",
"comment": "Variable-length string (VARCHAR → Utf8 → string)"
},
{
"name": "COL_VARCHAR2",
"type": "string",
"comment": "VARCHAR2 type (VARCHAR2 → Utf8 → string)"
},
{
"name": "COL_CLOB",
"type": "large_string",
"comment": "Large text (CLOB → LargeUtf8 → large_string)"
},
{
"name": "COL_DECIMAL",
"type": "decimal",
"comment": "High-precision number (38,2) (DECIMAL → Decimal(38,2) → decimal)"
},
{
"name": "COL_NUMERIC",
"type": "float64",
"comment": "Numeric (18,4) (NUMERIC → Decimal(18,4) → float64, precision may be lost)"
},
{
"name": "COL_NUMBER",
"type": "float64",
"comment": "NUMBER (10,0) (NUMBER → Decimal(10,0) → float64, precision may be lost)"
},
{
"name": "COL_BIT",
"type": "bool",
"comment": "Boolean value (BIT → Bool → bool)"
},
{
"name": "COL_BOOLEAN",
"type": "bool",
"comment": "Boolean (BOOLEAN → Bool → bool)"
},
{
"name": "COL_DATE",
"type": "date32",
"comment": "Date (DATE → Date(DAY) → date32)"
},
{
"name": "COL_TIME",
"type": "time32",
"comment": "Time (TIME → Time(MILLISECOND) → time32)"
},
{
"name": "COL_TIMESTAMP",
"type": "timestamp_us",
"comment": "Timestamp (TIMESTAMP → Timestamp(MICROSECOND) → timestamp_us)"
},
{
"name": "COL_DATETIME",
"type": "timestamp_us",
"comment": "Date and time (DATETIME → Timestamp(MICROSECOND) → timestamp_us)"
},
{
"name": "COL_BINARY",
"type": "binary",
"comment": "Fixed-length binary (BINARY → Binary → binary)"
},
{
"name": "COL_VARBINARY",
"type": "binary",
"comment": "Variable-length binary (VARBINARY → Binary → binary)"
},
{
"name": "COL_BLOB",
"type": "large_binary",
"comment": "Large binary object (BLOB → LargeBinary → large_binary)"
},
{
"name": "COL_INTERVAL_YM",
"type": "interval_year_month",
"comment": "Year-month interval (INTERVAL YEAR TO MONTH → Interval(YEAR_MONTH) → interval_year_month)"
},
{
"name": "COL_INTERVAL_DT",
"type": "interval_day_time",
"comment": "Day-time interval (INTERVAL DAY TO SECOND → Interval(DAY_TIME) → interval_day_time)"
}
]
}'
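4. Integration testing of Kuscia, DataProxy, and the Dameng database
4.1 Integration tests in the DataProxy codebase
I wrote integration tests for the Dameng plugin in dataproxy-integration-tests/src/test/java/org/secretflow/dataproxy/integration/tests/DamengIntegrationTest.java. They run against a real Dameng database inside a container.
Dameng no longer provides an official Docker image, so I took a Dameng database image that Dameng had previously published and modified it (adding a case-insensitive database instance). The tests use Testcontainers to start this custom Dameng Docker container.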
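import java.time.Duration;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;

// Custom DM8 image described above; DOCKER_CONTAINER_PORT is a constant of the test class
GenericContainer<?> container = new GenericContainer<>(DockerImageName.parse("kongxr7/dameng:8.1"))
        .withExposedPorts(DOCKER_CONTAINER_PORT, 5237)
        .withPrivilegedMode(true)
        .withEnv("PAGE_SIZE", "16")                    // DM instance initialization parameter
        .withEnv("LD_LIBRARY_PATH", "/opt/dmdbms/bin") // location of the DM native libraries
        .withEnv("INSTANCE_NAME", "dm8db")
        .withStartupTimeout(Duration.ofSeconds(600))   // database initialization can be slow
        .waitingFor(Wait.forListeningPort());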
Test results:

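4.2 Testing with a Flight client tool
I wrote a test client, dameng_client (a Flight client tool for the Dameng database), that talks to Kuscia DataProxy to read, write, and copy data. I placed this test client inside the alice node.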
/tmp/dameng_client copy comprehensive-type-test comprehensive-type-test1
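This command uses DataProxy to read data from the Dameng table that comprehensive-type-test points to, parses it into Arrow records, and prints them. It then writes those Arrow records through DataProxy to the Dameng table that comprehensive-type-test1 points to, which exercises reads and writes in both directions between Arrow and the Dameng database.
Test results: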
bash-5.2# /tmp/dameng_client copy comprehensive-type-test comprehensive-type-test1
Copying data from comprehensive-type-test to comprehensive-type-test1
=== Step 1: Read source data ===
Fetching source FlightInfo...
Source DataProxy URI: grpc+tcp://10.88.0.2:8023
Reading data from source DataProxy...
Schema: schema:
fields: 24
- COL_TINYINT: type=int8, nullable
- COL_SMALLINT: type=int16, nullable
- COL_INT: type=int32, nullable
- COL_BIGINT: type=int64, nullable
- COL_FLOAT: type=float32, nullable
- COL_DOUBLE: type=float64, nullable
- COL_CHAR: type=utf8, nullable
- COL_VARCHAR: type=utf8, nullable
- COL_VARCHAR2: type=utf8, nullable
- COL_CLOB: type=large_utf8, nullable
- COL_DECIMAL: type=decimal(38, 10), nullable
- COL_NUMERIC: type=decimal(18, 4), nullable
- COL_NUMBER: type=decimal(10, 0), nullable
- COL_BIT: type=bool, nullable
- COL_BOOLEAN: type=bool, nullable
- COL_DATE: type=date32, nullable
- COL_TIME: type=time32[ms], nullable
- COL_TIMESTAMP: type=timestamp[us], nullable
- COL_DATETIME: type=timestamp[us], nullable
- COL_BINARY: type=binary, nullable
- COL_VARBINARY: type=binary, nullable
- COL_BLOB: type=large_binary, nullable
- COL_INTERVAL_YM: type=month_interval, nullable
- COL_INTERVAL_DT: type=day_time_interval, nullable
--- Record #1 read ---
Rows: 3, Columns: 24
=== Details of Record #1 read from source ===
Row 1:
COL_TINYINT: 127 (type: *array.Int8)
COL_SMALLINT: -32768 (type: *array.Int16)
COL_INT: 2147483647 (type: *array.Int32)
COL_BIGINT: 9223372036854775807 (type: *array.Int64)
COL_FLOAT: 3.141590 (type: *array.Float32)
COL_DOUBLE: 2.718282 (type: *array.Float64)
COL_CHAR: Hello World (type: *array.String)
COL_VARCHAR: This is a VARCHAR string (type: *array.String)
COL_VARCHAR2: This is a VARCHAR2 string (type: *array.String)
COL_CLOB: This is a CLOB field with very long content. AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA (type: *array.LargeString)
COL_DECIMAL: 12345678901234567890.1200000000 (type: *array.Decimal128)
COL_NUMERIC: 12345678901234.5678 (type: *array.Decimal128)
COL_NUMBER: 9876543210 (type: *array.Decimal128)
COL_BIT: true (type: *array.Boolean)
COL_BOOLEAN: true (type: *array.Boolean)
COL_DATE: 2024-01-15 (type: *array.Date32)
COL_TIME: 14:30:45.000 (type: *array.Time32)
COL_TIMESTAMP: 2024-01-15T06:30:45Z (type: *array.Timestamp)
COL_DATETIME: 2024-01-15T06:30:45Z (type: *array.Timestamp)
COL_BINARY: 48656C6C6F0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 (type: *array.Binary)
COL_VARBINARY: 576F726C64 (type: *array.Binary)
COL_BLOB: 48656C6C6F20576F726C6421 (type: *array.LargeBinary)
COL_INTERVAL_YM: 2-3 (type: *array.MonthInterval)
COL_INTERVAL_DT: 1 days 45045123 ms (type: *array.DayTimeInterval)
Row 2:
COL_TINYINT: NULL (type: *array.Int8)
COL_SMALLINT: NULL (type: *array.Int16)
COL_INT: NULL (type: *array.Int32)
COL_BIGINT: NULL (type: *array.Int64)
COL_FLOAT: NULL (type: *array.Float32)
COL_DOUBLE: NULL (type: *array.Float64)
COL_CHAR: NULL (type: *array.String)
COL_VARCHAR: NULL (type: *array.String)
COL_VARCHAR2: NULL (type: *array.String)
COL_CLOB: NULL (type: *array.LargeString)
COL_DECIMAL: NULL (type: *array.Decimal128)
COL_NUMERIC: NULL (type: *array.Decimal128)
COL_NUMBER: NULL (type: *array.Decimal128)
COL_BIT: NULL (type: *array.Boolean)
COL_BOOLEAN: NULL (type: *array.Boolean)
COL_DATE: NULL (type: *array.Date32)
COL_TIME: NULL (type: *array.Time32)
COL_TIMESTAMP: NULL (type: *array.Timestamp)
COL_DATETIME: NULL (type: *array.Timestamp)
COL_BINARY: NULL (type: *array.Binary)
COL_VARBINARY: NULL (type: *array.Binary)
COL_BLOB: NULL (type: *array.LargeBinary)
COL_INTERVAL_YM: NULL (type: *array.MonthInterval)
COL_INTERVAL_DT: NULL (type: *array.DayTimeInterval)
Row 3:
COL_TINYINT: -128 (type: *array.Int8)
COL_SMALLINT: 32767 (type: *array.Int16)
COL_INT: -2147483648 (type: *array.Int32)
COL_BIGINT: -9223372036854775808 (type: *array.Int64)
COL_FLOAT: -3.141590 (type: *array.Float32)
COL_DOUBLE: -2.718282 (type: *array.Float64)
COL_CHAR: A (type: *array.String)
COL_VARCHAR: Short (type: *array.String)
COL_VARCHAR2: Short2 (type: *array.String)
COL_CLOB: Short CLOB (type: *array.LargeString)
COL_DECIMAL: 0.0100000000 (type: *array.Decimal128)
COL_NUMERIC: 0.0001 (type: *array.Decimal128)
COL_NUMBER: 0 (type: *array.Decimal128)
COL_BIT: false (type: *array.Boolean)
COL_BOOLEAN: false (type: *array.Boolean)
COL_DATE: 1970-01-01 (type: *array.Date32)
COL_TIME: 00:00:00.000 (type: *array.Time32)
COL_TIMESTAMP: 1969-12-31T16:00:00Z (type: *array.Timestamp)
COL_DATETIME: 1969-12-31T16:00:00Z (type: *array.Timestamp)
COL_BINARY: 00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 (type: *array.Binary)
COL_VARBINARY: FF (type: *array.Binary)
COL_BLOB: 000102030405 (type: *array.LargeBinary)
COL_INTERVAL_YM: 0-0 (type: *array.MonthInterval)
COL_INTERVAL_DT: 0 days 0 ms (type: *array.DayTimeInterval)
========================
Successfully read 1 Record, 3 rows in total
=== Step 2: Write target data ===
Fetching target FlightInfo...
Target DataProxy URI: grpc+tcp://10.88.0.2:8023
Writing data to target DataProxy...
--- Writing Record #1 ---
Writing Record 1/1 (3 rows)...
=== Details of Record #1 to be written ===
Row 1:
COL_TINYINT: 127 (type: *array.Int8)
COL_SMALLINT: -32768 (type: *array.Int16)
COL_INT: 2147483647 (type: *array.Int32)
COL_BIGINT: 9223372036854775807 (type: *array.Int64)
COL_FLOAT: 3.141590 (type: *array.Float32)
COL_DOUBLE: 2.718282 (type: *array.Float64)
COL_CHAR: Hello World (type: *array.String)
COL_VARCHAR: This is a VARCHAR string (type: *array.String)
COL_VARCHAR2: This is a VARCHAR2 string (type: *array.String)
COL_CLOB: This is a CLOB field with very long content. AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA (type: *array.LargeString)
COL_DECIMAL: 12345678901234567890.1200000000 (type: *array.Decimal128)
COL_NUMERIC: 12345678901234.5678 (type: *array.Decimal128)
COL_NUMBER: 9876543210 (type: *array.Decimal128)
COL_BIT: true (type: *array.Boolean)
COL_BOOLEAN: true (type: *array.Boolean)
COL_DATE: 2024-01-15 (type: *array.Date32)
COL_TIME: 14:30:45.000 (type: *array.Time32)
COL_TIMESTAMP: 2024-01-15T06:30:45Z (type: *array.Timestamp)
COL_DATETIME: 2024-01-15T06:30:45Z (type: *array.Timestamp)
COL_BINARY: 48656C6C6F0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 (type: *array.Binary)
COL_VARBINARY: 576F726C64 (type: *array.Binary)
COL_BLOB: 48656C6C6F20576F726C6421 (type: *array.LargeBinary)
COL_INTERVAL_YM: 2-3 (type: *array.MonthInterval)
COL_INTERVAL_DT: 1 days 45045123 ms (type: *array.DayTimeInterval)
Row 2:
COL_TINYINT: NULL (type: *array.Int8)
COL_SMALLINT: NULL (type: *array.Int16)
COL_INT: NULL (type: *array.Int32)
COL_BIGINT: NULL (type: *array.Int64)
COL_FLOAT: NULL (type: *array.Float32)
COL_DOUBLE: NULL (type: *array.Float64)
COL_CHAR: NULL (type: *array.String)
COL_VARCHAR: NULL (type: *array.String)
COL_VARCHAR2: NULL (type: *array.String)
COL_CLOB: NULL (type: *array.LargeString)
COL_DECIMAL: NULL (type: *array.Decimal128)
COL_NUMERIC: NULL (type: *array.Decimal128)
COL_NUMBER: NULL (type: *array.Decimal128)
COL_BIT: NULL (type: *array.Boolean)
COL_BOOLEAN: NULL (type: *array.Boolean)
COL_DATE: NULL (type: *array.Date32)
COL_TIME: NULL (type: *array.Time32)
COL_TIMESTAMP: NULL (type: *array.Timestamp)
COL_DATETIME: NULL (type: *array.Timestamp)
COL_BINARY: NULL (type: *array.Binary)
COL_VARBINARY: NULL (type: *array.Binary)
COL_BLOB: NULL (type: *array.LargeBinary)
COL_INTERVAL_YM: NULL (type: *array.MonthInterval)
COL_INTERVAL_DT: NULL (type: *array.DayTimeInterval)
Row 3:
COL_TINYINT: -128 (type: *array.Int8)
COL_SMALLINT: 32767 (type: *array.Int16)
COL_INT: -2147483648 (type: *array.Int32)
COL_BIGINT: -9223372036854775808 (type: *array.Int64)
COL_FLOAT: -3.141590 (type: *array.Float32)
COL_DOUBLE: -2.718282 (type: *array.Float64)
COL_CHAR: A (type: *array.String)
COL_VARCHAR: Short (type: *array.String)
COL_VARCHAR2: Short2 (type: *array.String)
COL_CLOB: Short CLOB (type: *array.LargeString)
COL_DECIMAL: 0.0100000000 (type: *array.Decimal128)
COL_NUMERIC: 0.0001 (type: *array.Decimal128)
COL_NUMBER: 0 (type: *array.Decimal128)
COL_BIT: false (type: *array.Boolean)
COL_BOOLEAN: false (type: *array.Boolean)
COL_DATE: 1970-01-01 (type: *array.Date32)
COL_TIME: 00:00:00.000 (type: *array.Time32)
COL_TIMESTAMP: 1969-12-31T16:00:00Z (type: *array.Timestamp)
COL_DATETIME: 1969-12-31T16:00:00Z (type: *array.Timestamp)
COL_BINARY: 00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 (type: *array.Binary)
COL_VARBINARY: FF (type: *array.Binary)
COL_BLOB: 000102030405 (type: *array.LargeBinary)
COL_INTERVAL_YM: 0-0 (type: *array.MonthInterval)
COL_INTERVAL_DT: 0 days 0 ms (type: *array.DayTimeInterval)
========================
Record #1 written successfully
Target server closed the stream; write complete.
Successfully copied 3 rows from comprehensive-type-test to comprehensive-type-test1
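Two values in the log above can be checked by hand. `COL_INTERVAL_DT` is inserted as `1 12:30:45.123456` and printed as `1 days 45045123 ms` because Arrow's DAY_TIME interval stores whole days plus milliseconds: 12 h 30 min 45.123 s is 45,045,123 ms, and the sub-millisecond digits are dropped. The timestamps print 8 hours earlier in UTC, which is consistent with the database session running in UTC+8; that time zone is an assumption, since the log does not state it. A sketch with `java.time`, where the class name and the `Asia/Shanghai` zone are illustrative:

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

/** Illustrative checks of the interval and timestamp values printed by the copy test. */
final class LogValueSketch {

    /** Splits a DAY TO SECOND interval into Arrow DAY_TIME parts: {days, milliseconds}. */
    static long[] toDayTime(Duration interval) {
        long days = interval.toDays();
        long millis = interval.minusDays(days).toMillis(); // truncates sub-millisecond digits
        return new long[] {days, millis};
    }

    /** Interprets a zone-less DATETIME as local time in the given zone and converts it to UTC. */
    static Instant toUtc(LocalDateTime local, ZoneId zone) {
        return local.atZone(zone).toInstant();
    }
}
```

Under that assumption, 2024-01-15 14:30:45 maps to `2024-01-15T06:30:45Z` and 1970-01-01 00:00:00 maps to `1969-12-31T16:00:00Z`, matching the log.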