starrocks-connector-for-apache-flink
[StarRocksSourceFlinkRows#genFlinkRows](https://github.com/StarRocks/starrocks-connector-for-apache-flink/blob/main/src/main/java/com/starrocks/connector/flink/row/source/StarRocksSourceFlinkRows.java#L156) does not throw an exception when the data types between Flink and StarRocks mismatch, and we should fix it. This problem was introduced in #122 and can be reproduced...
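One possible shape of the fix is to validate the fetched value against the Flink column type and fail fast instead of silently coercing. The sketch below is illustrative only: `checkedConvert` and its signature are hypothetical, not the connector's actual API.

```java
// Hypothetical sketch: reject a value whose runtime type does not match
// the type the Flink schema expects for that column, rather than
// converting it silently and producing wrong data.
public class TypeMismatchCheck {

    // Throws when the fetched value's class differs from the class the
    // Flink column type maps to; returns the value unchanged otherwise.
    public static Object checkedConvert(Object value, Class<?> expected, int colIndex) {
        if (value != null && !expected.isInstance(value)) {
            throw new RuntimeException(
                "Type mismatch at column " + colIndex
                + ": Flink expects " + expected.getName()
                + " but StarRocks returned " + value.getClass().getName());
        }
        return value;
    }

    public static void main(String[] args) {
        // Matching types pass through unchanged.
        System.out.println(checkedConvert(42, Integer.class, 0));
        // A mismatched column (e.g. a string where INT is declared) now fails fast.
        try {
            checkedConvert("2024-01-01", Integer.class, 1);
        } catch (RuntimeException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```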
#117 support sink with no schema

demo:
```
CREATE TABLE `source_table` (
  `key` STRING,
  `value` STRING,
  `biz_type` INT,
  `vehicle_id` STRING
) WITH (
  'connector' = 'kafka',
  ...
);
CREATE TABLE...
```
# Scenario
The upstream data's schema changes frequently. To avoid restarting the job over and over, we want to rely on StarRocks' ability to parse JSON according to the table schema: the Flink job emits raw JSON, and when StarRocks receives it, it parses it into table data using the table's current schema. As long as the StarRocks table schema is kept in sync with the upstream schema, downstream consumers can get the data for newly added schema fields without affecting the Flink job.

# Current state
The low-level API can already do this, but it cannot be invoked in SQL mode.

# Suggestion
In SQL mode, add a configuration option that lets users choose between schema mode and schema-less mode, for example: `create table sr_sink ( biz_type int, vehicle_id string, tags array )with( 'connector' =...
Get the table name from Kafka and sink the data into the corresponding table. Can this be achieved by modifying the source code? What needs to be done in...
Symptom: Flink restarted due to an unrelated issue (e.g. a checkpoint persistence failure), and afterwards the data the connector wrote to SR showed both duplicates and losses at the same time, as in the screenshots below:

(screenshots omitted)

Connector logic problem: in exactly-once mode, the actual write-out of the data does not happen inside the checkpoint that covers it. In principle, when a checkpoint is triggered, all of that checkpoint's data should be written out, and only once the write succeeds should the checkpoint be considered successful; that is what exactly-once truly requires.
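The flush-on-checkpoint discipline argued for above can be sketched as follows. This is a toy illustration under stated assumptions, not the connector's real code: the class name is hypothetical and an in-memory list stands in for a Stream Load to StarRocks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: snapshotState() flushes every buffered record and
// only returns once the write succeeded, so a successful checkpoint really
// covers all data received before it (no duplicates or losses on restart).
public class FlushOnCheckpointSink {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> committed = new ArrayList<>(); // stands in for StarRocks

    public void write(String record) {
        buffer.add(record);
    }

    // Called when a checkpoint is triggered: write everything out first;
    // if this throws, the checkpoint fails and is not reported as complete.
    public void snapshotState(long checkpointId) {
        committed.addAll(buffer); // a real sink would Stream Load here and verify the result
        buffer.clear();
        System.out.println("checkpoint " + checkpointId + " covers " + committed.size() + " records");
    }

    public List<String> committed() { return committed; }

    public static void main(String[] args) {
        FlushOnCheckpointSink sink = new FlushOnCheckpointSink();
        sink.write("a");
        sink.write("b");
        sink.snapshotState(1);
        // After the checkpoint succeeds, both records are durably written.
        System.out.println(sink.committed());
    }
}
```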
Using this example: https://github.com/StarRocks/starrocks-connector-for-apache-flink/blob/main/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/LoadCustomJavaRecords.java

```
2024-07-31 15:22:17.211 [Sink: Unnamed (2/2)#0] INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using predefined options: DEFAULT.
2024-07-31 15:22:17.211 [Sink: Unnamed (2/2)#0] INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=[/data/rocksdb],...
```
Ran into a problem with StarRocks: Flink writes to SR; SR failed for a while and could not accept writes. The Flink sink retried 3 times and still failed. We expected the Flink job to raise an error and restart or die, but instead it kept running normally while no longer writing to SR and no longer reading upstream data, stuck in a zombie state.

Job info: checkpointing is not enabled, sink.properties.format=json, all other settings are defaults.

Logs (note: the original logs were not kept; these come from another job with the same symptom, where 3 retries still failed and the Flink job raised no error):
```
2024-07-02 10:43:16,953 INFO com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor [] - Start to join batch data: label[d0442073-7e85-48f1-8f47-aa3cab7ad15f].
2024-07-02 10:43:16,953 INFO com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor [] - Executing stream load...
```
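The expected behavior described above can be sketched like this: once retries are exhausted, the sink throws so that Flink fails the task and restarts, instead of swallowing the error and going zombie. `loadWithRetry` is a hypothetical stand-in, not the connector's actual method.

```java
import java.util.function.Supplier;

// Hypothetical sketch: attempt the load up to (1 + maxRetries) times,
// then propagate the failure so the Flink task fails visibly.
public class RetryThenFail {

    public static <T> T loadWithRetry(Supplier<T> attempt, int maxRetries) {
        RuntimeException last = null;
        for (int i = 0; i <= maxRetries; i++) {
            try {
                return attempt.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and retry
            }
        }
        // Retries exhausted: throw instead of returning silently,
        // so the job restarts rather than hanging in a zombie state.
        throw new RuntimeException("stream load failed after " + maxRetries + " retries", last);
    }

    public static void main(String[] args) {
        try {
            loadWithRetry(() -> { throw new RuntimeException("SR unavailable"); }, 3);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```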
## What type of PR is this:
- [x] BugFix
- [ ] Feature
- [ ] Enhancement
- [ ] Refactor
- [ ] UT
- [ ] Doc...
The first screenshot shows that the index the Flink API passes in is the post-predicate-pushdown index; using that index directly builds an incorrect filterRichInfo.

**How to reproduce:**
```java
package org.example;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;

public class StarRocksSqlDimApp {
    public static void main(String[] args) {
        Configuration configuration =...
```
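Assuming the bug is that a field index from the projected (pushed-down) schema is used where an index into the original table schema is needed, a minimal remapping sketch could look like this; `IndexRemap` and the projection array are hypothetical names for illustration.

```java
// Hypothetical sketch: translate a field index from the projected
// (post-pushdown) schema back to the original table schema before
// building filter info, so predicates reference the right columns.
public class IndexRemap {

    // projection[i] = original column index of projected column i
    public static int toOriginalIndex(int projectedIndex, int[] projection) {
        return projection[projectedIndex];
    }

    public static void main(String[] args) {
        int[] projection = {2, 0, 5}; // projected cols map to original cols 2, 0, 5
        System.out.println(toOriginalIndex(1, projection)); // prints 0
    }
}
```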
## What type of PR is this:
- [x] BugFix
- [ ] Feature
- [ ] Enhancement
- [ ] Refactor
- [ ] UT
- [ ] Doc...