
FlinkSQL error

Open momisabuilder opened this issue 2 years ago • 5 comments

Kafka topic data format: { "message": { "schema": "test", "opTime": 1653623501000, "before": {}, "after": { "col1": "5", "col2": "5" }, "type": "INSERT", "table": "src_test", "ts": 6935799661563351040 } }

FlinkSQL UI configuration:

[screenshot: FlinkSQL task configuration]

Syntax check error message:

[09:58:51] Syntax check started
[09:58:52] org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: com.dtstack.chunjun.throwable.ChunJunSqlParseException: ----------sql start--------- 1> CREATE TABLE src ( col1 varchar as col1,col2 varchar as col2,proc_time AS PROCTIME() ) WITH ( 'properties.bootstrap.servers' = 'node2:9092','connector' = 'kafka-x','scan.parallelism' = '1','format' = 'json','topic' = 'student10','scan.startup.mode' = 'latest-offset' )

----------sql end---------

SQL parse failed. Encountered "as" at line 1, column 33. Was expecting one of: "CHARACTER" ... "CONSTRAINT" ... "NOT" ... "NULL" ... "PRIMARY" ... "UNIQUE" ... "COMMENT" ... "METADATA" ... "(" ... ")" ... "," ... "MULTISET" ... "ARRAY" ...

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: com.dtstack.chunjun.throwable.ChunJunSqlParseException: ----------sql start--------- 1> CREATE TABLE src ( col1 varchar as col1,col2 varchar as col2,proc_time AS PROCTIME() ) WITH ( 'properties.bootstrap.servers' = 'node2:9092','connector' = 'kafka-x','scan.parallelism' = '1','format' = 'json','topic' = 'student10','scan.startup.mode' = 'latest-offset' )

----------sql end---------

SQL parse failed. Encountered "as" at line 1, column 33. Was expecting one of: "CHARACTER" ... "CONSTRAINT" ... "NOT" ... "NULL" ... "PRIMARY" ... "UNIQUE" ... "COMMENT" ... "METADATA" ... "(" ... ")" ... "," ... "MULTISET" ... "ARRAY" ...

at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:371)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:224)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:82)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:117)
at com.dtstack.taier.flink.FlinkClient.grammarCheck(FlinkClient.java:1096)
at com.dtstack.taier.common.client.ClientProxy.lambda$null$18(ClientProxy.java:347)
at com.dtstack.taier.pluginapi.callback.ClassLoaderCallBackMethod.callbackAndReset(ClassLoaderCallBackMethod.java:31)
at com.dtstack.taier.common.client.ClientProxy.lambda$grammarCheck$19(ClientProxy.java:347)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

Caused by: com.dtstack.chunjun.throwable.ChunJunRuntimeException: com.dtstack.chunjun.throwable.ChunJunSqlParseException: ----------sql start--------- 1> CREATE TABLE src ( col1 varchar as col1,col2 varchar as col2,proc_time AS PROCTIME() ) WITH ( 'properties.bootstrap.servers' = 'node2:9092','connector' = 'kafka-x','scan.parallelism' = '1','format' = 'json','topic' = 'student10','scan.startup.mode' = 'latest-offset' )

----------sql end---------

SQL parse failed. Encountered "as" at line 1, column 33. Was expecting one of: "CHARACTER" ... "CONSTRAINT" ... "NOT" ... "NULL" ... "PRIMARY" ... "UNIQUE" ... "COMMENT" ... "METADATA" ... "(" ... ")" ... "," ... "MULTISET" ... "ARRAY" ...

at com.dtstack.chunjun.Main.exeSqlJob(Main.java:160)
at com.dtstack.chunjun.Main.main(Main.java:119)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:354)
... 12 more

Caused by: com.dtstack.chunjun.throwable.ChunJunSqlParseException: ----------sql start--------- 1> CREATE TABLE src ( col1 varchar as col1,col2 varchar as col2,proc_time AS PROCTIME() ) WITH ( 'properties.bootstrap.servers' = 'node2:9092','connector' = 'kafka-x','scan.parallelism' = '1','format' = 'json','topic' = 'student10','scan.startup.mode' = 'latest-offset' )

----------sql end---------

SQL parse failed. Encountered "as" at line 1, column 33. Was expecting one of: "CHARACTER" ... "CONSTRAINT" ... "NOT" ... "NULL" ... "PRIMARY" ... "UNIQUE" ... "COMMENT" ... "METADATA" ... "(" ... ")" ... "," ... "MULTISET" ... "ARRAY" ...

at com.dtstack.chunjun.sql.parser.SqlParser.lambda$parseSql$1(SqlParser.java:70)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at com.dtstack.chunjun.sql.parser.SqlParser.parseSql(SqlParser.java:65)
at com.dtstack.chunjun.Main.exeSqlJob(Main.java:151)
... 18 more

Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "as" at line 1, column 33. Was expecting one of: "CHARACTER" ... "CONSTRAINT" ... "NOT" ... "NULL" ... "PRIMARY" ... "UNIQUE" ... "COMMENT" ... "METADATA" ... "(" ... ")" ... "," ... "MULTISET" ... "ARRAY" ...

at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:74)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
at com.dtstack.chunjun.sql.parser.AbstractStmtParser.handleStmt(AbstractStmtParser.java:55)
at com.dtstack.chunjun.sql.parser.AbstractStmtParser.handleStmt(AbstractStmtParser.java:52)
at com.dtstack.chunjun.sql.parser.AbstractStmtParser.handleStmt(AbstractStmtParser.java:52)
at com.dtstack.chunjun.sql.parser.SqlParser.lambda$parseSql$1(SqlParser.java:68)
... 29 more

Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "as" at line 1, column 33. Was expecting one of: "CHARACTER" ... "CONSTRAINT" ... "NOT" ... "NULL" ... "PRIMARY" ... "UNIQUE" ... "COMMENT" ... "METADATA" ... "(" ... ")" ... "," ... "MULTISET" ... "ARRAY" ...

at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:442)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:205)
at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155)
at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180)
at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
... 35 more

Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "as" at line 1, column 33. Was expecting one of: "CHARACTER" ... "CONSTRAINT" ... "NOT" ... "NULL" ... "PRIMARY" ... "UNIQUE" ... "COMMENT" ... "METADATA" ... "(" ... ")" ... "," ... "MULTISET" ... "ARRAY" ...

at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:39525)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39336)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5460)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6467)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:20966)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3381)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3882)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:253)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:153)
... 37 more

[09:58:52] Syntax check failed!

momisabuilder avatar May 30 '22 02:05 momisabuilder

Flink SQL 1.12 does not allow AS inside a physical column definition: a column is declared either as name plus type, or as a computed column with name AS expr, never both, which is why col1 varchar as col1 fails to parse. If you need AS to alias a field, use it in the INSERT INTO DML statement instead.
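For reference, a minimal corrected sketch of the failing src table, reconstructed from the DDL in the error log above: the AS aliases are dropped from the physical columns, and AS is kept only for the computed processing-time column. This fixes the parse error only; mapping the nested message payload is covered further down in this thread.

CREATE TABLE src
(
    col1      varchar,
    col2      varchar,
    proc_time AS PROCTIME()
) WITH (
      'properties.bootstrap.servers' = 'node2:9092',
      'connector' = 'kafka-x',
      'scan.parallelism' = '1',
      'format' = 'json',
      'topic' = 'student10',
      'scan.startup.mode' = 'latest-offset'
      );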

FlechazoW avatar May 30 '22 03:05 FlechazoW

CREATE TABLE source
(
    id        INT,
    name      STRING,
    money     DECIMAL(32, 2),
    dateone   timestamp,
    age       bigint,
    datethree timestamp,
    datesix   timestamp(6),
    datenigth timestamp(9),
    dtdate    date,
    dttime    time,
    PROCTIME AS PROCTIME()

) WITH (
      'connector' = 'stream-x',
      'number-of-rows' = '10', -- number of rows to emit, unlimited by default
      'rows-per-second' = '1' -- rows emitted per second, unrestricted by default
      );

CREATE TABLE dim
(
    id    INT,
    name  STRING,
    clob_ CLOB,
    blob_ BLOB,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
      'connector' = 'oracle-x',
      'url' = 'jdbc:oracle:thin:@oracle01:1521:orcl',
      'table-name' = 'TIEZHU_ONE',
      'username' = '',
      'password' = '',
      'lookup.cache-type' = 'lru'
      );

CREATE TABLE sink
(
    id               INT,
    name_from_source STRING,
    name_from_dim    STRING,
    blob_            BLOB,
    clob_            CLOB
) WITH (
      'connector' = 'stream-x',
      'print' = 'true'
      );

insert into sink
select s.id    as id,
       s.name  as name_from_source,
       d.name  as name_from_dim,
       d.blob_ as blob_,
       d.clob_ as clob_
from source s
         join dim d on s.id = d.id;

FlechazoW avatar May 30 '22 03:05 FlechazoW

Hello. For example, my Kafka message is: { "message": { "schema": "test", "opTime": 1653623501000, "before": {}, "after": { "col1": "5", "col2": "5" }, "type": "INSERT", "table": "src_test", "ts": 6935799661563351040 } }

How should I write this part in Taier?


momisabuilder avatar May 30 '22 03:05 momisabuilder

CREATE TABLE order_info_kafka
(
    message ROW< `schema` varchar,
    opTime  bigint,
    type    varchar,
    `table` varchar,
    after   ROW<id varchar,
    name    varchar,
    price   decimal(15, 2) >>,
    proc_time AS PROCTIME()
) WITH (
      'properties.bootstrap.servers' = 'flink01:9092,flink02:9092,flink03:9092',
      'connector' = 'kafka-x',
      'scan.parallelism' = '1',
      'format' = 'json',
      'scan.startup.mode' = 'latest-offset',
      'topic' = 'tiezhu_in_one'
      );

CREATE TABLE order_info_hive
(
    id      int,
    message varchar,
    name    varchar,
    price   decimal(15, 2)
) WITH (
      'file-type' = 'parquet',
      'properties.dfs.ha.namenodes.ns1' = 'nn1,nn2',
      'default-fs' = 'hdfs://ns1',
      'url' = 'jdbc:hive2://flink03:10004/tiezhu',
      'properties.dfs.namenode.rpc-address.ns1.nn2' = 'nn2:9000',
      'properties.dfs.namenode.rpc-address.ns1.nn1' = 'nn1:9000',
      'password' = '',
      'partition' = 'dt',
      'connector' = 'hive-x',
      'properties.dfs.client.failover.proxy.provider.ns1' = 'org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider',
      'table-name' = 'test_one',
      'sink.parallelism' = '1',
      'partition-type' = 'DAY',
      'username' = 'hive',
      'properties.dfs.nameservices' = 'ns1'
      );
INSERT
into order_info_hive
select cast(id as int) as id,
       name            as name,
       type            as message,
       cast(case
                when (price is null) then 0.0
                else price
           end as DECIMAL(15, 2))
from order_info_kafka;

The Kafka data is as follows: {"message":{"schema":"tiezhu","after":{"name":"vv","dt":"2022-05-24","id":"1","message":"ee","price":"12.33"},"type":"UPDATE","before":{"name":"vv","dt":"2022-05-24","id":"1","message":"ff","price":"12.33"},"table":"three","ts":6934687229973499904}}

FlechazoW avatar May 30 '22 03:05 FlechazoW

{"message":{"schema":"tiezhu","after":{"name":"vv","dt":"2022-05-24","id":"1","message":"ee","price":"12.33"},"type":"UPDATE","before":{"name":"vv","dt":"2022-05-24","id":"1","message":"ff","price":"12.33"},"table":"three","ts":6934687229973499904}}

My Kafka topic messages are a bit different from yours: there is no before information, only after. Does this need special configuration?


momisabuilder avatar May 30 '22 03:05 momisabuilder
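One hedged sketch for the message shape above, relying on Flink json format defaults (an assumption, not something confirmed in this thread): columns declared in the DDL but missing from a message are parsed as NULL, because 'json.fail-on-missing-field' defaults to 'false', and payload fields that are not declared (such as before and ts) are simply ignored. So before can be left out of the ROW definition entirely:

-- Sketch only: maps the message posted at the top of this issue,
-- which carries its data in "after" and has an empty "before".
CREATE TABLE src
(
    message ROW< `schema` varchar,
    opTime  bigint,
    type    varchar,
    `table` varchar,
    after   ROW<col1 varchar, col2 varchar>>,
    proc_time AS PROCTIME()
) WITH (
      'properties.bootstrap.servers' = 'node2:9092',
      'connector' = 'kafka-x',
      'scan.parallelism' = '1',
      'format' = 'json',
      'topic' = 'student10',
      'scan.startup.mode' = 'latest-offset'
      );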

Hello, I have a big question. With Taier, since src and sink can already be configured where my first screenshot shows, shouldn't it be enough to just write insert into sink select * from src? Why do I still need to write the CREATE TABLE statements again?


momisabuilder avatar Oct 11 '22 08:10 momisabuilder

If you use Taier, you can create the tables through the wizard mode; there is no need to write custom DDL CREATE TABLE statements.
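That is, with src and sink created through the wizard (a minimal sketch, assuming those are the table names configured there), the SQL editor only needs the DML:

INSERT INTO sink
SELECT *
FROM src;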

FlechazoW avatar Oct 12 '22 02:10 FlechazoW