Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
脚本:
CREATE TABLE sumsrc(
message ROW < test
varchar,
src_test
varchar,
after ROW < col1 varchar,
col2 varchar >>,
proc_time AS PROCTIME()
)WITH(
'properties.bootstrap.servers'='node2:9092',
'connector'='kafka-x',
'scan.parallelism'='1',
'format'='json',
'topic'='shuangliu1',
'scan.startup.mode'='latest-offset'
);
CREATE TABLE sumsrc2(
message ROW < test
varchar,
src_test
varchar,
after ROW < col3 varchar,
col4 varchar >>,
proc_time AS PROCTIME()
)WITH(
'properties.bootstrap.servers'='node2:9092',
'connector'='kafka-x',
'scan.parallelism'='1',
'format'='json',
'topic'='shuangliu2',
'scan.startup.mode'='latest-offset'
);
CREATE TABLE sumsink(
col1 VARCHAR,
col2 VARCHAR,
col3 VARCHAR,
col4 VARCHAR
)WITH(
'password'='xxxxxxxxxx',
'connector'='mysql-x',
'sink.buffer-flush.interval'='1000',
'sink.all-replace'='false',
'sink.buffer-flush.max-rows'='100',
'table-name'='shuangliujoin',
'sink.parallelism'='1',
'url'='jdbc:mysql://xxxxxxxxxxxxx:33061/test',
'username'='root'
);
INSERT
INTO
sumsink
SELECT
s.col1 as col1,
s.col2 as col2,
w.col3 as col3,
w.col4 as col4
FROM
sumsrc s
left join
sumsrc2 FOR SYSTEM_TIME AS OF s.proc_time as w ON s.col1=w.col3;
异常:
[11:17:46] 语法检查开始
[11:17:47] org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.table.api.ValidationException: Currently the join key in Temporal Table Join can not be empty.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.table.api.ValidationException: Currently the join key in Temporal Table Join can not be empty.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:371)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:224)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:82)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:117)
at com.dtstack.taier.flink.FlinkClient.grammarCheck(FlinkClient.java:1096)
at com.dtstack.taier.common.client.ClientProxy.lambda$null$18(ClientProxy.java:347)
at com.dtstack.taier.pluginapi.callback.ClassLoaderCallBackMethod.callbackAndReset(ClassLoaderCallBackMethod.java:31)
at com.dtstack.taier.common.client.ClientProxy.lambda$grammarCheck$19(ClientProxy.java:347)
at java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1590)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.dtstack.flinkx.throwable.FlinkxRuntimeException: org.apache.flink.table.api.ValidationException: Currently the join key in Temporal Table Join can not be empty.
at com.dtstack.flinkx.Main.exeSqlJob(Main.java:149)
at com.dtstack.flinkx.Main.main(Main.java:108)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:354)
... 13 more
Caused by: org.apache.flink.table.api.ValidationException: Currently the join key in Temporal Table Join can not be empty.
at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.immutable.Range.foreach(Range.scala:155)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:98)
at com.dtstack.flinkx.Main.exeSqlJob(Main.java:141)
... 19 more
[11:17:47] 语法检查失败!
What you expected to happen
1
How to reproduce
1
Anything else
1
Version
1.10_release
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
分支是1.10_release?还是master呢?
flink sql 双流join 可以学习这篇文章 https://www.cnblogs.com/Springmoon-venn/p/12719320.html
麻烦看下我的脚本有问题吗?
------------------ 原始邮件 ------------------
发件人: @.>;
发送时间: 2022年8月24日(星期三) 晚上7:44
收件人: @.>;
抄送: "烦不烦 @.>; @.>;
主题: Re: [DTStack/chunjun] 双流join 语法检测异常 (Issue #1174)
flink sql 双流join 可以学习这篇文章 https://www.cnblogs.com/Springmoon-venn/p/12719320.html
—
Reply to this email directly, view it on GitHub, or unsubscribe.
You are receiving this because you authored the thread.Message ID: @.***>
1.12
------------------ 原始邮件 ------------------
发件人: @.>;
发送时间: 2022年8月24日(星期三) 晚上7:38
收件人: @.>;
抄送: "烦不烦 @.>; @.>;
主题: Re: [DTStack/chunjun] 双流join 语法检测异常 (Issue #1174)
分支是1.10_release?还是master呢?
—
Reply to this email directly, view it on GitHub, or unsubscribe.
You are receiving this because you authored the thread.Message ID: @.***>