Flink SQL parser doesn't support the ROW type / ROW() constructor
不支持使用 ROW() 类型创建的 Kafka source 表，以及相关的 ROW() 函数。
请提供一份完整的 SQL：
-
create table: CREATE TABLE IF NOT EXISTS t_source_topic( body ROW ( Fcus_order_id STRING, Frc_user_mob STRING, Frisk_price_probability_of_default STRING, Faggregate_user_risk_level STRING, Fuser_first_channel_code STRING, Flifecycle_loss_rate STRING, Fdistributary_id STRING, Fflow_id STRING, Fabtest_strategy STRING, Fversion STRING, Fcreate_time STRING, Fmodify_time STRING ), headers row ( exec_time string ),
event_time as TO_TIMESTAMP(body.Fmodify_time), proc_time AS PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = 'test-topic', 'properties.bootstrap.servers' = '10.10.22.11:9092', 'properties.group.id' = 'test-group', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.ignore-parse-errors' = 'true' ); -
insert: INSERT INTO sink_t_hbase SELECT
CONCAT(cast((CAST(uid AS BIGINT) % 150 + 100) as string) , '|', cast(uid as string)) AS rowkey, ROW(flabel_value) FROM middle_view;
报错如下 Exception in thread "main" io.github.melin.superior.common.antlr4.ParseException: mismatched input '(' expecting {')', ','}(line 2, pos 13)
== SQL == CREATE TABLE IF NOT EXISTS t_source_topic( body ROW ( -------------^^^ Fcus_order_id STRING, Frc_user_mob STRING, Frisk_price_probability_of_default STRING, Faggregate_user_risk_level STRING, Fuser_first_channel_code STRING, Flifecycle_loss_rate STRING, Fdistributary_id STRING, Fflow_id STRING, Fabtest_strategy STRING, Fversion STRING, Fcreate_time STRING, Fmodify_time STRING ), headers row ( exec_time string
使用最新 snapshot 版本试试: