flinkStreamSQL
An extension of open-source Flink's real-time SQL. It mainly implements joins between streams and dimension tables, and supports all native Flink SQL syntax.
Using temporary tables via the syntax

> CREATE VIEW viewName [ (columnName[ , columnName]*) ];
> INSERT INTO viewName queryStatement;

raises an error. My SQL example is as follows:

```
CREATE TABLE MyTable(
    name varchar,
    channel varchar,
    pv INT,
    xctime bigint,
    CHARACTER_LENGTH(channel)...
```
> Permission configuration is not supported at the moment

@yangsishu I configured ES without authentication, ES version 5.6.14, and got the following error at runtime. What is the problem?

```
java.lang.RuntimeException: Elasticsearch client is not connected to any Elasticsearch nodes!
    at org.apache.flink.streaming.connectors.elasticsearch5.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:82)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:287)
    at com.dtstack.flink.sql.sink.elasticsearch.MetricElasticsearchSink.open(MetricElasticsearchSink.java:49)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:420)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:296)...
```
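One thing worth ruling out before suspecting permissions: the Elasticsearch 5 TransportClient (which the `elasticsearch5` connector uses) speaks the binary transport protocol on port 9300, not the HTTP port 9200, and also rejects nodes whose `cluster.name` does not match. A minimal sketch for checking plain TCP reachability of the configured address (the helper name is mine, not part of flinkStreamSQL):

```python
import socket

def can_connect(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers refused connections, DNS failures, and timeouts
        return False

# ES 5 transport clients connect to port 9300; pointing the sink at the HTTP
# port 9200 is a common cause of "not connected to any Elasticsearch nodes".
reachable = can_connect("127.0.0.1", 9300)
```

If the port is reachable but the error persists, comparing the `cluster.name` in the sink configuration against the server's setting is the next step.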
1. Fix a NullPointerException that occurred when using socketServer
2. Provide a unified approach to HBase Kerberos authentication, covering hbase AllSide, LruSide, and Sink
Sample log:

```
{"time":"2020-06-17 23:00:01.211","pushid":"pushback_send-BC110-12392212311","app":"mm"}
```

Since the event-time column is of type bigint, UNIX_TIMESTAMP is used for the conversion. The Flink runtime log reports an error:

```
ERROR com.dtstack.flink.sql.watermarker.CustomerWaterMarkerForLong - java.lang.NullPointerException
```

Table DDL:

```
CREATE TABLE MyTable (
    time varchar,
    pushid varchar,
    app varchar,
    UNIX_TIMESTAMP(time, 'yyyy-MM-dd HH:mm:ss')*1000 bigint AS xctime,...
```
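One possible cause, assuming the pattern is interpreted SimpleDateFormat-style: the log's `time` value carries milliseconds (`.211`) while the DDL's pattern `'yyyy-MM-dd HH:mm:ss'` does not, and a failed parse would leave `xctime` null and trip the watermark generator. A minimal Python sketch of the intended conversion, parsing with the milliseconds the log actually carries (the UTC assumption is mine):

```python
from datetime import datetime, timezone

raw = "2020-06-17 23:00:01.211"  # the "time" field from the sample log

# Parse including the fractional seconds; the DDL pattern
# 'yyyy-MM-dd HH:mm:ss' omits the .%f part present in the data.
t = datetime.strptime(raw, "%Y-%m-%d %H:%M:%S.%f")

# Epoch milliseconds, matching the bigint xctime column (UTC assumed here).
# Built from integer parts to avoid float rounding.
epoch_s = int(t.replace(tzinfo=timezone.utc).timestamp())
xctime = epoch_s * 1000 + t.microsecond // 1000
```

If the framework's UNIX_TIMESTAMP fails on the trailing `.211`, aligning the pattern with the actual log format would be the first thing to try.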
1. The data format in Kafka:

```
{
    "distinct_id": "16c6a9eeabd205-08b9c3c13daa25-36f661a-269280-16c6a9eeabe1fd",
    "lib": {
        "$lib": "js",
        "$lib_method": "code",
        "$lib_version": "1.14.2"
    }
}
```

2. The data written to the ES sink:

```
{
    "_index": "flink2es",
    "_type": "mytype_1",
    "_id": "16c6a9eeabd205-08b9c3c13daa25-36f661a-269280-16c6a9eeabe1fd",
    "_score": 1,
    "_source": {
        "lib": null,
        "distinct_id": "16c6a9eeabd205-08b9c3c13daa25-36f661a-269280-16c6a9eeabe1fd"...
```
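Judging from the dotted-path syntax used elsewhere in these issues (`info.uid bigint AS uid`), nested fields appear to be mapped leaf by leaf; a column declared simply as `lib` would not match any scalar field, which is consistent with the null in the ES document. A plain-Python sketch of the leaf extraction (my own code, not project API):

```python
import json

raw = """{
  "distinct_id": "16c6a9eeabd205-08b9c3c13daa25-36f661a-269280-16c6a9eeabe1fd",
  "lib": {"$lib": "js", "$lib_method": "code", "$lib_version": "1.14.2"}
}"""

record = json.loads(raw)

# Each nested leaf must be pulled out individually; asking for the whole
# "lib" object as a scalar column yields nothing.
lib_version = record.get("lib", {}).get("$lib_version")
lib_name = record.get("lib", {}).get("$lib")
```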
### After parsing nested JSON, the uid field in the output is always null

JSON log format:

```
{
    "time":"2020-06-15 19:16:29.243",
    "topic":"room_chat",
    "appname":"ms",
    "info":{
        "uid":427083
    }
}
```

Kafka source DDL:

```
CREATE TABLE kafka11source (
    appname VARCHAR,
    topic VARCHAR,
    info.uid bigint as uid,...
```
I was looking for a RedisSink connector and found your project. Thanks for open-sourcing it! May I request a feature? The Redis key is currently `tableName:privateKey:privateKeyValue:columnName`. Could you support customizing the primary key and value format, to cover various use cases more flexibly? E.g. the key is `tableName:privateKeyValue` and the value is a serialized form of multiple columns, like a stringified JSON of `{column1: column1Value;...`
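The requested layout could look like the following sketch (the `build_redis_entry` helper and the `user_stats` table name are illustrative, not existing project API): one key per row instead of one key per column, with all columns serialized into a single JSON value.

```python
import json

def build_redis_entry(table: str, pk_value: str, row: dict):
    """Build the proposed layout: key 'tableName:privateKeyValue', value a
    JSON-serialized map of all columns for that row."""
    key = f"{table}:{pk_value}"
    value = json.dumps(row, sort_keys=True)
    return key, value

key, value = build_redis_entry("user_stats", "42", {"column1": "a", "column2": "b"})
```

Compared with per-column keys, this allows fetching a whole row with one GET, at the cost of read-modify-write when updating a single column.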
My SQL:

```
INSERT INTO dwd_business_flow
SELECT
    SUBSTRING(CAST(TIMESTAMPADD(DAY,0,LOCALTIMESTAMP) as varchar),1,10) AS stat_date,
    CASE
        WHEN t2.fee_type = 'WAGES' THEN '010'
        WHEN t2.fee_type = 'RENT' THEN '011'
        WHEN t2.fee_type = 'DEBIT' THEN '012'
        WHEN...
```
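The `CASE WHEN` chain here is a plain value mapping from `fee_type` to a business code. As a sanity check, an equivalent lookup (the codes are copied from the SQL excerpt; the null default for unmatched types is my assumption, since the statement is truncated before any `ELSE`):

```python
# fee_type -> business code, as enumerated in the CASE WHEN branches above.
FEE_CODES = {"WAGES": "010", "RENT": "011", "DEBIT": "012"}

def fee_code(fee_type: str, default=None):
    """Mirror of the CASE WHEN chain; unmatched types fall through to default."""
    return FEE_CODES.get(fee_type, default)
```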
Support protobuf serialization