
Built on open-source Flink, extending its streaming SQL; mainly implements joins between streams and side (dimension) tables, while supporting all native Flink SQL syntax.

Results: 90 flinkStreamSQL issues

> Submitting on YARN fails with the error below, while the same job runs fine locally — could someone take a look at what's wrong? Caused by: java.io.InvalidClassException: org.apache.flink.api.common.operators.ResourceSpec; incompatible types for field cpuCores Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: Could not submit job (JobID: 8c60482c1b52409d32be4a3b5849f123) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:250) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483) at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)...

Could a REST service be published that exposes common actions, such as the submit command, as services? Also, for the submit command's `-sql D:\sideSql.txt` option, please consider an alternative form of expression — e.g. a parameter such as sqlscript:"create .... ; insert ...;" — so that as many parameters as possible can be fully described in a single request. This would ease system integration and management and further reduce deployment complexity. Thanks!

log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. ------------program params------------------------- -mode yarnPer -name flink_test -sql <URL-encoded SQL, decoded below> -yarnconf /usr/hdp/current/hadoop-client/etc/hadoop -localSqlPluginPath /opt/flinkStreamSQL/plugins -remoteSqlPluginPath /opt/flinkStreamSQL/plugins -confProp {} -flinkconf -flinkJarPath...

The -sql value, URL-decoded:

CREATE TABLE MyTable(
    id int,
    name varchar,
    productID varchar
 )WITH(
    type ='kafka',
    bootstrapServers ='datanode-1:6667',
    zookeeperQuorum ='namenode-1:2181/kafka',
    offsetReset ='latest',
    topic ='ee_test',
    parallelism ='2'
 );

CREATE TABLE MyResult(
    id int,
    city varchar
 )WITH(
    type ='mysql',
    url ='jdbc:mysql://datanode-1:3306/test?characterEncoding=utf8&useSSL=true',
    userName ='root',
    password ='root',
    tableName ='flink_test1',
    parallelism ='1'
 );

insert into MyResult select id as id, name as city from MyTable
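The -sql argument in these submit logs is URL-encoded before it reaches the command line. A minimal sketch of decoding it back into readable SQL — the class and method names here are my own for illustration, not part of flinkStreamSQL:

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;

// Illustrative helper (not part of flinkStreamSQL) for decoding the
// URL-encoded -sql argument seen in submit logs.
public class SqlParamDecoder {

    // URLDecoder maps '+' back to a space and %XX escapes back to bytes,
    // matching how the statement text was encoded. (Charset overload
    // requires Java 10+.)
    public static String decode(String encodedSql) {
        return URLDecoder.decode(encodedSql, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String encoded = "CREATE+TABLE+MyTable%28%0A%09id+int%0A+%29%3B";
        System.out.println(decode(encoded));
    }
}
```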

In the source, the ALL-mode side-table cache is a flatMap operation. Once the job's data volume and compute pressure grow and the SQL's parallelism is increased, every parallel subtask loads the side-table data independently. This tends to saturate the side table's connections and also carries a non-trivial memory cost. Because the data is refreshed periodically, broadcast state may not be a good fit — could each TaskManager load just one copy of the side-table data, instead of every slot loading its own?
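One possible shape for the request above is a JVM-level cache shared by all subtasks in a TaskManager, acquired in open() and released in close(). This is a sketch under my own assumptions — the class, key scheme, and reference counting are illustrative, not flinkStreamSQL code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Sketch: one ALL-mode side-table cache per JVM (i.e. per TaskManager),
// instead of one copy per parallel subtask/slot. Illustrative only.
public class SharedSideCache {

    // One cache per side-table key, shared by all subtasks in this JVM.
    private static final Map<String, Object> CACHES = new ConcurrentHashMap<>();
    private static final Map<String, Integer> REF_COUNTS = new ConcurrentHashMap<>();

    // Call from RichFlatMapFunction.open(): only the first subtask on this
    // TaskManager actually loads the data; the rest reuse it.
    public static synchronized Object acquire(String key, Supplier<Object> loader) {
        REF_COUNTS.merge(key, 1, Integer::sum);
        return CACHES.computeIfAbsent(key, k -> loader.get());
    }

    // Call from close(): the last subtask to leave drops the cache.
    public static synchronized void release(String key) {
        Integer left = REF_COUNTS.merge(key, -1, Integer::sum);
        if (left != null && left <= 0) {
            REF_COUNTS.remove(key);
            CACHES.remove(key);
        }
    }
}
```

Periodic refresh could then be driven by a single timer per key rather than one per slot, which is what makes this approach friendlier to refreshes than broadcast state.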

Hello! Could a side-table data sharing mechanism be provided, so that different jobs can share side-table data? This would avoid storing multiple copies of the same side table and further reduce concurrent access pressure on the side-table database. My thinking may be off, but we have found that multiple flinkStreamSQL jobs accessing the same Oracle database still create considerable performance and session-usage pressure. So we'd like to know whether there is a mechanism for multiple jobs of the same type to share side-table data and lower the load on the database.

There is an ArrayIndexOutOfBoundsException at line 124 of RedisAllReqRow's flatMap(Tuple2 input, Collector out) method.
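A defensive guard of the kind below would turn such an out-of-bounds access into a null field instead of failing the task. This is a sketch with illustrative names, not the project's actual fix:

```java
// Illustrative bounds check (not flinkStreamSQL code) of the kind that
// prevents an ArrayIndexOutOfBoundsException in flatMap: validate the
// index before reading from the joined row's field array.
public class SideFieldAccess {

    // Returns the field at `index`, or null when the row has fewer
    // fields than expected, instead of throwing.
    public static Object fieldOrNull(Object[] fields, int index) {
        if (fields == null || index < 0 || index >= fields.length) {
            return null;
        }
        return fields[index];
    }
}
```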

After upgrading flinkStreamSql from 1.8.1 to 1.10, a JOIN statement that used to work now fails. tableA is the source table and tableB is a side table; executing the following statement fails: CREATE VIEW v_a_join_b AS SELECT A.col_a1,A.col_a2,A.col_a3,B1.col_b as col_b1,B2.col_b as col_b2 FROM tableA AS A LEFT JOIN tableB AS B1 ON A.col_a1 = B1.col_b LEFT JOIN tableB...

Loading data for an Oracle side table in ALL mode fails; the side table is roughly 80 MB. The error comes from the line `while (resultSet.next()) {` in the loadData method of the RdbAllReqRow class. The error is: `java.sql.SQLRecoverableException: IO Error: Socket read interrupted at oracle.jdbc.driver.T4CStatement.fetch(T4CStatement.java:1021) at oracle.jdbc.driver.OracleStatement.fetchMoreRows(OracleStatement.java:3353) at oracle.jdbc.driver.InsensitiveScrollableResultSet.fetchMoreRows(InsensitiveScrollableResultSet.java:736) at oracle.jdbc.driver.InsensitiveScrollableResultSet.absoluteInternal(InsensitiveScrollableResultSet.java:692) at oracle.jdbc.driver.InsensitiveScrollableResultSet.next(InsensitiveScrollableResultSet.java:406) at com.bonc.bdsqls.flink.side.rdb.all.RdbAllReqRow.loadData(RdbAllReqRow.java:199) at com.bonc.bdsqls.flink.side.rdb.all.RdbAllReqRow.initCache(RdbAllReqRow.java:93) at com.bonc.bdsqls.flink.side.AllReqRow.open(AllReqRow.java:60) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)...
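One mitigation direction for a slow bulk read like this is a forward-only cursor with an explicit fetch size, so rows arrive in fewer, larger round trips. A minimal sketch under my own assumptions — the class name, the fetch-size bounds, and the 300 s query timeout are illustrative, not flinkStreamSQL defaults:

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Illustrative loader (not the project's RdbAllReqRow): stream a large
// side table with a forward-only cursor and an explicit fetch size.
public class SideTableLoader {

    // Heuristic fetch size: scale with the estimated row count, clamped
    // to [100, 10_000]. The bounds are assumptions for this sketch.
    public static int chooseFetchSize(long estimatedRows) {
        return (int) Math.max(100, Math.min(10_000, estimatedRows / 100));
    }

    public static int load(Connection conn, String query, int fetchSize) throws SQLException {
        int rows = 0;
        // Forward-only, read-only: the ALL-mode cache iterates the result
        // once, so the scroll-insensitive buffering visible in the stack
        // trace (InsensitiveScrollableResultSet) is unnecessary overhead.
        try (Statement stmt = conn.createStatement(
                ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
            stmt.setFetchSize(fetchSize);  // rows per network round trip
            stmt.setQueryTimeout(300);     // fail fast instead of hanging (assumed value)
            try (ResultSet rs = stmt.executeQuery(query)) {
                while (rs.next()) {
                    rows++; // a real loader would cache the row here
                }
            }
        }
        return rows;
    }
}
```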

As in the title: is the semi-join style of association currently supported?