Taier
Taier copied to clipboard
flinksql报错
kafka topic数据格式: { "message": { "schema": "test", "opTime": 1653623501000, "before": {}, "after": { "col1": "5", "col2": "5" }, "type": "INSERT", "table": "src_test", "ts": 6935799661563351040 } }
flinksql界面配置
语法检查报错信息:
[09:58:51]
----------sql end---------
SQL parse failed. Encountered "as" at line 1, column 33. Was expecting one of: "CHARACTER" ... "CONSTRAINT" ... "NOT" ... "NULL" ... "PRIMARY" ... "UNIQUE" ... "COMMENT" ... "METADATA" ... "(" ... ")" ... "," ... "MULTISET" ... "ARRAY" ...
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: com.dtstack.chunjun.throwable.ChunJunSqlParseException: ----------sql start--------- 1> CREATE TABLE src ( col1 varchar as col1,col2 varchar as col2,proc_time AS PROCTIME() ) WITH ( 'properties.bootstrap.servers' = 'node2:9092','connector' = 'kafka-x','scan.parallelism' = '1','format' = 'json','topic' = 'student10','scan.startup.mode' = 'latest-offset' )
----------sql end---------
SQL parse failed. Encountered "as" at line 1, column 33. Was expecting one of: "CHARACTER" ... "CONSTRAINT" ... "NOT" ... "NULL" ... "PRIMARY" ... "UNIQUE" ... "COMMENT" ... "METADATA" ... "(" ... ")" ... "," ... "MULTISET" ... "ARRAY" ...
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:371)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:224)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:82)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:117)
at com.dtstack.taier.flink.FlinkClient.grammarCheck(FlinkClient.java:1096)
at com.dtstack.taier.common.client.ClientProxy.lambda$null$18(ClientProxy.java:347)
at com.dtstack.taier.pluginapi.callback.ClassLoaderCallBackMethod.callbackAndReset(ClassLoaderCallBackMethod.java:31)
at com.dtstack.taier.common.client.ClientProxy.lambda$grammarCheck$19(ClientProxy.java:347)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.dtstack.chunjun.throwable.ChunJunRuntimeException: com.dtstack.chunjun.throwable.ChunJunSqlParseException: ----------sql start--------- 1> CREATE TABLE src ( col1 varchar as col1,col2 varchar as col2,proc_time AS PROCTIME() ) WITH ( 'properties.bootstrap.servers' = 'node2:9092','connector' = 'kafka-x','scan.parallelism' = '1','format' = 'json','topic' = 'student10','scan.startup.mode' = 'latest-offset' )
----------sql end---------
SQL parse failed. Encountered "as" at line 1, column 33. Was expecting one of: "CHARACTER" ... "CONSTRAINT" ... "NOT" ... "NULL" ... "PRIMARY" ... "UNIQUE" ... "COMMENT" ... "METADATA" ... "(" ... ")" ... "," ... "MULTISET" ... "ARRAY" ...
at com.dtstack.chunjun.Main.exeSqlJob(Main.java:160)
at com.dtstack.chunjun.Main.main(Main.java:119)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:354)
... 12 more
Caused by: com.dtstack.chunjun.throwable.ChunJunSqlParseException: ----------sql start--------- 1> CREATE TABLE src ( col1 varchar as col1,col2 varchar as col2,proc_time AS PROCTIME() ) WITH ( 'properties.bootstrap.servers' = 'node2:9092','connector' = 'kafka-x','scan.parallelism' = '1','format' = 'json','topic' = 'student10','scan.startup.mode' = 'latest-offset' )
----------sql end---------
SQL parse failed. Encountered "as" at line 1, column 33. Was expecting one of: "CHARACTER" ... "CONSTRAINT" ... "NOT" ... "NULL" ... "PRIMARY" ... "UNIQUE" ... "COMMENT" ... "METADATA" ... "(" ... ")" ... "," ... "MULTISET" ... "ARRAY" ...
at com.dtstack.chunjun.sql.parser.SqlParser.lambda$parseSql$1(SqlParser.java:70)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at com.dtstack.chunjun.sql.parser.SqlParser.parseSql(SqlParser.java:65)
at com.dtstack.chunjun.Main.exeSqlJob(Main.java:151)
... 18 more
Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "as" at line 1, column 33. Was expecting one of: "CHARACTER" ... "CONSTRAINT" ... "NOT" ... "NULL" ... "PRIMARY" ... "UNIQUE" ... "COMMENT" ... "METADATA" ... "(" ... ")" ... "," ... "MULTISET" ... "ARRAY" ...
at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:74)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
at com.dtstack.chunjun.sql.parser.AbstractStmtParser.handleStmt(AbstractStmtParser.java:55)
at com.dtstack.chunjun.sql.parser.AbstractStmtParser.handleStmt(AbstractStmtParser.java:52)
at com.dtstack.chunjun.sql.parser.AbstractStmtParser.handleStmt(AbstractStmtParser.java:52)
at com.dtstack.chunjun.sql.parser.SqlParser.lambda$parseSql$1(SqlParser.java:68)
... 29 more
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "as" at line 1, column 33. Was expecting one of: "CHARACTER" ... "CONSTRAINT" ... "NOT" ... "NULL" ... "PRIMARY" ... "UNIQUE" ... "COMMENT" ... "METADATA" ... "(" ... ")" ... "," ... "MULTISET" ... "ARRAY" ...
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:442)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:205)
at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155)
at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180)
at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
... 35 more
Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "as" at line 1, column 33. Was expecting one of: "CHARACTER" ... "CONSTRAINT" ... "NOT" ... "NULL" ... "PRIMARY" ... "UNIQUE" ... "COMMENT" ... "METADATA" ... "(" ... ")" ... "," ... "MULTISET" ... "ARRAY" ...
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:39525)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39336)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5460)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6467)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:20966)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3381)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3882)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:253)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:153)
... 37 more
[09:58:52]
1.12 版本的 Flink SQL 不允许在字段定义中使用 `字段名 类型 AS 别名` 这种写法(字段定义里的 AS 只能用于计算列,例如 `proc_time AS PROCTIME()`)。如果需要给字段起别名,请在 INSERT INTO 这类 DML 语句的 SELECT 中使用 AS。示例如下:
-- Source table read through the 'stream-x' connector.
CREATE TABLE source (
    id        INT,
    name      STRING,
    money     DECIMAL(32, 2),
    dateone   TIMESTAMP,
    age       BIGINT,
    datethree TIMESTAMP,
    datesix   TIMESTAMP(6),
    datenigth TIMESTAMP(9),
    dtdate    DATE,
    dttime    TIME,
    -- Computed column populated by PROCTIME().
    PROCTIME AS PROCTIME()
) WITH (
    'connector'       = 'stream-x',
    'number-of-rows'  = '10', -- number of input rows; unlimited by default
    'rows-per-second' = '1'   -- input rows per second; unlimited by default
);
-- Table backed by the 'oracle-x' connector (Oracle table TIEZHU_ONE),
-- keyed by id and configured with an LRU lookup cache.
CREATE TABLE dim (
    id    INT,
    name  STRING,
    clob_ CLOB,
    blob_ BLOB,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'         = 'oracle-x',
    'url'               = 'jdbc:oracle:thin:@oracle01:1521:orcl',
    'table-name'        = 'TIEZHU_ONE',
    -- Credentials are left blank in this example.
    'username'          = '',
    'password'          = '',
    'lookup.cache-type' = 'lru'
);
-- Sink table written through the 'stream-x' connector with 'print' enabled.
CREATE TABLE sink (
    id               INT,
    name_from_source STRING,
    name_from_dim    STRING,
    blob_            BLOB,
    clob_            CLOB
) WITH (
    'connector' = 'stream-x',
    'print'     = 'true'
);
-- Join every source row with the dim row sharing the same id and write the
-- combined record to sink. Column aliases (AS) belong here, in the DML.
-- NOTE(review): dim sets 'lookup.cache-type', yet this is a regular join;
-- confirm whether a lookup join (FOR SYSTEM_TIME AS OF s.PROCTIME) was intended.
INSERT INTO sink
SELECT
    s.id    AS id,
    s.name  AS name_from_source,
    d.name  AS name_from_dim,
    d.blob_ AS blob_,
    d.clob_ AS clob_
FROM source AS s
INNER JOIN dim AS d
    ON s.id = d.id;
你好,就比如我的kafka消息是:{ "message": { "schema": "test", "opTime": 1653623501000, "before": {}, "after": { "col1": "5", "col2": "5" }, "type": "INSERT", "table": "src_test", "ts": 6935799661563351040 } }
taier的这里怎么写啊
------------------ 原始邮件 ------------------ 发件人: @.>; 发送时间: 2022年5月30日(星期一) 中午11:05 收件人: @.>; 抄送: "烦不烦 @.>; @.>; 主题: Re: [DTStack/Taier] flinksql报错 (Issue #553)
CREATE TABLE source ( id INT, name STRING, money DECIMAL(32, 2), dateone timestamp, age bigint, datethree timestamp, datesix timestamp(6), datenigth timestamp(9), dtdate date, dttime time, PROCTIME AS PROCTIME() ) WITH ( 'connector' = 'stream-x', 'number-of-rows' = '10', -- 输入条数,默认无限 'rows-per-second' = '1' -- 每秒输入条数,默认不限制 ); CREATE TABLE dim ( id INT, name STRING, clob_ CLOB, blob_ BLOB, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'oracle-x', 'url' = @.***:1521:orcl', 'table-name' = 'TIEZHU_ONE', 'username' = '', 'password' = '', 'lookup.cache-type' = 'lru' ); CREATE TABLE sink ( id INT, name_from_source STRING, name_from_dim STRING, blob_ BLOB, clob_ CLOB ) WITH ( 'connector' = 'stream-x', 'print' = 'true' ); insert into sink select s.id as id, s.name as name_from_source, d.name as name_from_dim, d.blob_ as blob_, d.clob_ as clob_ from source s join dim d on s.id = d.id;
— Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you authored the thread.Message ID: @.***>
-- Kafka source table. The whole JSON payload is declared as one nested ROW
-- column named message; the changed record sits in the inner ROW `after`.
CREATE TABLE order_info_kafka (
    message ROW<
        `schema` VARCHAR,
        opTime   BIGINT,
        type     VARCHAR,
        `table`  VARCHAR,
        after    ROW<
            id    VARCHAR,
            name  VARCHAR,
            price DECIMAL(15, 2)
        >
    >,
    -- Computed column populated by PROCTIME().
    proc_time AS PROCTIME()
) WITH (
    'connector'                    = 'kafka-x',
    'properties.bootstrap.servers' = 'flink01:9092,flink02:9092,flink03:9092',
    'topic'                        = 'tiezhu_in_one',
    'format'                       = 'json',
    'scan.startup.mode'            = 'latest-offset',
    'scan.parallelism'             = '1'
);
-- Hive sink table written through the 'hive-x' connector.
CREATE TABLE order_info_hive (
    id      INT,
    message VARCHAR,
    name    VARCHAR,
    price   DECIMAL(15, 2)
) WITH (
    -- Connector and Hive connection settings.
    'connector'  = 'hive-x',
    'url'        = 'jdbc:hive2://flink03:10004/tiezhu',
    'username'   = 'hive',
    'password'   = '',
    'table-name' = 'test_one',

    -- File format, partitioning and sink parallelism.
    'file-type'        = 'parquet',
    'partition'        = 'dt',
    'partition-type'   = 'DAY',
    'sink.parallelism' = '1',

    -- HDFS settings for nameservice ns1 (name nodes nn1 and nn2).
    'default-fs'                                        = 'hdfs://ns1',
    'properties.dfs.nameservices'                       = 'ns1',
    'properties.dfs.ha.namenodes.ns1'                   = 'nn1,nn2',
    'properties.dfs.namenode.rpc-address.ns1.nn1'       = 'nn1:9000',
    'properties.dfs.namenode.rpc-address.ns1.nn2'       = 'nn2:9000',
    'properties.dfs.client.failover.proxy.provider.ns1' = 'org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
);
-- Flatten each Kafka change event into one row of order_info_hive.
--
-- order_info_kafka exposes only the nested ROW column message (plus
-- proc_time), so every field is read through message / message.after rather
-- than as a top-level column.
--
-- The projection is listed in the sink's declared column order
-- (id, message, name, price); this assumes INSERT matches columns by
-- position rather than by alias.
--
-- after and type are backtick-quoted so they are parsed as identifiers.
INSERT INTO order_info_hive
SELECT
    CAST(k.message.`after`.id AS INT) AS id,
    k.message.`type`                  AS message,
    k.message.`after`.name            AS name,
    -- A missing price is written as 0.
    CAST(COALESCE(k.message.`after`.price, 0.0) AS DECIMAL(15, 2)) AS price
FROM order_info_kafka AS k;
kafka 数据如下: {"message":{"schema":"tiezhu","after":{"name":"vv","dt":"2022-05-24","id":"1","message":"ee","price":"12.33"},"type":"UPDATE","before":{"name":"vv","dt":"2022-05-24","id":"1","message":"ff","price":"12.33"},"table":"three","ts":6934687229973499904}}
{"message":{"schema":"tiezhu","after":{"name":"vv","dt":"2022-05-24","id":"1","message":"ee","price":"12.33"},"type":"UPDATE","before":{"name":"vv","dt":"2022-05-24","id":"1","message":"ff","price":"12.33"},"table":"three","ts":6934687229973499904}}
我的kafka topic message 跟你的不太一样,没有before信息,只有after,是需要怎么配置吗?
------------------ 原始邮件 ------------------ 发件人: @.>; 发送时间: 2022年5月30日(星期一) 中午11:15 收件人: @.>; 抄送: "烦不烦 @.>; @.>; 主题: Re: [DTStack/Taier] flinksql报错 (Issue #553)
CREATE TABLE order_info_kafka ( message ROW< `schema`
varchar, opTime bigint, type varchar, `table`
varchar, after ROW<id varchar, name varchar, price decimal(15, 2) >>, proc_time AS PROCTIME() ) WITH ( 'properties.bootstrap.servers' = 'flink01:9092,flink02:9092,flink03:9092', 'connector' = 'kafka-x', 'scan.parallelism' = '1', 'format' = 'json', 'scan.startup.mode' = 'latest-offset', 'topic' = 'tiezhu_in_one' ); CREATE TABLE order_info_hive ( id int, message varchar, name varchar, price decimal(15, 2) ) WITH ( 'file-type' = 'parquet', 'properties.dfs.ha.namenodes.ns1' = 'nn1,nn2', 'default-fs' = 'hdfs://ns1', 'url' = 'jdbc:hive2://flink03:10004/tiezhu', 'properties.dfs.namenode.rpc-address.ns1.nn2' = 'nn2:9000', 'properties.dfs.namenode.rpc-address.ns1.nn1' = 'nn1:9000', 'password' = '', 'partition' = 'dt', 'connector' = 'hive-x', 'properties.dfs.client.failover.proxy.provider.ns1' = 'org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider', 'table-name' = 'test_one', 'sink.parallelism' = '1', 'partition-type' = 'DAY', 'username' = 'hive', 'properties.dfs.nameservices' = 'ns1' ); INSERT into order_info_hive select cast(id as int) as id, name as name, type as message, cast(case when (price is null) then 0.0 else price end as DECIMAL(15, 2)) from order_info_kafka;
kafka 数据如下: {"message":{"schema":"tiezhu","after":{"name":"vv","dt":"2022-05-24","id":"1","message":"ee","price":"12.33"},"type":"UPDATE","before":{"name":"vv","dt":"2022-05-24","id":"1","message":"ff","price":"12.33"},"table":"three","ts":6934687229973499904}}
— Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you authored the thread.Message ID: @.***>
你好,我有个很大的疑惑,现在用taier,明明在我第一个截图那里可以配置src和sink,只写insert into sink select * from src不就可以了吗? 为什么还要写一遍创建表语句呢?
------------------ 原始邮件 ------------------ 发件人: @.>; 发送时间: 2022年5月30日(星期一) 中午11:15 收件人: @.>; 抄送: "烦不烦 @.>; @.>; 主题: Re: [DTStack/Taier] flinksql报错 (Issue #553)
CREATE TABLE order_info_kafka ( message ROW< `schema`
varchar, opTime bigint, type varchar, `table`
varchar, after ROW<id varchar, name varchar, price decimal(15, 2) >>, proc_time AS PROCTIME() ) WITH ( 'properties.bootstrap.servers' = 'flink01:9092,flink02:9092,flink03:9092', 'connector' = 'kafka-x', 'scan.parallelism' = '1', 'format' = 'json', 'scan.startup.mode' = 'latest-offset', 'topic' = 'tiezhu_in_one' ); CREATE TABLE order_info_hive ( id int, message varchar, name varchar, price decimal(15, 2) ) WITH ( 'file-type' = 'parquet', 'properties.dfs.ha.namenodes.ns1' = 'nn1,nn2', 'default-fs' = 'hdfs://ns1', 'url' = 'jdbc:hive2://flink03:10004/tiezhu', 'properties.dfs.namenode.rpc-address.ns1.nn2' = 'nn2:9000', 'properties.dfs.namenode.rpc-address.ns1.nn1' = 'nn1:9000', 'password' = '', 'partition' = 'dt', 'connector' = 'hive-x', 'properties.dfs.client.failover.proxy.provider.ns1' = 'org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider', 'table-name' = 'test_one', 'sink.parallelism' = '1', 'partition-type' = 'DAY', 'username' = 'hive', 'properties.dfs.nameservices' = 'ns1' ); INSERT into order_info_hive select cast(id as int) as id, name as name, type as message, cast(case when (price is null) then 0.0 else price end as DECIMAL(15, 2)) from order_info_kafka;
kafka 数据如下: {"message":{"schema":"tiezhu","after":{"name":"vv","dt":"2022-05-24","id":"1","message":"ee","price":"12.33"},"type":"UPDATE","before":{"name":"vv","dt":"2022-05-24","id":"1","message":"ff","price":"12.33"},"table":"three","ts":6934687229973499904}}
— Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you authored the thread.Message ID: @.***>
如果使用Taier 可以通过向导模式建表,不需要自定义ddl建表语句。