starrocks-connector-for-apache-flink
starrocks-connector-for-apache-flink copied to clipboard
feat: add sink options to sink with no schema
#117 support sink with no schema demo:
CREATE TABLE `source_table` (
`key` STRING,
`value` STRING,
`biz_type` INT,
`vehicle_id` STRING
) WITH (
'connector' = 'kafka',
...
);
CREATE TABLE sink_table (
resultStr String
) WITH (
'connector' = 'starrocks',
'jdbc-url'='jdbc:mysql://****:9030',
'load-url'='*****:8030',
'database-name'='*****',
'table-name'='sr_test',
'username'='*****',
'password'='*******',
'sink.properties.strip_outer_array'='true',
'sink.properties.format'='json',
'sink.buffer-flush.max-bytes' = '2073741824',
'sink.buffer-flush.max-rows' = '500000',
'sink.buffer-flush.interval-ms' = '300000',
'sink.properties.ignore_json_size' = 'true',
'sink.parallelism' = '1',
'sink.with-no-schema' = 'true'
);
insert into sink_table
select MAP_TO_JSON_STRING(KV_TO_MAP('vehicle_id',vehicle_id,'biz_type',biz_type,`key`,`value`))
from source_table;
SR DDL:
CREATE TABLE `sr_test` (
`biz_type` int(11) NULL COMMENT "",
`vehicle_id` varchar(128) NULL COMMENT "",
`batteryLevel` int(11) REPLACE_IF_NOT_NULL NULL COMMENT "",
`voltageStatus` varchar(128) REPLACE_IF_NOT_NULL NULL COMMENT "",
`vehicleVersionId` varchar(128) REPLACE_IF_NOT_NULL NULL COMMENT "",
`opStatus` int(11) REPLACE_IF_NOT_NULL NULL COMMENT "",
`firstInOpTime` bigint(20) REPLACE_IF_NOT_NULL NULL COMMENT "运营时长",
`isOutOpWorkRegion` varchar(128) REPLACE_IF_NOT_NULL NULL COMMENT "",
`lowPowerPriority` int(11) REPLACE_IF_NOT_NULL NULL COMMENT "低电优先级",
`asset_loss_benefit` double REPLACE_IF_NOT_NULL NULL COMMENT "资损收益"
) ENGINE=OLAP
AGGREGATE KEY(`biz_type`, `vehicle_id`)
DISTRIBUTED BY HASH(`biz_type`, `vehicle_id`) BUCKETS 10
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);