risingwave
perf: improve nexmark q15 baseline and scaling up performance
nightly-20240224
RW Config:
RW_CONFIG="{'system':{'data_directory':'hummock_001','barrier_interval_ms':10000},'server':{'telemetry_enabled':false},'meta': {'level0_tier_compact_file_number':6,'level0_overlapping_sub_level_compact_level_count':6}}"
RW:
4X: http://metabase.risingwave-cloud.xyz/question/9236-nexmark-q15-blackhole-4x-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-2763?start_date=2024-01-04
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1708900035000&to=1708901778000&var-namespace=nexmark-ht-4x-1cn-affinity-10s https://buildkite.com/risingwave-test/nexmark-benchmark/builds/3156
1X: http://metabase.risingwave-cloud.xyz/question/603-nexmark-q15-blackhole-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-178?start_date=2023-11-17
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1708899363000&to=1708901166000&var-namespace=nexmark-1cn-affinity-10s https://buildkite.com/risingwave-test/nexmark-benchmark/builds/3155
RW 4X: 1.15M rows/s; RW 1X: 546K rows/s
4X/1X Ratio: 2.1
Flink:
4X: http://metabase.risingwave-cloud.xyz/question/9712-flink-nexmark-q15-flink-4x-medium-1tm-avg-job-throughput-per-second-records-s-history-thtb-2920?start_date=2023-12-05
https://grafana.test.risingwave-cloud.xyz/d/Q4ayNkfVz/flink-hits?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1704381755000&to=1704382897000&var-namespace=flink-4x-medium-1tm-test-20240104
https://buildkite.com/risingwave-test/flink-nexmark-bench/builds/130
1X: http://metabase.risingwave-cloud.xyz/question/2336-flink-nexmark-q15-flink-medium-1tm-avg-job-throughput-per-second-records-s-history-thtb-146?start_date=2023-07-08
https://grafana.test.risingwave-cloud.xyz/d/Q4ayNkfVz/flink-hits?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1704454873000&to=1704456616000&var-namespace=flink-medium-1tm-test-20230104
https://buildkite.com/risingwave-test/flink-nexmark-bench/builds/131
4X/1X Ratio: 3.06
Under the 1X setting, RW and Flink do not differ much.
Setting https://github.com/risingwavelabs/kube-bench/blob/main/env.toml#L113-L114:
Q15_RW_FORCE_TWO_PHASE_AGG = true
Q15_RW_FORCE_SPLIT_DISTINCT_AGG = true
RW:
CREATE SINK nexmark_q15 AS
SELECT to_char(date_time, 'YYYY-MM-DD') as "day",
count(*) AS total_bids,
count(*) filter (where price < 10000) AS rank1_bids,
count(*) filter (where price >= 10000 and price < 1000000) AS rank2_bids,
count(*) filter (where price >= 1000000) AS rank3_bids,
count(distinct bidder) AS total_bidders,
count(distinct bidder) filter (where price < 10000) AS rank1_bidders,
count(distinct bidder) filter (where price >= 10000 and price < 1000000) AS rank2_bidders,
count(distinct bidder) filter (where price >= 1000000) AS rank3_bidders,
count(distinct auction) AS total_auctions,
count(distinct auction) filter (where price < 10000) AS rank1_auctions,
count(distinct auction) filter (where price >= 10000 and price < 1000000) AS rank2_auctions,
count(distinct auction) filter (where price >= 1000000) AS rank3_auctions
FROM bid
GROUP BY to_char(date_time, 'YYYY-MM-DD')
WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');
Plan:
StreamSink { type: append-only, columns: [day, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions] }
└─StreamProject { exprs: [$expr2_expanded, sum0(sum0(count) filter((flag = 0:Int64))), sum0(sum0(count filter(($expr3 < 10000:Int32))) filter((flag = 0:Int64))), sum0(sum0(count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))) filter((flag = 0:Int64))), sum0(sum0(count filter(($expr3 >= 1000000:Int32))) filter((flag = 0:Int64))), sum0(count($expr4_expanded) filter((flag = 1:Int64))), sum0(count($expr4_expanded) filter((count filter(($expr3 < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64))), sum0(count($expr4_expanded) filter((count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64))), sum0(count($expr4_expanded) filter((count filter(($expr3 >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64))), sum0(count($expr5_expanded) filter((flag = 2:Int64))), sum0(count($expr5_expanded) filter((count filter(($expr3 < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64))), sum0(count($expr5_expanded) filter((count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))), sum0(count($expr5_expanded) filter((count filter(($expr3 >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)))] }
└─StreamHashAgg { group_key: [$expr2_expanded], aggs: [sum0(sum0(count) filter((flag = 0:Int64))), sum0(sum0(count filter(($expr3 < 10000:Int32))) filter((flag = 0:Int64))), sum0(sum0(count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))) filter((flag = 0:Int64))), sum0(sum0(count filter(($expr3 >= 1000000:Int32))) filter((flag = 0:Int64))), sum0(count($expr4_expanded) filter((flag = 1:Int64))), sum0(count($expr4_expanded) filter((count filter(($expr3 < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64))), sum0(count($expr4_expanded) filter((count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64))), sum0(count($expr4_expanded) filter((count filter(($expr3 >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64))), sum0(count($expr5_expanded) filter((flag = 2:Int64))), sum0(count($expr5_expanded) filter((count filter(($expr3 < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64))), sum0(count($expr5_expanded) filter((count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))), sum0(count($expr5_expanded) filter((count filter(($expr3 >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))), count] }
└─StreamExchange { dist: HashShard($expr2_expanded) }
└─StreamHashAgg { group_key: [$expr2_expanded, $expr6], aggs: [sum0(count) filter((flag = 0:Int64)), sum0(count filter(($expr3 < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter(($expr3 >= 1000000:Int32))) filter((flag = 0:Int64)), count($expr4_expanded) filter((flag = 1:Int64)), count($expr4_expanded) filter((count filter(($expr3 < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr4_expanded) filter((count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr4_expanded) filter((count filter(($expr3 >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr5_expanded) filter((flag = 2:Int64)), count($expr5_expanded) filter((count filter(($expr3 < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count($expr5_expanded) filter((count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count($expr5_expanded) filter((count filter(($expr3 >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count] }
└─StreamProject { exprs: [$expr2_expanded, $expr4_expanded, $expr5_expanded, flag, count, count filter(($expr3 < 10000:Int32)), count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count filter(($expr3 >= 1000000:Int32)), Vnode($expr2_expanded, $expr4_expanded, $expr5_expanded, flag) as $expr6] }
└─StreamHashAgg [append_only] { group_key: [$expr2_expanded, $expr4_expanded, $expr5_expanded, flag], aggs: [count, count filter(($expr3 < 10000:Int32)), count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count filter(($expr3 >= 1000000:Int32))] }
└─StreamExchange { dist: HashShard($expr2_expanded, $expr4_expanded, $expr5_expanded, flag) }
└─StreamExpand { column_subsets: [[$expr2], [$expr2, $expr4], [$expr2, $expr5]] }
└─StreamProject { exprs: [ToChar($expr1, 'YYYY-MM-DD':Varchar) as $expr2, Field(bid, 2:Int32) as $expr3, Field(bid, 1:Int32) as $expr4, Field(bid, 0:Int32) as $expr5, _row_id] }
└─StreamFilter { predicate: (event_type = 2:Int32) }
└─StreamRowIdGen { row_id_index: 6 }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] }
└─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
└─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] }
(15 rows)
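The Expand + flag trick in the plan above can be illustrated with a small sketch (plain Python with assumed semantics, not RW's actual implementation; the price-range filters are omitted for brevity). Each bid is expanded into three copies tagged with a flag: flag 0 keeps only the day (plain counts), flag 1 keeps day and bidder, and flag 2 keeps day and auction. Plain counts are then summed from the flag-0 groups, while distinct counts fall out as the number of flag-1 / flag-2 partial groups:

```python
from collections import defaultdict

def expand(rows):
    # One input row becomes three flagged rows, mirroring StreamExpand's
    # column_subsets [[day], [day, bidder], [day, auction]].
    for day, bidder, auction, price in rows:
        yield (day, None,   None,    0)  # flag 0: plain count(*)
        yield (day, bidder, None,    1)  # flag 1: count(distinct bidder)
        yield (day, None,   auction, 2)  # flag 2: count(distinct auction)

def q15_counts(rows):
    # Partial phase: count rows per (day, bidder, auction, flag) group.
    partial = defaultdict(int)
    for key in expand(rows):
        partial[key] += 1
    # Final phase, grouped by day:
    #   flag 0: sum the partial counts      -> total_bids
    #   flag 1: count the flag-1 groups     -> total_bidders (distinct)
    #   flag 2: count the flag-2 groups     -> total_auctions (distinct)
    result = defaultdict(lambda: [0, 0, 0])
    for (day, _b, _a, flag), cnt in partial.items():
        result[day][flag] += cnt if flag == 0 else 1
    return dict(result)
```

Three bids from two bidders on two auctions yield `[3, 2, 2]` for the day: the distinct counts never materialize a per-day set of bidders; they are just group counts, which is what makes the partial phase shardable by (day, bidder, auction, flag).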
Let's compare the metrics between RW 1X and RW 4X.
Memory
1X:
4X:
7.43 * 4 = 29.72 < 33, so 4X is using even more than four times the memory of 1X.
Agg's Cache Miss Rate
1X:
4X:
The agg cache miss rate of 1X is even higher than that of 4X.
Agg's Cached Keys
1X:
4X:
Hummock Read
1X:
4X:
4X's block cache miss rate is even lower than 1X's, although both cache miss rates fluctuate periodically.
Since q15 has distinct aggregation, a complex operator that may become the bottleneck, we removed the distinct keyword from all the aggregations and introduced q15-no-distinct:
https://github.com/risingwavelabs/kube-bench/blob/main/manifests/nexmark/nexmark-sinks.template.yaml#L597-L617
CREATE SINK nexmark_q15_no_distinct AS
SELECT to_char(date_time, 'YYYY-MM-DD') as "day",
count(*) AS total_bids,
count(*) filter (where price < 10000) AS rank1_bids,
count(*) filter (where price >= 10000 and price < 1000000) AS rank2_bids,
count(*) filter (where price >= 1000000) AS rank3_bids,
count(bidder) AS total_bidders,
count(bidder) filter (where price < 10000) AS rank1_bidders,
count(bidder) filter (where price >= 10000 and price < 1000000) AS rank2_bidders,
count(bidder) filter (where price >= 1000000) AS rank3_bidders,
count(auction) AS total_auctions,
count(auction) filter (where price < 10000) AS rank1_auctions,
count(auction) filter (where price >= 10000 and price < 1000000) AS rank2_auctions,
count(auction) filter (where price >= 1000000) AS rank3_auctions
FROM bid
GROUP BY to_char(date_time, 'YYYY-MM-DD')
WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');
Since the group-by key is a single time column, we use two-phase aggregation by default: https://github.com/risingwavelabs/kube-bench/blob/main/env.toml#L115
Therefore, the plan:
StreamSink { type: append-only, columns: [day, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions] }
└─StreamProject { exprs: [$expr2, sum0(count), sum0(count filter(($expr3 < 10000:Int32))), sum0(count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count filter(($expr3 >= 1000000:Int32))), sum0(count($expr4)), sum0(count($expr4) filter(($expr3 < 10000:Int32))), sum0(count($expr4) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count($expr4) filter(($expr3 >= 1000000:Int32))), sum0(count($expr5)), sum0(count($expr5) filter(($expr3 < 10000:Int32))), sum0(count($expr5) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count($expr5) filter(($expr3 >= 1000000:Int32)))] }
└─StreamHashAgg { group_key: [$expr2], aggs: [sum0(count), sum0(count filter(($expr3 < 10000:Int32))), sum0(count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count filter(($expr3 >= 1000000:Int32))), sum0(count($expr4)), sum0(count($expr4) filter(($expr3 < 10000:Int32))), sum0(count($expr4) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count($expr4) filter(($expr3 >= 1000000:Int32))), sum0(count($expr5)), sum0(count($expr5) filter(($expr3 < 10000:Int32))), sum0(count($expr5) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count($expr5) filter(($expr3 >= 1000000:Int32))), count] }
└─StreamExchange { dist: HashShard($expr2) }
└─StreamHashAgg [append_only] { group_key: [$expr2, $expr6], aggs: [count, count filter(($expr3 < 10000:Int32)), count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count filter(($expr3 >= 1000000:Int32)), count($expr4), count($expr4) filter(($expr3 < 10000:Int32)), count($expr4) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count($expr4) filter(($expr3 >= 1000000:Int32)), count($expr5), count($expr5) filter(($expr3 < 10000:Int32)), count($expr5) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count($expr5) filter(($expr3 >= 1000000:Int32))] }
└─StreamProject { exprs: [ToChar($expr1, 'YYYY-MM-DD':Varchar) as $expr2, Field(bid, 2:Int32) as $expr3, Field(bid, 1:Int32) as $expr4, Field(bid, 0:Int32) as $expr5, _row_id, Vnode(_row_id) as $expr6] }
└─StreamFilter { predicate: (event_type = 2:Int32) }
└─StreamRowIdGen { row_id_index: 6 }
└─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] }
└─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
└─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] }
(11 rows)
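The two-phase shape of this plan can be sketched as follows (a hypothetical Python model, not RW internals; the vnode count and hash mapping are assumptions for illustration). The local phase aggregates per (day, Vnode(_row_id)), so rows are absorbed in parallel even when every row lands in the same day; the global phase then merges the small per-vnode partial counts with sum0:

```python
from collections import defaultdict

VNODE_COUNT = 256  # assumed vnode count, for illustration only

def vnode(row_id):
    # Stand-in for the plan's Vnode(_row_id) expression.
    return row_id % VNODE_COUNT

def local_phase(rows):
    # Local append-only agg: one partial count per (day, vnode).
    partial = defaultdict(int)
    for row_id, day in rows:
        partial[(day, vnode(row_id))] += 1
    return partial

def global_phase(partial):
    # Global agg: sum0 of partial counts, grouped by day only.
    total = defaultdict(int)
    for (day, _vn), cnt in partial.items():
        total[day] += cnt
    return dict(total)
```

With 1000 rows all sharing one day, the local phase spreads the work over up to 256 partial groups, and only 256 small counts ever meet at the single hot day group, which is why the no-distinct query scales well.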
RW 1X: http://metabase.risingwave-cloud.xyz/question/12510-nexmark-q15-no-distinct-blackhole-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-3087?start_date=2024-01-23
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1708901385000&to=1708902468000&var-namespace=nexmark-1cn-affinity-10s
https://buildkite.com/risingwave-test/nexmark-benchmark/builds/3155
RW 4X: http://metabase.risingwave-cloud.xyz/question/12653-nexmark-q15-no-distinct-blackhole-4x-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-3089?start_date=2024-01-24
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1708902057000&to=1708902659000&var-namespace=nexmark-ht-4x-1cn-affinity-10s
https://buildkite.com/risingwave-test/nexmark-benchmark/builds/3156
RW 4X: 3.3M rows/s; RW 1X: 923K rows/s
Both come close to the throughput of a stateless query.
4X/1X Ratio: 3.57
Both the scalability and the absolute throughput of RW are much better when there is no distinct aggregation.
Flink 1X: http://metabase.risingwave-cloud.xyz/question/13468-flink-nexmark-q15-no-distinct-flink-medium-1tm-avg-job-throughput-per-second-records-s-history-thtb-3060?start_date=2024-01-27
https://grafana.test.risingwave-cloud.xyz/d/Q4ayNkfVz/flink-hits?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1709111642000&to=1709112604000&var-namespace=flink-medium-1tm-ckpt-10s
https://buildkite.com/risingwave-test/flink-nexmark-bench/builds/148
Flink 4X: http://metabase.risingwave-cloud.xyz/question/13478-flink-nexmark-q15-no-distinct-flink-4x-medium-1tm-avg-job-throughput-per-second-records-s-history-thtb-3061?start_date=2024-01-27
https://grafana.test.risingwave-cloud.xyz/d/Q4ayNkfVz/flink-hits?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1708961698000&to=1708962300000&var-namespace=flink-4x-medium-1tm-20240226
https://buildkite.com/risingwave-test/flink-nexmark-bench/builds/144
Flink 4X: 3.3M rows/s; Flink 1X: 1M rows/s
We can conclude that q15-no-distinct is a scalable query for both RW and Flink.
This is the Flink plan after enabling table.optimizer.distinct-agg.split.enabled: true
== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.nexmark_q15], fields=[day, $f1, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12])
+- GroupAggregate(groupBy=[day], partialFinalType=[FINAL], select=[day, $SUM0_RETRACT($f3) AS $f1, $SUM0_RETRACT($f4) AS $f2, $SUM0_RETRACT($f5) AS $f3, $SUM0_RETRACT($f6_0) AS $f4, $SUM0_RETRACT($f7_0) AS $f5, $SUM0_RETRACT($f8) AS $f6, $SUM0_RETRACT($f9) AS $f7, $SUM0_RETRACT($f10) AS $f8, $SUM0_RETRACT($f11) AS $f9, $SUM0_RETRACT($f12) AS $f10, $SUM0_RETRACT($f13) AS $f11, $SUM0_RETRACT($f14) AS $f12])
+- Exchange(distribution=[hash[day]])
+- GroupAggregate(groupBy=[day, $f6, $f7], partialFinalType=[PARTIAL], select=[day, $f6, $f7, COUNT(*) FILTER $g_3 AS $f3, COUNT(*) FILTER $g_30 AS $f4, COUNT(*) FILTER $g_31 AS $f5, COUNT(*) FILTER $g_32 AS $f6_0, COUNT(DISTINCT bidder) FILTER $g_1 AS $f7_0, COUNT(DISTINCT bidder) FILTER $g_10 AS $f8, COUNT(DISTINCT bidder) FILTER $g_11 AS $f9, COUNT(DISTINCT bidder) FILTER $g_12 AS $f10, COUNT(DISTINCT auction) FILTER $g_2 AS $f11, COUNT(DISTINCT auction) FILTER $g_20 AS $f12, COUNT(DISTINCT auction) FILTER $g_21 AS $f13, COUNT(DISTINCT auction) FILTER $g_22 AS $f14])
+- Exchange(distribution=[hash[day, $f6, $f7]])
+- Calc(select=[day, $f1, $f2, $f3, bidder, auction, $f6, $f7, =($e, 3) AS $g_3, AND(=($e, 3), $f1) AS $g_30, AND(=($e, 3), $f2) AS $g_31, AND(=($e, 3), $f3) AS $g_32, =($e, 1) AS $g_1, AND(=($e, 1), $f1) AS $g_10, AND(=($e, 1), $f2) AS $g_11, AND(=($e, 1), $f3) AS $g_12, =($e, 2) AS $g_2, AND(=($e, 2), $f1) AS $g_20, AND(=($e, 2), $f2) AS $g_21, AND(=($e, 2), $f3) AS $g_22])
+- Expand(projects=[{day, $f1, $f2, $f3, bidder, auction, $f6, null AS $f7, 1 AS $e}, {day, $f1, $f2, $f3, bidder, auction, null AS $f6, $f7, 2 AS $e}, {day, $f1, $f2, $f3, bidder, auction, null AS $f6, null AS $f7, 3 AS $e}])
+- Calc(select=[DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), _UTF-16LE'yyyy-MM-dd') AS day, IS TRUE(<(bid.price, 10000)) AS $f1, IS TRUE(SEARCH(bid.price, Sarg[[10000..1000000)])) AS $f2, IS TRUE(>=(bid.price, 1000000)) AS $f3, bid.bidder AS bidder, bid.auction AS auction, MOD(HASH_CODE(bid.bidder), 1024) AS $f6, MOD(HASH_CODE(bid.auction), 1024) AS $f7], where=[=(event_type, 2)])
+- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
+- Calc(select=[event_type, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
+- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.nexmark_q15], fields=[day, $f1, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12])
+- GroupAggregate(groupBy=[day], partialFinalType=[FINAL], select=[day, $SUM0_RETRACT($f3) AS $f1, $SUM0_RETRACT($f4) AS $f2, $SUM0_RETRACT($f5) AS $f3, $SUM0_RETRACT($f6_0) AS $f4, $SUM0_RETRACT($f7_0) AS $f5, $SUM0_RETRACT($f8) AS $f6, $SUM0_RETRACT($f9) AS $f7, $SUM0_RETRACT($f10) AS $f8, $SUM0_RETRACT($f11) AS $f9, $SUM0_RETRACT($f12) AS $f10, $SUM0_RETRACT($f13) AS $f11, $SUM0_RETRACT($f14) AS $f12])
+- Exchange(distribution=[hash[day]])
+- GroupAggregate(groupBy=[day, $f6, $f7], partialFinalType=[PARTIAL], select=[day, $f6, $f7, COUNT(*) FILTER $g_3 AS $f3, COUNT(*) FILTER $g_30 AS $f4, COUNT(*) FILTER $g_31 AS $f5, COUNT(*) FILTER $g_32 AS $f6_0, COUNT(DISTINCT bidder) FILTER $g_1 AS $f7_0, COUNT(DISTINCT bidder) FILTER $g_10 AS $f8, COUNT(DISTINCT bidder) FILTER $g_11 AS $f9, COUNT(DISTINCT bidder) FILTER $g_12 AS $f10, COUNT(DISTINCT auction) FILTER $g_2 AS $f11, COUNT(DISTINCT auction) FILTER $g_20 AS $f12, COUNT(DISTINCT auction) FILTER $g_21 AS $f13, COUNT(DISTINCT auction) FILTER $g_22 AS $f14])
+- Exchange(distribution=[hash[day, $f6, $f7]])
+- Calc(select=[day, $f1, $f2, $f3, bidder, auction, $f6, $f7, ($e = 3) AS $g_3, (($e = 3) AND $f1) AS $g_30, (($e = 3) AND $f2) AS $g_31, (($e = 3) AND $f3) AS $g_32, ($e = 1) AS $g_1, (($e = 1) AND $f1) AS $g_10, (($e = 1) AND $f2) AS $g_11, (($e = 1) AND $f3) AS $g_12, ($e = 2) AS $g_2, (($e = 2) AND $f1) AS $g_20, (($e = 2) AND $f2) AS $g_21, (($e = 2) AND $f3) AS $g_22])
+- Expand(projects=[{day, $f1, $f2, $f3, bidder, auction, $f6, null AS $f7, 1 AS $e}, {day, $f1, $f2, $f3, bidder, auction, null AS $f6, $f7, 2 AS $e}, {day, $f1, $f2, $f3, bidder, auction, null AS $f6, null AS $f7, 3 AS $e}])
+- Calc(select=[DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), 'yyyy-MM-dd') AS day, (bid.price < 10000) IS TRUE AS $f1, SEARCH(bid.price, Sarg[[10000..1000000)]) IS TRUE AS $f2, (bid.price >= 1000000) IS TRUE AS $f3, bid.bidder AS bidder, bid.auction AS auction, MOD(HASH_CODE(bid.bidder), 1024) AS $f6, MOD(HASH_CODE(bid.auction), 1024) AS $f7], where=[(event_type = 2)])
+- WatermarkAssigner(rowtime=[dateTime], watermark=[(dateTime - 4000:INTERVAL SECOND)])
+- Calc(select=[event_type, bid, CASE((event_type = 0), person.dateTime, (event_type = 1), auction.dateTime, bid.dateTime) AS dateTime])
+- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])
link #11964
With @st1page and @Little-Wallace, we suspected that the aggregation dirty heap size, 64MB by default, is limiting the throughput. The reason is that, due to the group-by columns (day, bidder, and auction) and the data pattern, the skew is strong: all the data goes to a single partial aggregation operator at any given time.
We found that the aggregation dirty heap size fluctuates around 64MB:
The figure above comes from: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1709731899000&to=1709733586000&var-namespace=nexmark-ht-4x-1cn-affinity-10s
Flink has a similar knob to tune. https://github.com/risingwavelabs/kube-bench/blob/main/env.toml#L658-L663 https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/tuning/
However, after we set stream_hash_agg_max_dirty_groups_heap_size to 268435456, i.e., 256MB, the throughput did not change:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1709736387000&orgId=1&to=1709738065000&var-datasource=Prometheus:+test-useast1-eks-a&var-namespace=nexmark-ht-4x-1cn-affinity-10s
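The suspicion can be illustrated with a toy model (assumed behavior sketched in Python, not RW's code; the per-entry size and thresholds are made up). The hash agg buffers updated ("dirty") group states and flushes them to the state store when a barrier arrives or the dirty heap exceeds the cap; a skewed operator that owns all the hot groups hits the cap and flushes far more often, but raising the cap only helps if flushing is actually the bottleneck:

```python
class HashAgg:
    """Toy dirty-group buffer; flushes when the dirty heap exceeds a cap."""

    ENTRY_BYTES = 64  # pretend each dirty group state costs 64 bytes

    def __init__(self, max_dirty_bytes):
        self.max_dirty_bytes = max_dirty_bytes
        self.dirty = {}    # group key -> pending count delta
        self.flushes = 0

    def apply(self, key):
        # Absorb one input row into its group's in-memory state.
        self.dirty[key] = self.dirty.get(key, 0) + 1
        if len(self.dirty) * self.ENTRY_BYTES > self.max_dirty_bytes:
            self.flush()

    def flush(self):
        # Write the pending deltas to the state store and reset.
        self.dirty.clear()
        self.flushes += 1
```

Feeding 10,000 distinct keys through a small cap forces a mid-epoch flush every ~100 groups, while a cap large enough to hold all dirty groups never flushes before the barrier; in our run, quadrupling the cap changed neither the flush pattern enough to matter nor the throughput.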
The two executions mentioned in this thread can be found at metabase: http://metabase.risingwave-cloud.xyz/question/9236-nexmark-q15-blackhole-4x-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-2763?start_date=2024-01-04
#15696 has improved throughput by about 20%.
This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.