flinkStreamSQL

1.10 streamSQL: problem when joining Kafka with a MySQL dimension table

Open ruoyuxiangsi opened this issue 4 years ago • 0 comments

My SQL:

INSERT INTO dwd_business_flow
SELECT
    SUBSTRING(CAST(TIMESTAMPADD(DAY,0,LOCALTIMESTAMP) as varchar),1,10) AS stat_date,
    CASE
        WHEN t2.fee_type = 'WAGES' THEN '010'
        WHEN t2.fee_type = 'RENT' THEN '011'
        WHEN t2.fee_type = 'DEBIT' THEN '012'
        WHEN t2.fee_type = 'MEAL' THEN '013'
        WHEN t2.fee_type = 'REIMBURSE' THEN '015'
        WHEN t2.fee_type = 'WELFARE' THEN '014'
        WHEN t2.fee_type = 'RENTBICYLE' THEN '016'
        WHEN t2.fee_type = 'RENTCLOTHES' THEN '017'
        WHEN t2.fee_type = 'RENTMOBILE' THEN '019'
        WHEN t2.fee_type = 'RENTBETTRY' THEN '018'
        WHEN t2.fee_type = 'RENTCAR' THEN '020'
        WHEN t2.fee_type = 'LABOUR' THEN '021'
        WHEN t2.fee_type = 'YEARENDREWARD' THEN '022'
        WHEN t2.fee_type = 'OTHER' THEN '023'
        WHEN t2.fee_type = 'SERVICE' THEN '024'
        ELSE '010'
    END AS business_type_code,                        -- business type code
    t2.fee_name AS business_type_name,                -- business type name
    t1.order_id AS order_code,                        -- order number
    cast(t1.order_detail_no as varchar) AS child_order_code, -- sub-order number
    t1.deal_apply_time AS order_create_time,          -- order creation time
    t1.busi_complete_data AS order_pay_time,          -- order payment time
    t1.sal_should_pay AS pay_amount,                  -- amount payable
    t1.sal_real_pay AS pay_amount_real,               -- amount actually paid
    '2' AS trade_state,                               -- transaction status (1 = incomplete, 2 = completed)
    t3.user_code AS user_code,                        -- user code
    t1.user_name,                                     -- user name
    t1.id_number AS identity_no,                      -- ID card number
    t1.mobile,                                        -- mobile number
    t2.corp_no AS company_code,                       -- company code
    t3.company_name AS company_name,                  -- company name
    '1' AS pay_way,                                   -- payment channel (1 = balance, 2 = points, 3 = Alipay, 4 = WeChat Pay)
    '2' AS trade_type,                                -- transaction type (1 = personal expense, 2 = business expense)
    '2' AS trade_flow_type,                           -- flow type (1 = expenditure, 2 = income)
    t1.user_info_id AS employee_code,                 -- employee primary key
    t2.relea_month AS sal8_real_month,                -- actual salary year-month
    t1.wallet_acc_id AS trade_account_code,           -- account code
    case
        when t4.acct_type = 'DIRECT_RELEASE' then '直发账户'
        when t4.acct_type = 'WALLET_CUP_NEW' then '好易联账户'
        when t4.acct_type = 'WALLET_ZSY' then '中顺易账户'
        when t4.acct_type = 'WALLET_LIMIT' then '额度账户'
        when t4.acct_type = 'WALLET_FL_PDS' then '华夏PDS账户'
        else '全部账户'
    end AS trade_account_name,
    t1.remark AS remark_,
    t3.zt_user_code AS user_open_id,                  -- user id
    t3.zt_employee_code AS staff_open_id,             -- employee id
    t3.zt_company_code AS company_open_id,            -- company id
    t3.company_short_name AS company_short_name
FROM (
    select
        t1.enable_status,
        t1.busi_status,
        t1.busi_complete_data,
        t1.order_id,
        t1.wallet_acc_id,
        t1.id_number,
        t1.user_info_id,
        t1.order_detail_no,
        t1.deal_apply_time,
        t1.busi_complete_data,
        t1.sal_should_pay,
        t1.sal_real_pay,
        t1.user_name,
        t1.mobile,
        t1.remark
    from to_adw_sal_orders_detail t1
    WHERE t1.enable_status = '1'
      and t1.busi_status = '1'
      and t1.busi_complete_data >= SUBSTRING(CAST(TIMESTAMPADD(DAY,0,LOCALTIMESTAMP) as varchar),1,10)
) t1
JOIN to_adw_sal_orders t2
    ON t1.order_id = t2.busi_proc_id
LEFT JOIN (
    select g.acct_type, g.acct_number
    from to_adw_sal_orders_detail e
    left join to_acct_person_account_info g
        on e.wallet_acc_id = g.acct_number
    where g.enable_ = '1'
) t4
    ON t1.wallet_acc_id = t4.acct_number
LEFT JOIN (
    select
        g.client_id,
        g.identity_no,
        g.employee_code,
        g.user_code,
        g.company_name,
        g.zt_user_code,
        g.zt_employee_code,
        g.zt_company_code,
        g.company_short_name
    from to_adw_sal_orders_detail t1
    left join dim_zt_user_info g
        on t1.id_number = g.identity_no
    where g.client_id = '1ec595e8ba7211e99c7e7cd30ad3a6a8'
) t3
    ON t1.id_number = t3.identity_no
   and t1.user_info_id = t3.employee_code

Error:

Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.calcite.sql.SqlJoin cannot be cast to org.apache.calcite.sql.SqlBasicCall
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:79)
    at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:101)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:56)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:85)
    at com.dtstack.flink.sql.launcher.utils.JobGraphBuildUtil.buildJobGraph(JobGraphBuildUtil.java:80)
    at com.dtstack.flink.sql.launcher.executor.YarnJobClusterExecutor.exec(YarnJobClusterExecutor.java:65)
    at com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:144)
Caused by: java.lang.ClassCastException: org.apache.calcite.sql.SqlJoin cannot be cast to org.apache.calcite.sql.SqlBasicCall
    at com.dtstack.flink.sql.side.JoinNodeDealer.dealJoinNode(JoinNodeDealer.java:198)
    at com.dtstack.flink.sql.side.SideSQLParser.parseSql(SideSQLParser.java:164)
    at com.dtstack.flink.sql.side.SideSQLParser.parseSql(SideSQLParser.java:144)
    at com.dtstack.flink.sql.side.SideSQLParser.parseSql(SideSQLParser.java:137)
    at com.dtstack.flink.sql.side.SideSQLParser.getExeQueue(SideSQLParser.java:72)
    at com.dtstack.flink.sql.side.SideSqlExec.exec(SideSqlExec.java:122)
    at com.dtstack.flink.sql.exec.ExecuteProcessHelper.sqlTranslation(ExecuteProcessHelper.java:223)
    at com.dtstack.flink.sql.exec.ExecuteProcessHelper.getStreamExecution(ExecuteProcessHelper.java:165)
    at com.dtstack.flink.sql.Main.main(Main.java:42)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
    ... 8 more

ruoyuxiangsi, May 19 '20 02:05