alibabacloud-hologres-connectors
alibabacloud-hologres-connectors copied to clipboard
hologres-connector-flink-1.13 是不是有点问题,连接开源flink,可以使holo表插入数据,那边holo也可以查询新增数据,但是从holo那边查询就是一直报错:
Flink SQL> create TEMPORARY table holo_source(
id
INTEGER,item
INTEGER ) with ( 'connector'='hologres', 'dbname'='dps_test', 'tablename'='flink_test', 'username'='', 'password'='', 'endpoint'='hgpostcn-cn-2r42go4bp00c-cn-hangzhou.hologres.aliyuncs.com:80' ); [INFO] Execute statement succeed.
Flink SQL> select id,item from holo_source; [ERROR] Could not execute SQL statement. Reason: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=BATCH_PHYSICAL, FlinkRelDistributionTraitDef=any, sort=[]. Missing conversion is FlinkLogicalTableSourceScan[convention: LOGICAL -> BATCH_PHYSICAL] There is 1 empty subset: rel#133:RelSubset#2.BATCH_PHYSICAL.any.[], the relevant part of the original plan is as follows 121:FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, holo_source]], fields=[id, item])
Root: rel#131:RelSubset#3.BATCH_PHYSICAL.any.[] Original rel: FlinkLogicalSink(subset=[rel#119:RelSubset#1.LOGICAL.any.[]], table=[default_catalog.default_database.Unregistered_Collect_Sink_1], fields=[id, item]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 123 FlinkLogicalTableSourceScan(subset=[rel#122:RelSubset#0.LOGICAL.any.[]], table=[[default_catalog, default_database, holo_source]], fields=[id, item]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 8.0E8 io, 0.0 network, 0.0 memory}, id = 121
Sets: Set#2, type: RecordType(INTEGER id, INTEGER item) rel#128:RelSubset#2.LOGICAL.any.[], best=rel#121 rel#121:FlinkLogicalTableSourceScan.LOGICAL.any.[](table=[default_catalog, default_database, holo_source],fields=id, item), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 8.0E8 io, 0.0 network, 0.0 memory} rel#133:RelSubset#2.BATCH_PHYSICAL.any.[], best=null Set#3, type: RecordType(INTEGER id, INTEGER item) rel#130:RelSubset#3.LOGICAL.any.[], best=rel#129 rel#129:FlinkLogicalSink.LOGICAL.any.[](input=RelSubset#128,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=id, item), rowcount=1.0E8, cumulative cost={2.0E8 rows, 2.0E8 cpu, 8.0E8 io, 0.0 network, 0.0 memory} rel#131:RelSubset#3.BATCH_PHYSICAL.any.[], best=null rel#132:AbstractConverter.BATCH_PHYSICAL.any., rowcount=1.0E8, cumulative cost={inf} rel#134:BatchPhysicalSink.BATCH_PHYSICAL.any.[](input=RelSubset#133,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=id, item), rowcount=1.0E8, cumulative cost={inf}
Graphviz: digraph G { root [style=filled,label="Root"]; subgraph cluster2{ label="Set 2 RecordType(INTEGER id, INTEGER item)"; rel121 [label="rel#121:FlinkLogicalTableSourceScan\ntable=[default_catalog, default_database, holo_source],fields=id, item\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 8.0E8 io, 0.0 network, 0.0 memory}",color=blue,shape=box] subset128 [label="rel#128:RelSubset#2.LOGICAL.any.[]"] subset133 [label="rel#133:RelSubset#2.BATCH_PHYSICAL.any.[]",color=red] } subgraph cluster3{ label="Set 3 RecordType(INTEGER id, INTEGER item)"; rel129 [label="rel#129:FlinkLogicalSink\ninput=RelSubset#128,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=id, item\nrows=1.0E8, cost={2.0E8 rows, 2.0E8 cpu, 8.0E8 io, 0.0 network, 0.0 memory}",color=blue,shape=box] rel132 [label="rel#132:AbstractConverter\ninput=RelSubset#130,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]\nrows=1.0E8, cost={inf}",shape=box] rel134 [label="rel#134:BatchPhysicalSink\ninput=RelSubset#133,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=id, item\nrows=1.0E8, cost={inf}",shape=box] subset130 [label="rel#130:RelSubset#3.LOGICAL.any.[]"] subset131 [label="rel#131:RelSubset#3.BATCH_PHYSICAL.any.[]"] } root -> subset131; subset128 -> rel121[color=blue]; subset130 -> rel129[color=blue]; rel129 -> subset128[color=blue]; subset131 -> rel132; rel132 -> subset130; subset131 -> rel134; rel134 -> subset133; }
@hecheng64 现在hologres-connector还不支持作为Flink的Source