flinkStreamSQL icon indicating copy to clipboard operation
flinkStreamSQL copied to clipboard

FlinkStreamSQL的insert时不支持union all函数

Open jianlan519588 opened this issue 4 years ago • 0 comments

insert into MyResult

select r1.actionNum as actionNum, r1.IMEI as IMEI, r1.IMEIList as IMEIList, r1.IMEIStatus as IMEIStatus, r1.belongWarehouseId as belongWarehouseId, r1.belongCustomerId as belongCustomerId, r1.brand as brand, r1.product as product, r1.marketing_name as marketing_name, r1.belongCustomer as belongCustomer, r1.belongWarehouse as belongWarehouse from ( ( select '0' as actionNum, wcs.imei as IMEI, wcs.imei_list as IMEIList, 'Available' as IMEIStatus, wcs.warehouse_id as belongWarehouseId, cw.enterprise_id as belongCustomerId, sm.brand as brand, concat(trim(sm.marketing_name), ' ', trim(sm.rom_ram), ' ', trim(sm.color)) as product, sm.marketing_name as marketName, '' as belongCustomer, '' as belongWarehouse from t_channel_warehouse_current_stock as wcs left join t_channel_warehouse as cw on wcs.warehouse_id=cw.id left join t_enter_sap_mat as sm on wcs.material_id =sm.matnr where wcs.source_type in ('6') and wcs.type in ('INSERT') ) union all (
select '1' as actionNum, wcs.imei as IMEI, wcs.imei_list as IMEIList, 'Available' as IMEIStatus, wcs.warehouse_id as belongWarehouseId, cw.enterprise_id as belongCustomerId, sm.brand as brand, concat(trim(sm.marketing_name), ' ', trim(sm.rom_ram), ' ', trim(sm.color)) as product, sm.marketing_name as marketName, sm.rom_ram as rom_ram, sm.color as color, '' as belongCustomer, '' as belongWarehouse from t_channel_warehouse_current_stock as wcs left join t_channel_warehouse as cw on wcs.warehouse_id=cw.id left join t_enter_sap_mat as sm on wcs.material_id =sm.matnr where wcs.source_type in ('6') and wcs.status in ('1') and wcs.type in ('UPDATE') ) ) r1

报错:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Temporary table default_catalog.default_database.t_channel_warehouse_current_stock_t_channel_warehouse_0 already exists at org.apache.flink.table.catalog.CatalogManager.lambda$createTemporaryTable$10(CatalogManager.java:472) at java.util.HashMap.compute(HashMap.java:1197) at org.apache.flink.table.catalog.CatalogManager.createTemporaryTable(CatalogManager.java:470) at org.apache.flink.table.api.internal.TableEnvironmentImpl.createTemporaryView(TableEnvironmentImpl.java:282) at org.apache.flink.table.api.internal.TableEnvironmentImpl.createTemporaryView(TableEnvironmentImpl.java:269) at com.dtstack.flink.sql.side.SideSqlExec.joinFun(SideSqlExec.java:554) at com.dtstack.flink.sql.side.SideSqlExec.exec(SideSqlExec.java:175) at com.dtstack.flink.sql.exec.ExecuteProcessHelper.sqlTranslation(ExecuteProcessHelper.java:232) at com.dtstack.flink.sql.exec.ExecuteProcessHelper.getStreamExecution(ExecuteProcessHelper.java:170) at com.dtstack.flink.sql.Main.main(Main.java:41) at com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:140)

jianlan519588 avatar Jan 15 '21 01:01 jianlan519588