Dual-stream join: syntax check exception

Open momisabuilder opened this issue 2 years ago • 3 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

Script:

CREATE TABLE sumsrc (
    message ROW < test varchar, src_test varchar, after ROW < col1 varchar, col2 varchar >>,
    proc_time AS PROCTIME()
) WITH (
    'properties.bootstrap.servers' = 'node2:9092',
    'connector' = 'kafka-x',
    'scan.parallelism' = '1',
    'format' = 'json',
    'topic' = 'shuangliu1',
    'scan.startup.mode' = 'latest-offset'
);

CREATE TABLE sumsrc2 (
    message ROW < test varchar, src_test varchar, after ROW < col3 varchar, col4 varchar >>,
    proc_time AS PROCTIME()
) WITH (
    'properties.bootstrap.servers' = 'node2:9092',
    'connector' = 'kafka-x',
    'scan.parallelism' = '1',
    'format' = 'json',
    'topic' = 'shuangliu2',
    'scan.startup.mode' = 'latest-offset'
);

CREATE TABLE sumsink (
    col1 VARCHAR,
    col2 VARCHAR,
    col3 VARCHAR,
    col4 VARCHAR
) WITH (
    'password' = 'xxxxxxxxxx',
    'connector' = 'mysql-x',
    'sink.buffer-flush.interval' = '1000',
    'sink.all-replace' = 'false',
    'sink.buffer-flush.max-rows' = '100',
    'table-name' = 'shuangliujoin',
    'sink.parallelism' = '1',
    'url' = 'jdbc:mysql://xxxxxxxxxxxxx:33061/test',
    'username' = 'root'
);

INSERT INTO sumsink
SELECT s.col1 AS col1, s.col2 AS col2, w.col3 AS col3, w.col4 AS col4
FROM sumsrc s
LEFT JOIN sumsrc2 FOR SYSTEM_TIME AS OF s.proc_time AS w
ON s.col1 = w.col3;

Exception:
[11:17:46] Syntax check started
[11:17:47] org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.table.api.ValidationException: Currently the join key in Temporal Table Join can not be empty. org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.table.api.ValidationException: Currently the join key in Temporal Table Join can not be empty. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:371) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:224) at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158) at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:82) at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:117) at com.dtstack.taier.flink.FlinkClient.grammarCheck(FlinkClient.java:1096) at com.dtstack.taier.common.client.ClientProxy.lambda$null$18(ClientProxy.java:347) at com.dtstack.taier.pluginapi.callback.ClassLoaderCallBackMethod.callbackAndReset(ClassLoaderCallBackMethod.java:31) at com.dtstack.taier.common.client.ClientProxy.lambda$grammarCheck$19(ClientProxy.java:347) at java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1590) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) Caused by: com.dtstack.flinkx.throwable.FlinkxRuntimeException: org.apache.flink.table.api.ValidationException: Currently the join key in Temporal Table Join can not be empty. at com.dtstack.flinkx.Main.exeSqlJob(Main.java:149) at com.dtstack.flinkx.Main.main(Main.java:108) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:354) ... 13 more Caused by: org.apache.flink.table.api.ValidationException: Currently the join key in Temporal Table Join can not be empty.
at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272) at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407) at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243) at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202) at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.immutable.Range.foreach(Range.scala:155) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676) at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:98) at com.dtstack.flinkx.Main.exeSqlJob(Main.java:141) ... 19 more

[11:17:47] Syntax check failed!
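For context on the error: the two Kafka tables above only declare message (a nested ROW) and proc_time as top-level columns, so the condition s.col1 = w.col3 may not resolve to concrete key columns, which would leave the temporal table join without a join key. Below is a minimal, unverified sketch of one way to surface the nested fields as computed columns so the ON clause has real columns to key on; the computed-column names are illustrative, and whether FOR SYSTEM_TIME AS OF is appropriate for two kafka-x streams at all is a separate question (see the regular dual-stream join sketch further down the thread).

-- Hedged sketch, assuming nested ROW fields can be referenced from computed columns;
-- `after` is back-quoted in case it clashes with a reserved keyword.
CREATE TABLE sumsrc (
    message ROW < test varchar, src_test varchar, `after` ROW < col1 varchar, col2 varchar >>,
    col1 AS message.`after`.col1,  -- surface the nested field as a top-level column
    col2 AS message.`after`.col2,
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'kafka-x',
    'topic' = 'shuangliu1',
    'properties.bootstrap.servers' = 'node2:9092',
    'format' = 'json',
    'scan.parallelism' = '1',
    'scan.startup.mode' = 'latest-offset'
);

With sumsrc2 declared the same way (col3/col4 taken from its nested after row), s.col1 = w.col3 would refer to actual columns on both sides of the join.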

What you expected to happen

1

How to reproduce

1

Anything else

1

Version

1.10_release

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

momisabuilder · Aug 22 '22 03:08

Is the branch 1.10_release, or master?

FlechazoW · Aug 24 '22 11:08

For dual-stream joins in Flink SQL, you can refer to this article: https://www.cnblogs.com/Springmoon-venn/p/12719320.html

FlechazoW · Aug 24 '22 11:08
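For reference, a minimal sketch of the regular (non-temporal) dual-stream join that the linked article discusses, reusing the table names from the script above and assuming the join columns are exposed as top-level fields; this is illustrative, not a verified fix:

INSERT INTO sumsink
SELECT s.col1 AS col1, s.col2 AS col2, w.col3 AS col3, w.col4 AS col4
FROM sumsrc AS s
LEFT JOIN sumsrc2 AS w
ON s.col1 = w.col3;

A regular stream-stream join keeps state for both inputs, whereas FOR SYSTEM_TIME AS OF targets lookup/versioned (dimension) tables and requires an equality condition that the planner can use as the join key.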

Could you take a look at whether there is a problem with my script?


momisabuilder · Aug 25 '22 02:08

1.12

(In reply to: "Is the branch 1.10_release, or master?")

momisabuilder · Oct 11 '22 07:10