perf: improve tpc-h q20 performance (single-topic)
See performance numbers at https://www.notion.so/risingwave-labs/TPCH-Performance-Numbers-Table-e098ef82884546949333409f0513ada7?pvs=4#8de0bf4bda51444c8381f3b0c10ddfe1
Query:
create sink tpch_q20 as
select
    s_name,
    s_address
from
    supplier,
    nation
where
    s_suppkey in (
        select
            ps_suppkey
        from
            partsupp
        where
            ps_partkey in (
                select
                    p_partkey
                from
                    part
            )
            and ps_availqty > (
                select
                    0.005 * sum(l_quantity)
                from
                    lineitem
                where
                    l_partkey = ps_partkey
                    and l_suppkey = ps_suppkey
            )
    )
    and s_nationkey = n_nationkey
--order by
--    s_name
with ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');
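For reference, the two plans below can presumably be reproduced by prefixing the statement with EXPLAIN; a compacted sketch, assuming RisingWave's EXPLAIN / EXPLAIN (DISTSQL) options apply to CREATE SINK:

-- Hypothetical reproduction steps (not part of the original report):
-- plain EXPLAIN prints the stream plan, EXPLAIN (DISTSQL) the dist plan.
explain (distsql) create sink tpch_q20 as
select s_name, s_address
from supplier, nation
where s_suppkey in (
    select ps_suppkey from partsupp
    where ps_partkey in (select p_partkey from part)
        and ps_availqty > (
            select 0.005 * sum(l_quantity) from lineitem
            where l_partkey = ps_partkey and l_suppkey = ps_suppkey)
)
and s_nationkey = n_nationkey
with ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');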
Plan:
StreamSink { type: append-only, columns: [s_name, s_address, _row_id(hidden), _row_id#1(hidden), $expr10490(hidden), $expr10487(hidden)] }
└─StreamHashJoin { type: LeftSemi, predicate: $expr1 = $expr7 }
├─StreamExchange { dist: HashShard($expr1) }
│ └─StreamHashJoin [append_only] { type: Inner, predicate: $expr4 = $expr5 }
│ ├─StreamExchange { dist: HashShard($expr4) }
│ │ └─StreamProject { exprs: [Field(supplier, 0:Int32) as $expr1, Field(supplier, 1:Int32) as $expr2, Field(supplier, 2:Int32) as $expr3, Field(supplier, 3:Int32) as $expr4, _row_id] }
│ │ └─StreamFilter { predicate: (eventType = 'supplier':Varchar) }
│ │ └─StreamShare { id: 4 }
│ │ └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
│ │ └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
│ │ └─StreamRowIdGen { row_id_index: 10 }
│ │ └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
│ └─StreamExchange { dist: HashShard($expr5) }
│ └─StreamProject { exprs: [Field(nation, 0:Int32) as $expr5, _row_id] }
│ └─StreamFilter { predicate: (eventType = 'nation':Varchar) }
│ └─StreamShare { id: 4 }
│ └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
│ └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
│ └─StreamRowIdGen { row_id_index: 10 }
│ └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
└─StreamExchange { dist: HashShard($expr7) }
└─StreamProject { exprs: [$expr7, _row_id, $expr6, $expr6, $expr7] }
└─StreamFilter { predicate: ($expr10 > $expr14) }
└─StreamHashJoin { type: Inner, predicate: $expr6 IS NOT DISTINCT FROM $expr6 AND $expr7 IS NOT DISTINCT FROM $expr7 }
├─StreamExchange { dist: HashShard($expr6, $expr7) }
│ └─StreamProject { exprs: [$expr6, $expr7, $expr8::Decimal as $expr10, _row_id] }
│ └─StreamShare { id: 20 }
│ └─StreamHashJoin { type: LeftSemi, predicate: $expr6 = $expr9 }
│ ├─StreamExchange { dist: HashShard($expr6) }
│ │ └─StreamProject { exprs: [Field(partsupp, 0:Int32) as $expr6, Field(partsupp, 1:Int32) as $expr7, Field(partsupp, 2:Int32) as $expr8, _row_id] }
│ │ └─StreamFilter { predicate: (eventType = 'partsupp':Varchar) }
│ │ └─StreamShare { id: 4 }
│ │ └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
│ │ └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
│ │ └─StreamRowIdGen { row_id_index: 10 }
│ │ └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
│ └─StreamExchange { dist: HashShard($expr9) }
│ └─StreamProject { exprs: [Field(part, 0:Int32) as $expr9, _row_id] }
│ └─StreamFilter { predicate: (eventType = 'part':Varchar) }
│ └─StreamShare { id: 4 }
│ └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
│ └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
│ └─StreamRowIdGen { row_id_index: 10 }
│ └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
└─StreamProject { exprs: [$expr6, $expr7, (0.005:Decimal * sum($expr13)) as $expr14] }
└─StreamHashAgg { group_key: [$expr6, $expr7], aggs: [sum($expr13), count] }
└─StreamHashJoin { type: LeftOuter, predicate: $expr6 IS NOT DISTINCT FROM $expr6 AND $expr7 IS NOT DISTINCT FROM $expr7 }
├─StreamExchange { dist: HashShard($expr6, $expr7) }
│ └─StreamProject { exprs: [$expr6, $expr7] }
│ └─StreamHashAgg { group_key: [$expr6, $expr7], aggs: [count] }
│ └─StreamShare { id: 20 }
│ └─StreamHashJoin { type: LeftSemi, predicate: $expr6 = $expr9 }
│ ├─StreamExchange { dist: HashShard($expr6) }
│ │ └─StreamProject { exprs: [Field(partsupp, 0:Int32) as $expr6, Field(partsupp, 1:Int32) as $expr7, Field(partsupp, 2:Int32) as $expr8, _row_id] }
│ │ └─StreamFilter { predicate: (eventType = 'partsupp':Varchar) }
│ │ └─StreamShare { id: 4 }
│ │ └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
│ │ └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
│ │ └─StreamRowIdGen { row_id_index: 10 }
│ │ └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
│ └─StreamExchange { dist: HashShard($expr9) }
│ └─StreamProject { exprs: [Field(part, 0:Int32) as $expr9, _row_id] }
│ └─StreamFilter { predicate: (eventType = 'part':Varchar) }
│ └─StreamShare { id: 4 }
│ └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
│ └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
│ └─StreamRowIdGen { row_id_index: 10 }
│ └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
└─StreamProject { exprs: [$expr6, $expr7, Field(lineitem, 4:Int32) as $expr13, _row_id] }
└─StreamHashJoin { type: Inner, predicate: $expr6 = $expr11 AND $expr7 = $expr12 }
├─StreamExchange { dist: HashShard($expr6, $expr7) }
│ └─StreamProject { exprs: [$expr6, $expr7] }
│ └─StreamHashAgg { group_key: [$expr6, $expr7], aggs: [count] }
│ └─StreamShare { id: 20 }
│ └─StreamHashJoin { type: LeftSemi, predicate: $expr6 = $expr9 }
│ ├─StreamExchange { dist: HashShard($expr6) }
│ │ └─StreamProject { exprs: [Field(partsupp, 0:Int32) as $expr6, Field(partsupp, 1:Int32) as $expr7, Field(partsupp, 2:Int32) as $expr8, _row_id] }
│ │ └─StreamFilter { predicate: (eventType = 'partsupp':Varchar) }
│ │ └─StreamShare { id: 4 }
│ │ └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
│ │ └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
│ │ └─StreamRowIdGen { row_id_index: 10 }
│ │ └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
│ └─StreamExchange { dist: HashShard($expr9) }
│ └─StreamProject { exprs: [Field(part, 0:Int32) as $expr9, _row_id] }
│ └─StreamFilter { predicate: (eventType = 'part':Varchar) }
│ └─StreamShare { id: 4 }
│ └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
│ └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
│ └─StreamRowIdGen { row_id_index: 10 }
│ └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
└─StreamExchange { dist: HashShard($expr11, $expr12) }
└─StreamProject { exprs: [lineitem, Field(lineitem, 1:Int32) as $expr11, Field(lineitem, 2:Int32) as $expr12, _row_id] }
└─StreamFilter { predicate: (eventType = 'lineitem':Varchar) }
└─StreamShare { id: 4 }
└─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
└─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
└─StreamRowIdGen { row_id_index: 10 }
└─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
(99 rows)
Dist Plan:
Fragment 0
StreamSink { type: append-only, columns: [s_name, s_address, _row_id(hidden), _row_id#1(hidden), $expr10490(hidden), $expr10487(hidden)] }
├── tables: [ Sink: 0 ]
└── StreamHashJoin { type: LeftSemi, predicate: $expr1 = $expr7 }
├── tables: [ HashJoinLeft: 1, HashJoinDegreeLeft: 2, HashJoinRight: 3, HashJoinDegreeRight: 4 ]
├── StreamExchange Hash([0]) from 1
└── StreamExchange Hash([0]) from 5
Fragment 1
StreamHashJoin [append_only] { type: Inner, predicate: $expr4 = $expr5 }
├── tables: [ HashJoinLeft: 5, HashJoinDegreeLeft: 6, HashJoinRight: 7, HashJoinDegreeRight: 8 ]
├── StreamExchange Hash([3]) from 2
└── StreamExchange Hash([0]) from 4
Fragment 2
StreamProject { exprs: [Field(supplier, 0:Int32) as $expr1, Field(supplier, 1:Int32) as $expr2, Field(supplier, 2:Int32) as $expr3, Field(supplier, 3:Int32) as $expr4, _row_id] }
└── StreamFilter { predicate: (eventType = 'supplier':Varchar) }
└── StreamExchange NoShuffle from 3
Fragment 3
StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
└── StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
└── StreamRowIdGen { row_id_index: 10 }
└── StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] } { tables: [ Source: 9 ] }
Fragment 4
StreamProject { exprs: [Field(nation, 0:Int32) as $expr5, _row_id] }
└── StreamFilter { predicate: (eventType = 'nation':Varchar) }
└── StreamExchange NoShuffle from 3
Fragment 5
StreamProject { exprs: [$expr7, _row_id, $expr6, $expr6, $expr7] }
└── StreamFilter { predicate: ($expr10 > $expr14) }
└── StreamHashJoin { type: Inner, predicate: $expr6 IS NOT DISTINCT FROM $expr6 AND $expr7 IS NOT DISTINCT FROM $expr7 }
├── tables: [ HashJoinLeft: 10, HashJoinDegreeLeft: 11, HashJoinRight: 12, HashJoinDegreeRight: 13 ]
├── StreamExchange Hash([0, 1]) from 6
└── StreamProject { exprs: [$expr6, $expr7, (0.005:Decimal * sum($expr13)) as $expr14] }
└── StreamHashAgg { group_key: [$expr6, $expr7], aggs: [sum($expr13), count] } { tables: [ HashAggState: 18 ] }
└── StreamHashJoin { type: LeftOuter, predicate: $expr6 IS NOT DISTINCT FROM $expr6 AND $expr7 IS NOT DISTINCT FROM $expr7 }
├── tables: [ HashJoinLeft: 19, HashJoinDegreeLeft: 20, HashJoinRight: 21, HashJoinDegreeRight: 22 ]
├── StreamExchange Hash([0, 1]) from 10
└── StreamProject { exprs: [$expr6, $expr7, Field(lineitem, 4:Int32) as $expr13, _row_id] }
└── StreamHashJoin { type: Inner, predicate: $expr6 = $expr11 AND $expr7 = $expr12 } { tables: [ HashJoinLeft: 24, HashJoinDegreeLeft: 25, HashJoinRight: 26, HashJoinDegreeRight: 27 ] }
├── StreamExchange Hash([0, 1]) from 11
└── StreamExchange Hash([1, 2]) from 12
Fragment 6
StreamProject { exprs: [$expr6, $expr7, $expr8::Decimal as $expr10, _row_id] }
└── StreamExchange NoShuffle from 7
Fragment 7
StreamHashJoin { type: LeftSemi, predicate: $expr6 = $expr9 } { tables: [ HashJoinLeft: 14, HashJoinDegreeLeft: 15, HashJoinRight: 16, HashJoinDegreeRight: 17 ] }
├── StreamExchange Hash([0]) from 8
└── StreamExchange Hash([0]) from 9
Fragment 8
StreamProject { exprs: [Field(partsupp, 0:Int32) as $expr6, Field(partsupp, 1:Int32) as $expr7, Field(partsupp, 2:Int32) as $expr8, _row_id] }
└── StreamFilter { predicate: (eventType = 'partsupp':Varchar) }
└── StreamExchange NoShuffle from 3
Fragment 9
StreamProject { exprs: [Field(part, 0:Int32) as $expr9, _row_id] }
└── StreamFilter { predicate: (eventType = 'part':Varchar) }
└── StreamExchange NoShuffle from 3
Fragment 10
StreamProject { exprs: [$expr6, $expr7] }
└── StreamHashAgg { group_key: [$expr6, $expr7], aggs: [count] } { tables: [ HashAggState: 23 ] }
└── StreamExchange NoShuffle from 7
Fragment 11
StreamProject { exprs: [$expr6, $expr7] }
└── StreamHashAgg { group_key: [$expr6, $expr7], aggs: [count] } { tables: [ HashAggState: 28 ] }
└── StreamExchange NoShuffle from 7
Fragment 12
StreamProject { exprs: [lineitem, Field(lineitem, 1:Int32) as $expr11, Field(lineitem, 2:Int32) as $expr12, _row_id] }
└── StreamFilter { predicate: (eventType = 'lineitem':Varchar) }
└── StreamExchange NoShuffle from 3
Table 0
├── columns: [ kv_log_store_epoch, kv_log_store_seq_id, kv_log_store_row_op, s_name, s_address, _row_id, _row_id#1, $expr10490, $expr10487 ]
├── primary key: [ $0 ASC, $1 ASC ]
├── value indices: [ 0, 1, 2, 3, 4, 5, 6, 7, 8 ]
├── distribution key: [ 8 ]
└── read pk prefix len hint: 2
Table 1
├── columns: [ $expr1, $expr2, $expr3, _row_id, $expr4, _row_id_0 ]
├── primary key: [ $0 ASC, $3 ASC, $5 ASC, $4 ASC ]
├── value indices: [ 0, 1, 2, 3, 4, 5 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1
Table 2 { columns: [ $expr1, _row_id, _row_id_0, $expr4, _degree ], primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC ], value indices: [ 4 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 3 { columns: [ $expr7, _row_id, $expr6, $expr6_0, $expr7_0 ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 0, 1, 2, 3, 4 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 4 { columns: [ $expr7, _row_id, $expr6, _degree ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 5 { columns: [ $expr1, $expr2, $expr3, $expr4, _row_id ], primary key: [ $3 ASC, $4 ASC ], value indices: [ 0, 1, 2, 3, 4 ], distribution key: [ 3 ], read pk prefix len hint: 1 }
Table 6 { columns: [ $expr4, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 7 { columns: [ $expr5, _row_id ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 8 { columns: [ $expr5, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 9 { columns: [ partition_id, offset_info ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [], read pk prefix len hint: 1 }
Table 10 { columns: [ $expr6, $expr7, $expr10, _row_id ], primary key: [ $0 ASC, $1 ASC, $3 ASC ], value indices: [ 0, 1, 2, 3 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
Table 11 { columns: [ $expr6, $expr7, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
Table 12 { columns: [ $expr6, $expr7, $expr14 ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
Table 13 { columns: [ $expr6, $expr7, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
Table 14 { columns: [ $expr6, $expr7, $expr8, _row_id ], primary key: [ $0 ASC, $3 ASC ], value indices: [ 0, 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 15 { columns: [ $expr6, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 16 { columns: [ $expr9, _row_id ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 17 { columns: [ $expr9, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 18 { columns: [ $expr6, $expr7, sum($expr13), count ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2, 3 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
Table 19 { columns: [ $expr6, $expr7 ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 0, 1 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
Table 20 { columns: [ $expr6, $expr7, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
Table 21 { columns: [ $expr6, $expr7, $expr13, _row_id ], primary key: [ $0 ASC, $1 ASC, $3 ASC ], value indices: [ 0, 1, 2, 3 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
Table 22 { columns: [ $expr6, $expr7, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
Table 23 { columns: [ $expr6, $expr7, count ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 2 }
Table 24 { columns: [ $expr6, $expr7 ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 0, 1 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
Table 25 { columns: [ $expr6, $expr7, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
Table 26 { columns: [ lineitem, $expr11, $expr12, _row_id ], primary key: [ $1 ASC, $2 ASC, $3 ASC ], value indices: [ 0, 1, 2, 3 ], distribution key: [ 1, 2 ], read pk prefix len hint: 2 }
Table 27 { columns: [ $expr11, $expr12, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
Table 28 { columns: [ $expr6, $expr7, count ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 2 }
(148 rows)
Flink:
INSERT INTO tpch_q20
select
    s_name,
    s_address
from
    supplier,
    nation
where
    s_suppkey in (
        select
            ps_suppkey
        from
            partsupp
        where
            ps_partkey in (
                select
                    p_partkey
                from
                    part
            )
            and ps_availqty > (
                select
                    0.005 * sum(l_quantity)
                from
                    lineitem
                where
                    l_partkey = ps_partkey
                    and l_suppkey = ps_suppkey
            )
    )
    and s_nationkey = n_nationkey;
Plan:
== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.tpch_q20], fields=[s_name, s_address])
+- Calc(select=[s_name, s_address])
+- Join(joinType=[InnerJoin], where=[=(s_nationkey, n_nationkey)], select=[s_name, s_address, s_nationkey, n_nationkey], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[s_nationkey]])
: +- Calc(select=[s_name, s_address, s_nationkey])
: +- Join(joinType=[LeftSemiJoin], where=[=(s_suppkey, ps_suppkey)], select=[s_suppkey, s_name, s_address, s_nationkey], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
: :- Exchange(distribution=[hash[s_suppkey]])
: : +- Calc(select=[supplier.s_suppkey AS s_suppkey, supplier.s_name AS s_name, supplier.s_address AS s_address, supplier.s_nationkey AS s_nationkey], where=[=(eventType, _UTF-16LE'supplier':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")])
: : +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])
: +- Exchange(distribution=[hash[ps_suppkey]])
: +- Calc(select=[ps_suppkey])
: +- Join(joinType=[InnerJoin], where=[AND(=(ps_partkey, l_partkey), =(ps_suppkey, l_suppkey), >(ps_availqty, *(0.005:DECIMAL(4, 3), $f2)))], select=[ps_partkey, ps_suppkey, ps_availqty, l_partkey, l_suppkey, $f2], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
: :- Exchange(distribution=[hash[ps_partkey, ps_suppkey]])
: : +- Join(joinType=[LeftSemiJoin], where=[=(ps_partkey, p_partkey)], select=[ps_partkey, ps_suppkey, ps_availqty], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
: : :- Exchange(distribution=[hash[ps_partkey]])
: : : +- Calc(select=[partsupp.ps_partkey AS ps_partkey, partsupp.ps_suppkey AS ps_suppkey, partsupp.ps_availqty AS ps_availqty], where=[=(eventType, _UTF-16LE'partsupp':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")])
: : : +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])
: : +- Exchange(distribution=[hash[p_partkey]])
: : +- Calc(select=[part.p_partkey AS p_partkey], where=[=(eventType, _UTF-16LE'part':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")])
: : +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])
: +- Exchange(distribution=[hash[l_partkey, l_suppkey]])
: +- GroupAggregate(groupBy=[l_partkey, l_suppkey], select=[l_partkey, l_suppkey, SUM(l_quantity) AS $f2])
: +- Exchange(distribution=[hash[l_partkey, l_suppkey]])
: +- Calc(select=[lineitem.l_partkey AS l_partkey, lineitem.l_suppkey AS l_suppkey, lineitem.l_quantity AS l_quantity], where=[AND(=(eventType, _UTF-16LE'lineitem':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), IS NOT NULL(lineitem.l_partkey), IS NOT NULL(lineitem.l_suppkey))])
: +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])
+- Exchange(distribution=[hash[n_nationkey]])
+- Calc(select=[nation.n_nationkey AS n_nationkey], where=[=(eventType, _UTF-16LE'nation':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")])
+- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.tpch_q20], fields=[s_name, s_address])
+- Calc(select=[s_name, s_address])
+- Join(joinType=[InnerJoin], where=[(s_nationkey = n_nationkey)], select=[s_name, s_address, s_nationkey, n_nationkey], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[s_nationkey]])
: +- Calc(select=[s_name, s_address, s_nationkey])
: +- Join(joinType=[LeftSemiJoin], where=[(s_suppkey = ps_suppkey)], select=[s_suppkey, s_name, s_address, s_nationkey], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
: :- Exchange(distribution=[hash[s_suppkey]])
: : +- Calc(select=[supplier.s_suppkey AS s_suppkey, supplier.s_name AS s_name, supplier.s_address AS s_address, supplier.s_nationkey AS s_nationkey], where=[(eventType = 'supplier')])
: : +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])(reuse_id=[1])
: +- Exchange(distribution=[hash[ps_suppkey]])
: +- Calc(select=[ps_suppkey])
: +- Join(joinType=[InnerJoin], where=[((ps_partkey = l_partkey) AND (ps_suppkey = l_suppkey) AND (ps_availqty > (0.005 * $f2)))], select=[ps_partkey, ps_suppkey, ps_availqty, l_partkey, l_suppkey, $f2], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
: :- Exchange(distribution=[hash[ps_partkey, ps_suppkey]])
: : +- Join(joinType=[LeftSemiJoin], where=[(ps_partkey = p_partkey)], select=[ps_partkey, ps_suppkey, ps_availqty], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
: : :- Exchange(distribution=[hash[ps_partkey]])
: : : +- Calc(select=[partsupp.ps_partkey AS ps_partkey, partsupp.ps_suppkey AS ps_suppkey, partsupp.ps_availqty AS ps_availqty], where=[(eventType = 'partsupp')])
: : : +- Reused(reference_id=[1])
: : +- Exchange(distribution=[hash[p_partkey]])
: : +- Calc(select=[part.p_partkey AS p_partkey], where=[(eventType = 'part')])
: : +- Reused(reference_id=[1])
: +- Exchange(distribution=[hash[l_partkey, l_suppkey]])
: +- GroupAggregate(groupBy=[l_partkey, l_suppkey], select=[l_partkey, l_suppkey, SUM(l_quantity) AS $f2])
: +- Exchange(distribution=[hash[l_partkey, l_suppkey]])
: +- Calc(select=[lineitem.l_partkey AS l_partkey, lineitem.l_suppkey AS l_suppkey, lineitem.l_quantity AS l_quantity], where=[((eventType = 'lineitem') AND lineitem.l_partkey IS NOT NULL AND lineitem.l_suppkey IS NOT NULL)])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[n_nationkey]])
+- Calc(select=[nation.n_nationkey AS n_nationkey], where=[(eventType = 'nation')])
+- Reused(reference_id=[1])
We notice that there are 8 StreamHashJoins in RisingWave's query plan. Even after we discount the StreamHashJoin that is duplicated by StreamShare { id: 20 } (it appears 3 times), we still get 8 - (3 - 1) = 6 StreamHashJoins (3 Inner, 2 LeftSemi, 1 LeftOuter), while there are only 4 Joins in Flink's query plan (2 InnerJoin, 2 LeftSemiJoin).
Which source did you use? I didn't see more than 4 joins in our planner tests: https://github.com/risingwavelabs/risingwave/blob/dad438783aafa2f942d8882056c437b9e98c233f/src/frontend/planner_test/tests/testdata/output/tpch.yaml
This unified one: https://github.com/risingwavelabs/kube-bench/blob/main/manifests/tpch/tpch-kafka-sources.template.yaml#L295
There are 2 issues here:
- Our subquery TranslateApplyRule has some special optimizations for scans but not for sources, so in the unified-source case it generates more joins than in the table-scan test.
- Even if we use a table instead of a source, we still have 5 Joins instead of 4, because in the general case our subquery unnesting rewrite can transform a correlated apply into 2 joins; see the sketch below.
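To illustrate the second point on Q20's correlated predicate: the general unnesting first builds the domain of correlated keys, computes the aggregate once per key with a left outer join (so keys without matching lineitem rows still get a NULL result, as SQL's scalar-subquery semantics require), and then joins the result back to partsupp. A hand-written sketch of that shape, for illustration only (the real rewrite works on the plan, and the domain in the actual plan is the partsupp-semijoin-part side):

-- Join 1: left outer join of lineitem onto the domain of correlated keys.
-- Join 2: inner join of the per-key aggregate back to partsupp.
with dom as (
    select distinct ps_partkey, ps_suppkey from partsupp
),
agg as (
    select d.ps_partkey, d.ps_suppkey, 0.005 * sum(l.l_quantity) as threshold
    from dom d
    left outer join lineitem l
        on l.l_partkey = d.ps_partkey and l.l_suppkey = d.ps_suppkey
    group by d.ps_partkey, d.ps_suppkey
)
select ps.ps_suppkey
from partsupp ps
join agg
    on agg.ps_partkey = ps.ps_partkey and agg.ps_suppkey = ps.ps_suppkey
where ps.ps_availqty > agg.threshold;

This corresponds to the domain / LeftOuter / IS NOT DISTINCT FROM structure visible in the stream plan above.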
Now the new RW plan is:
StreamSink { type: append-only, columns: [s_name, s_address, _row_id(hidden), _row_id#1(hidden), $expr10423(hidden), $expr10420(hidden)] }
└─StreamHashJoin { type: LeftSemi, predicate: $expr1 = $expr7 }
├─StreamExchange { dist: HashShard($expr1) }
│ └─StreamHashJoin [append_only] { type: Inner, predicate: $expr4 = $expr5 }
│ ├─StreamExchange { dist: HashShard($expr4) }
│ │ └─StreamProject { exprs: [Field(supplier, 0:Int32) as $expr1, Field(supplier, 1:Int32) as $expr2, Field(supplier, 2:Int32) as $expr3, Field(supplier, 3:Int32) as $expr4, _row_id] }
│ │ └─StreamFilter { predicate: (eventType = 'supplier':Varchar) }
│ │ └─StreamShare { id: 4 }
│ │ └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
│ │ └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
│ │ └─StreamRowIdGen { row_id_index: 10 }
│ │ └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
│ └─StreamExchange { dist: HashShard($expr5) }
│ └─StreamProject { exprs: [Field(nation, 0:Int32) as $expr5, _row_id] }
│ └─StreamFilter { predicate: (eventType = 'nation':Varchar) }
│ └─StreamShare { id: 4 }
│ └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
│ └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
│ └─StreamRowIdGen { row_id_index: 10 }
│ └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
└─StreamExchange { dist: HashShard($expr7) }
└─StreamProject { exprs: [$expr7, _row_id, $expr6, $expr12, $expr13] }
└─StreamFilter { predicate: ($expr10 > $expr14) }
└─StreamHashJoin { type: Inner, predicate: $expr6 = $expr12 AND $expr7 = $expr13 }
├─StreamExchange { dist: HashShard($expr6, $expr7) }
│ └─StreamProject { exprs: [$expr6, $expr7, $expr8::Decimal as $expr10, _row_id] }
│ └─StreamHashJoin { type: LeftSemi, predicate: $expr6 = $expr9 }
│ ├─StreamExchange { dist: HashShard($expr6) }
│ │ └─StreamProject { exprs: [Field(partsupp, 0:Int32) as $expr6, Field(partsupp, 1:Int32) as $expr7, Field(partsupp, 2:Int32) as $expr8, _row_id] }
│ │ └─StreamFilter { predicate: (eventType = 'partsupp':Varchar) }
│ │ └─StreamShare { id: 4 }
│ │ └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
│ │ └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
│ │ └─StreamRowIdGen { row_id_index: 10 }
│ │ └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
│ └─StreamExchange { dist: HashShard($expr9) }
│ └─StreamProject { exprs: [Field(part, 0:Int32) as $expr9, _row_id] }
│ └─StreamFilter { predicate: (eventType = 'part':Varchar) }
│ └─StreamShare { id: 4 }
│ └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
│ └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
│ └─StreamRowIdGen { row_id_index: 10 }
│ └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
└─StreamProject { exprs: [(0.005:Decimal * sum($expr11)) as $expr14, $expr12, $expr13] }
└─StreamHashAgg [append_only] { group_key: [$expr12, $expr13], aggs: [sum($expr11), count] }
└─StreamExchange { dist: HashShard($expr12, $expr13) }
└─StreamProject { exprs: [Field(lineitem, 4:Int32) as $expr11, Field(lineitem, 1:Int32) as $expr12, Field(lineitem, 2:Int32) as $expr13, _row_id] }
└─StreamFilter { predicate: (eventType = 'lineitem':Varchar) }
└─StreamShare { id: 4 }
└─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
└─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
└─StreamRowIdGen { row_id_index: 10 }
└─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
(53 rows)
The number of joins in RW's and Flink's query plans is the same now.
The one remaining difference, not sure if better or worse, is that RW's plan is bushy while Flink's plan is a one-sided (deep) tree.
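The improved plan is essentially what one gets by hand-rewriting the correlated subquery as a direct inner join against the grouped aggregate; a sketch under that reading (illustration only):

-- No domain and no left outer join: the per-key aggregate over lineitem is
-- joined back to partsupp directly. This is safe for the '>' predicate
-- because a (ps_partkey, ps_suppkey) pair with no lineitem rows yields a
-- NULL threshold, and the row is filtered out either way.
select ps.ps_suppkey
from partsupp ps
join (
    select l_partkey, l_suppkey, 0.005 * sum(l_quantity) as threshold
    from lineitem
    group by l_partkey, l_suppkey
) agg
    on agg.l_partkey = ps.ps_partkey and agg.l_suppkey = ps.ps_suppkey
where ps.ps_availqty > agg.threshold;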
RW's performance is better than before, but there is still big room for improvement, as we can see from:
- https://www.notion.so/risingwave-labs/TPCH-Performance-Numbers-Table-e098ef82884546949333409f0513ada7?pvs=4#8de0bf4bda51444c8381f3b0c10ddfe1
- http://metabase.risingwave-cloud.xyz/question/5354-tpch-q20-bs-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-383?start_date=2023-11-25
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1707614378000&to=1707616182000&var-namespace=tpch-1cn-affinity-weekly-20240210:
- data block miss ops seems very high, ~140 ops/s
- join executor barrier align and join actor input blocking time ratio both seem bad
@xxchan could you help take a look?
Linking #14811, as both q20 and q4 have a LeftSemiJoin in them (q20 has two), and LeftSemiJoin is shown to be the bottleneck in q4.
https://buildkite.com/risingwave-test/tpch-benchmark/builds/991, using nightly-20240217 instead of nightly-20240127 because we need the new query plan of q20.
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1708218697000&to=1708220501000&var-namespace=tpch-1cn-affinity-weekly-20240217
It seems that L0 looks a lot like TPC-H q4: https://github.com/risingwavelabs/risingwave/issues/14811#issuecomment-1956527135
Analyzed the information given by Grafana:
- task fail
- object store timeout
Simple conclusion: tasks were pending due to IO timeouts. Our default 8-minute timeout had a big impact on this short test.
http://metabase.risingwave-cloud.xyz/question/5354-tpch-q20-bs-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-383?start_date=2023-11-25
The other run, Feb 11, seems to have succeeded without this timeout error: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1707614378000&to=1707616182000&var-namespace=tpch-1cn-affinity-weekly-20240210
Its throughput is only about 3% higher than the run with the timeout error, so I suppose the error does not affect throughput much.
Since Q20 is a pretty complex query, we tried removing parts of it to reveal the true bottleneck.
We therefore introduced three variants of Q20 (see https://github.com/risingwavelabs/kube-bench/blob/main/manifests/tpch/tpch-modified-sinks.template.yaml#L830-L905):
- q20-no-greater
- q20-no-greater-inner-in
- q20-only-greater
Q20-NO-GREATER
Query:
create sink tpch_q20_no_greater as
select
    s_name,
    s_address
from
    supplier,
    nation
where
    s_suppkey in (
        select
            ps_suppkey
        from
            partsupp
        where
            ps_partkey in (
                select
                    p_partkey
                from
                    part
            )
    )
    and s_nationkey = n_nationkey
with ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');
Plan:
StreamSink { type: append-only, columns: [s_name, s_address, _row_id(hidden), _row_id#1(hidden), $expr10248(hidden), $expr10245(hidden)] }
└─StreamHashJoin { type: LeftSemi, predicate: $expr1 = $expr7 }
├─StreamExchange { dist: HashShard($expr1) }
│ └─StreamHashJoin [append_only] { type: Inner, predicate: $expr4 = $expr5 }
│ ├─StreamExchange { dist: HashShard($expr4) }
│ │ └─StreamProject { exprs: [Field(supplier, 0:Int32) as $expr1, Field(supplier, 1:Int32) as $expr2, Field(supplier, 2:Int32) as $expr3, Field(supplier, 3:Int32) as $expr4, _row_id] }
│ │ └─StreamFilter { predicate: (eventType = 'supplier':Varchar) }
│ │ └─StreamShare { id: 4 }
│ │ └─StreamProject { exprs: [eventType, supplier, part, partsupp, nation, _row_id] }
│ │ └─StreamFilter { predicate: ((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) }
│ │ └─StreamRowIdGen { row_id_index: 10 }
│ │ └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
│ └─StreamExchange { dist: HashShard($expr5) }
│ └─StreamProject { exprs: [Field(nation, 0:Int32) as $expr5, _row_id] }
│ └─StreamFilter { predicate: (eventType = 'nation':Varchar) }
│ └─StreamShare { id: 4 }
│ └─StreamProject { exprs: [eventType, supplier, part, partsupp, nation, _row_id] }
│ └─StreamFilter { predicate: ((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) }
│ └─StreamRowIdGen { row_id_index: 10 }
│ └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
└─StreamExchange { dist: HashShard($expr7) }
└─StreamHashJoin { type: LeftSemi, predicate: $expr6 = $expr8 }
├─StreamExchange { dist: HashShard($expr6) }
│ └─StreamProject { exprs: [Field(partsupp, 0:Int32) as $expr6, Field(partsupp, 1:Int32) as $expr7, _row_id] }
│ └─StreamFilter { predicate: (eventType = 'partsupp':Varchar) }
│ └─StreamShare { id: 4 }
│ └─StreamProject { exprs: [eventType, supplier, part, partsupp, nation, _row_id] }
│ └─StreamFilter { predicate: ((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) }
│ └─StreamRowIdGen { row_id_index: 10 }
│ └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
└─StreamExchange { dist: HashShard($expr8) }
└─StreamProject { exprs: [Field(part, 0:Int32) as $expr8, _row_id] }
└─StreamFilter { predicate: (eventType = 'part':Varchar) }
└─StreamShare { id: 4 }
└─StreamProject { exprs: [eventType, supplier, part, partsupp, nation, _row_id] }
└─StreamFilter { predicate: ((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) }
└─StreamRowIdGen { row_id_index: 10 }
└─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
(38 rows)
RW: http://metabase.risingwave-cloud.xyz/question/12860-tpch-q20-no-greater-bs-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-3111?start_date=2024-01-24
Flink: http://metabase.risingwave-cloud.xyz/question/12946-flink-tpch-q20-no-greater-flink-medium-1tm-avg-job-throughput-per-second-records-s-history-thtb-3108?start_date=2024-01-24
q20-no-greater performs much, much better than q20 on both systems; the improvement is more than 4x for RW.
Therefore, we can conclude that the removed part is likely the bottleneck:
ps_availqty > (
    select
        0.005 * sum(l_quantity)
    from
        lineitem
    where
        l_partkey = ps_partkey
        and l_suppkey = ps_suppkey
)
Therefore, let's look at q20-only-greater.
Q20-ONLY-GREATER
Query:
create sink tpch_q20_only_greater as
select
    ps_suppkey
from
    partsupp
where
    ps_availqty > (
        select
            0.005 * sum(l_quantity)
        from
            lineitem
        where
            l_partkey = ps_partkey
            and l_suppkey = ps_suppkey
    )
with ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');
Plan:
StreamSink { type: append-only, columns: [ps_suppkey, _row_id(hidden), $expr10228(hidden), $expr10236(hidden), $expr10237(hidden)] }
└─StreamProject { exprs: [$expr2, _row_id, $expr1, $expr5, $expr6] }
└─StreamFilter { predicate: ($expr3 > $expr7) }
└─StreamHashJoin { type: Inner, predicate: $expr1 = $expr5 AND $expr2 = $expr6 }
├─StreamExchange { dist: HashShard($expr1, $expr2) }
│ └─StreamProject { exprs: [Field(partsupp, 0:Int32) as $expr1, Field(partsupp, 1:Int32) as $expr2, Field(partsupp, 2:Int32)::Decimal as $expr3, _row_id] }
│ └─StreamFilter { predicate: (eventType = 'partsupp':Varchar) }
│ └─StreamShare { id: 4 }
│ └─StreamProject { exprs: [eventType, lineitem, partsupp, _row_id] }
│ └─StreamFilter { predicate: ((eventType = 'partsupp':Varchar) OR (eventType = 'lineitem':Varchar)) }
│ └─StreamRowIdGen { row_id_index: 10 }
│ └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
└─StreamProject { exprs: [(0.005:Decimal * sum($expr4)) as $expr7, $expr5, $expr6] }
└─StreamHashAgg [append_only] { group_key: [$expr5, $expr6], aggs: [sum($expr4), count] }
└─StreamExchange { dist: HashShard($expr5, $expr6) }
└─StreamProject { exprs: [Field(lineitem, 4:Int32) as $expr4, Field(lineitem, 1:Int32) as $expr5, Field(lineitem, 2:Int32) as $expr6, _row_id] }
└─StreamFilter { predicate: (eventType = 'lineitem':Varchar) }
└─StreamShare { id: 4 }
└─StreamProject { exprs: [eventType, lineitem, partsupp, _row_id] }
└─StreamFilter { predicate: ((eventType = 'partsupp':Varchar) OR (eventType = 'lineitem':Varchar)) }
└─StreamRowIdGen { row_id_index: 10 }
└─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
(22 rows)
RW: http://metabase.risingwave-cloud.xyz/question/13316-tpch-q20-only-greater-bs-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-3123?start_date=2024-01-26
Flink: http://metabase.risingwave-cloud.xyz/question/13306-flink-tpch-q20-only-greater-flink-medium-1tm-avg-job-throughput-per-second-records-s-history-thtb-3120?start_date=2024-01-26
This confirms that the removed part, i.e. the predicate isolated by q20-only-greater, is the bottleneck.
The barrier interval does not matter much for RW. And we are using the nightly-20240224 image here.
We suspect that the memory size, i.e. the cache size, is the bottleneck for q20 and q20-only-greater.
We will try to verify the throughput on an 8c32G machine, in contrast to the current 8c16G machine.
Ran it once: http://metabase.risingwave-cloud.xyz/question/13724-tpch-q20-only-greater-bs-8c32g-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-3146?start_date=2024-01-28
https://buildkite.com/risingwave-test/tpch-benchmark/builds/1009
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1709028167000&to=1709029750000&var-namespace=tpch-8c32g-1cn-affinity-test
Sadly, we didn't adjust the settings in kube-bench, e.g. setting the compute node's memory request and limit to 32G; however, the memory limit was raised to 15GB from the 13GB previously used on the 8c16G machine.
And the effect is huge.
Previously: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1708882589000&to=1708884401000&var-namespace=tpch-1cn-affinity-1s-0224
Now: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1709028167000&to=1709029750000&var-namespace=tpch-8c32g-1cn-affinity-test
- The eviction at the beginning of both tests is too aggressive.
- Eviction starts even while there is still quite a lot of free memory. Previously, eviction started at 8.5GB out of 13GB in total; now it starts at 10.7GB out of 15GB. That leaves (13 - 8.5) / 13 ≈ 35% and (15 - 10.7) / 15 ≈ 29% of total memory unused.
https://github.com/risingwavelabs/risingwave/issues/14797#issuecomment-1963266578
Does this confirm that subquery unnesting is the major cause?
I think it does, as we can see by comparing the CPU usage and throughput of q20-only-greater during the same period:
RW: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1708882589000&to=1708884401000&var-namespace=tpch-1cn-affinity-1s-0224
Flink: https://grafana.test.risingwave-cloud.xyz/d/Q4ayNkfVz/flink-hits?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1708882398000&to=1708883721000&var-namespace=tpch-flink-1tm-ckpt-10s
After the peak at the beginning, RW uses much less CPU and its throughput is low, whereas RW and Flink perform very similarly on q20-no-greater:
RW: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1708861851000&to=1708862547000&var-namespace=tpch-1cn-affinity-10s-0224
Flink: https://grafana.test.risingwave-cloud.xyz/d/Q4ayNkfVz/flink-hits?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1708706260000&to=1708706922000&var-namespace=tpch-flink-1tm-ckpt-10s
Does this confirm that subquery unnesting is the major cause?
q20-only-greater metabase: http://metabase.risingwave-cloud.xyz/question/13316-tpch-q20-only-greater-bs-medium-1cn-af[…]-second-rows-s-history-thtb-3123?start_date=2024-01-26
When the cache starts to evict aggressively, block cache data miss ops increase, S3 reads increase, and join barrier alignment latency increases.
I think it has a very strong correlation.