
Results 77 starrocks-connector-for-apache-flink issues

[StarRocksSourceFlinkRows#genFlinkRows](https://github.com/StarRocks/starrocks-connector-for-apache-flink/blob/main/src/main/java/com/starrocks/connector/flink/row/source/StarRocksSourceFlinkRows.java#L156) does not throw an exception when the data types between Flink and StarRocks mismatch; we should fix it. This problem was introduced in #122, and can be reproduced...
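A minimal sketch of the fail-fast behavior the issue asks for. The class and method names below are illustrative, not the connector's actual API: the point is that a row converter should raise an exception on a type mismatch instead of silently producing bad data.

```java
// Hypothetical sketch; names are illustrative, not the connector's real classes.
public class TypeCheckSketch {

    // Convert one column value, failing fast when the StarRocks column type
    // does not match the type the Flink schema expects.
    static Object convert(String srType, String flinkType, Object value) {
        if (!srType.equalsIgnoreCase(flinkType)) {
            throw new RuntimeException(
                    "Type mismatch: StarRocks column is " + srType
                    + " but Flink schema expects " + flinkType);
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(convert("INT", "INT", 42)); // matching types pass through
        try {
            convert("DATE", "INT", 42);                // mismatched types must throw
        } catch (RuntimeException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```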

#117 support sink with no schema demo:

```sql
CREATE TABLE `source_table` (
    `key` STRING,
    `value` STRING,
    `biz_type` INT,
    `vehicle_id` STRING
) WITH (
    'connector' = 'kafka',
    ...
);
CREATE TABLE...
```

# Scenario

The upstream data's schema changes frequently. To avoid restarting the job over and over, we need StarRocks' ability to parse JSON according to the table schema: the Flink job outputs JSON bodies, and StarRocks, on receiving the JSON, parses it into table data according to the table's current schema. As long as the StarRocks table schema stays in sync with the upstream data's schema, the downstream can pick up data for newly added schema fields promptly without touching the Flink job.

# Status quo

The low-level API can already do this, but it cannot be invoked in SQL mode.

# Proposal

In SQL mode, let users choose between schema mode and schemaless mode via configuration, for example: `create table sr_sink ( biz_type int, vehicle_id string, tags array )with( 'connector' =...
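One possible shape of the proposed SQL-mode switch, assuming the issue's `sr_sink` example. Note that `'sink.schemaless'` is an illustrative option name, not an existing connector parameter, and the `ARRAY<STRING>` element type is assumed since the original snippet is truncated:

```sql
-- Sketch only: 'sink.schemaless' is a hypothetical option name.
CREATE TABLE sr_sink (
    biz_type INT,
    vehicle_id STRING,
    tags ARRAY<STRING>
) WITH (
    'connector' = 'starrocks',
    'sink.properties.format' = 'json',
    'sink.schemaless' = 'true'
);
```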

Symptom: Flink restarted because of an unrelated problem (e.g. checkpoint persistence failure), after which we found the data the connector wrote to SR contained both duplicates and losses, as shown below: ![image](https://user-images.githubusercontent.com/97091789/159920537-e37839f1-a3a8-4901-a3c9-bd7c3da6d2fe.png) ![image](https://user-images.githubusercontent.com/97091789/159920383-ddefe8a2-8c20-4aad-8c75-7fb05105f1f5.png) Connector logic problem: under exactly-once semantics, the actual write-out of the data does not happen within the current checkpoint. In theory, when a checkpoint is triggered, all of that checkpoint's data should be written out, and the checkpoint should only be considered successful once the write succeeds; that is what true exactly-once means.
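The flush-on-checkpoint pattern the issue argues for can be sketched as follows. This is a simplified stand-in, not the connector's real sink: buffered records are written out synchronously when the checkpoint is triggered, so the checkpoint cannot complete with unflushed data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; names are illustrative, not the connector's real classes.
public class FlushOnCheckpointSketch {
    final List<String> buffer = new ArrayList<>();   // records not yet written to SR
    final List<String> flushed = new ArrayList<>();  // stand-in for data written to SR

    // Called per record, like a sink's invoke().
    void invoke(String record) {
        buffer.add(record);
    }

    // Called when a checkpoint is triggered: the checkpoint must not be
    // acknowledged until every buffered record has been written out.
    void snapshotState() {
        flushed.addAll(buffer); // stand-in for a blocking stream load
        buffer.clear();
    }

    public static void main(String[] args) {
        FlushOnCheckpointSketch sink = new FlushOnCheckpointSketch();
        sink.invoke("a");
        sink.invoke("b");
        sink.snapshotState();
        // prints: flushed=[a, b] pending=[]
        System.out.println("flushed=" + sink.flushed + " pending=" + sink.buffer);
    }
}
```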

bug

Using this example: https://github.com/StarRocks/starrocks-connector-for-apache-flink/blob/main/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/LoadCustomJavaRecords.java

```
2024-07-31 15:22:17.211 [Sink: Unnamed (2/2)#0] INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using predefined options: DEFAULT.
2024-07-31 15:22:17.211 [Sink: Unnamed (2/2)#0] INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=[/data/rocksdb],...
```

Hit a problem using StarRocks: Flink writes to SR; at some point SR failed and could not be written to, and the flink sink still failed after 3 retries. The expectation was that flink would report an error and restart, or die. Instead, flink kept running normally, stopped writing to SR, stopped reading upstream data, and got stuck in a zombie state. Job info: checkpointing not enabled, sink.properties.format=json, everything else at defaults. Logs below (note: the original logs were not kept; these are from another program that likewise still failed after 3 retries without the flink job reporting any error):

```
2024-07-02 10:43:16,953 INFO com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor [] - Start to join batch data: label[d0442073-7e85-48f1-8f47-aa3cab7ad15f].
2024-07-02 10:43:16,953 INFO com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor [] - Executing stream load...
```
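The behavior the reporter expected can be sketched as a retry loop that re-throws once retries are exhausted, so the failure surfaces and Flink fails or restarts the job instead of leaving the task alive but idle. The names here are illustrative, not the connector's real API.

```java
// Hypothetical sketch; not the connector's actual retry implementation.
public class RetrySketch {

    // Attempt a stream load up to maxRetries times; if every attempt fails,
    // re-throw so the job fails visibly rather than hanging in a zombie state.
    static void loadWithRetry(int maxRetries, Runnable doLoad) {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                doLoad.run();
                return; // load succeeded
            } catch (RuntimeException e) {
                if (attempt == maxRetries) {
                    throw new RuntimeException(
                            "Stream load failed after " + maxRetries + " retries", e);
                }
                // otherwise fall through and retry
            }
        }
    }

    public static void main(String[] args) {
        try {
            loadWithRetry(3, () -> { throw new RuntimeException("SR unavailable"); });
        } catch (RuntimeException e) {
            // prints: Stream load failed after 3 retries
            System.out.println(e.getMessage());
        }
    }
}
```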

## What type of PR is this:

- [x] BugFix
- [ ] Feature
- [ ] Enhancement
- [ ] Refactor
- [ ] UT
- [ ] Doc...

The first screenshot shows that what the Flink API passes in is the post-predicate-pushdown column index; using that index builds an incorrect filterRichInfo.

**How to reproduce:**

```java
package org.example;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;

public class StarRocksSqlDimApp {
    public static void main(String[] args) {
        Configuration configuration =...
```

## What type of PR is this:

- [x] BugFix
- [ ] Feature
- [ ] Enhancement
- [ ] Refactor
- [ ] UT
- [ ] Doc...