flinkStreamSQL
Question about ORDER BY with group windows
I need to sort by ROWTIME within a 1-second window, but I get the following error:
Exception in thread "main" org.apache.flink.table.api.TableException: Sort on a non-time-attribute field is not supported.
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSort.translateToPlanInternal(StreamExecSort.scala:118)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSort.translateToPlanInternal(StreamExecSort.scala:59)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSort.translateToPlan(StreamExecSort.scala:59)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:91)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334)
    at com.dtstack.flink.sql.exec.FlinkSQLExec.sqlUpdate(FlinkSQLExec.java:94)
    at com.dtstack.flink.sql.exec.ExecuteProcessHelper.sqlTranslation(ExecuteProcessHelper.java:235)
    at com.dtstack.flink.sql.exec.ExecuteProcessHelper.getStreamExecution(ExecuteProcessHelper.java:169)
    at com.dtstack.flink.sql.Main.main(Main.java:41)
    at com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:140)
My script is as follows:
CREATE TABLE MyTable(
    name varchar,
    after varchar,
    jstime bigint,
    WATERMARK FOR jstime AS withOffset(jstime,1000)
)WITH(
    type ='kafka10',
    bootstrapServers ='pro1:9092',
    kafka.auto.offset.reset ='latest',
    topic ='test1',
    parallelism ='1',
    sourcedatatype ='json'
);
CREATE TABLE result_user_info(
    name varchar,
    after varchar,
    jstime bigint,
    ROWTIME datetime
)WITH(
    type ='console',
    parallelism ='1'
);
insert into result_user_info(name,after,jstime,ROWTIME)
(select name, after, jstime, ROWTIME
 from MyTable
 group by name, after, jstime, TUMBLE(ROWTIME, INTERVAL '1' SECOND), ROWTIME
 order by ROWTIME)
Kafka message:
{"name":"testname","jstime":3119988231,"after":"{"khh":"我是khh0003"}"}
Is my syntax wrong, or is there another way to do the sort? Thanks.
Flink needs to have EventTime enabled. Give that a try.
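For anyone unsure where that setting goes, here is a minimal sketch. It assumes the job is submitted through the flinkStreamSQL launcher and that the launcher's `confProp` JSON argument accepts a `time.characteristic` key. The key name is recalled from the project README and is not confirmed anywhere in this thread, so check it against the version you are running.

```json
{
  "time.characteristic": "EventTime"
}
```

If the key applies to your version, this JSON would be passed as the value of `confProp` when launching the job. Other properties you already set there would stay in the same object.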