risingwave
Compute node OOM while loading ch-benchmark data with 18 modified MVs
Describe the bug
Run the modified ch-benchmark queries (all except q3, q7, q8, q21) with 2800 warehouses. All the sources/MVs/sinks were created successfully, but the compute node OOMed while loading data.
Namespace: ch-benchmark-pg-cdc-20231025-060139
https://buildkite.com/risingwave-test/ch-benchmark-pg-cdc/builds/46#018b62a2-cbcb-4636-a4af-8a2f712ca336
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&from=1698213717228&to=1698218983113&var-datasource=P2453400D1763B4D9&var-namespace=ch-benchmark-pg-cdc-20231025-060139&var-instance=benchmark-risingwave&var-node=All&var-job=All&var-table=All
1698214041-2023-10-25-06-07-20.auto.heap.gz
Above is the first heap dump. Others are here https://s3.console.aws.amazon.com/s3/buckets/test-useast1-mgmt-bucket-archiver?region=us-east-1&prefix=k8s/ch-benchmark-pg-cdc-20231025-060139/benchmark-risingwave-compute-c-0/ccb93d72-e9cf-440f-9f21-485f1e21b8ae/2023-10-25/0/&showversions=false
Error message/log
No response
To Reproduce
No response
Expected behavior
No response
How did you deploy RisingWave?
No response
The version of RisingWave
nightly-20231024
Additional context
No response
@wcy-fdu is working on mem-table spill. Hopefully it can solve this issue. If not, let's revisit later.
Hit another OOM with v1.5.0-rc
https://buildkite.com/risingwave-test/ch-benchmark-pg-cdc/builds/101#018c335f-3db9-42c1-b483-13f8a37d369e
ENABLE_MEMORY_PROFILING was true in the last run. Let's analyze the heap dump to see what happened...
1204-ch-benchmark-oom-cn-0.heap.collapsed.zip
Caused by the hash join executor: the join executor's state_table inserts and try_flush together take up a lot of memory (1.7GB).
Could we reduce the parallelism of each MV when too many MVs are created in one cluster? I think the meta service should scale in automatically.
Ran again with memory-size-based back-pressure (#13775). Still OOMed... 🥹
https://buildkite.com/risingwave-test/ch-benchmark-pg-cdc/builds/103
Ran again with memory-size-based back-pressure (#13775). Still OOMed... 🥹
This OOM is not caused by too much actor channel buffer, but the hash join executor's mem-table is still rather large, since this workload involves so many joins. If we are in a hurry to solve this case, changing the flush threshold to 1MB or smaller may work. If it's not urgent, let's keep the current behavior: blindly lowering the threshold can indeed reduce OOMs, but it also brings overhead. I'm going to introduce global memory monitoring and dynamic threshold adjustment, which may be suitable for such cases.
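The trade-off above can be sketched with a minimal size-based flush check. All struct, field, and method names below are hypothetical illustrations, not RisingWave's actual code:

```rust
/// Minimal sketch of a size-based mem-table spill decision.
/// Hypothetical names; RisingWave's real state table tracks an
/// estimated heap size and spills when it crosses a threshold.
struct MemTable {
    estimated_bytes: usize,
    spill_threshold: usize, // e.g. lowered to 1MB in the suggestion above
}

impl MemTable {
    /// True if dirty data should be flushed to storage now,
    /// instead of waiting for the next barrier.
    fn should_spill(&self) -> bool {
        self.estimated_bytes >= self.spill_threshold
    }
}

fn main() {
    // With a 1MB threshold, 3MB of dirty data triggers an early spill.
    let mt = MemTable { estimated_bytes: 3 << 20, spill_threshold: 1 << 20 };
    assert!(mt.should_spill());

    // With a larger threshold, the same data stays in memory until a barrier,
    // reducing flush overhead but risking OOM under join amplification.
    let mt = MemTable { estimated_bytes: 3 << 20, spill_threshold: 4 << 20 };
    assert!(!mt.should_spill());
    println!("ok");
}
```

A lower threshold trades write amplification for a tighter memory bound, which is exactly the overhead concern mentioned above.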
change the flush threshold to 1mb or smaller may work
I don't think so. Searching for state_table, the results show that the usage is less than 2GB, which I think is a reasonable number.
Besides, this part looks like memory usage of HashAgg. Still thinking about the reason.
[screenshot: heap profile with a red box marking the suspected region]
I see, so you suspect that what’s inside the red box above is some buffer? It is indeed difficult to analyze from the heap file.
Besides, this part looks like memory usage of HashAgg. Still thinking about the reason.
- Parallelism amplification: currently the memory-based agg emit is at executor level. If a HashAgg executor's dirty state exceeds 64MB, it emits those states, so the upper bound of the memory here is 64MB * 32 (parallelism) = 2GB. (Points 2-4 below come from analyzing the flamegraph.)
- There is some memory (515MB of 1956MB) that we currently cannot track; it is the memory used by the HashMap itself.
- We estimate the key and value sizes in the hash map, but they are less than what the memory profile shows:
  - value (AggGroup::create) consuming 352MB of 1956MB
  - key (HashKey::deserialize) consuming 544MB of 1956MB
  The expected estimated size should be ~900MB, but in the metrics RW only estimates 387MB. It could be that the OOM came too fast for the metrics to be reported.
- There are still 544MB allocated by hash agg but not in the dirty groups.
cc @stdrc
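A quick sanity check of the numbers in the list above, as plain arithmetic:

```rust
fn main() {
    const MB: u64 = 1 << 20;

    // Upper bound of dirty HashAgg state: 64MB per executor * 32 parallelism.
    let dirty_bound = 64 * MB * 32;
    assert_eq!(dirty_bound / (1 << 30), 2); // = 2GB

    // Key + value memory seen in the flamegraph: 544MB + 352MB.
    let expected = 544 * MB + 352 * MB;
    assert_eq!(expected / MB, 896); // ~900MB expected, vs only 387MB reported in metrics

    println!("ok");
}
```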
Running a test with stream_hash_agg_max_dirty_groups_heap_size = 2MB
https://buildkite.com/risingwave-test/ch-benchmark-pg-cdc/builds/104
I think it doesn't make sense that stream_hash_agg_max_dirty_groups_heap_size is much bigger than mem_table_spill_threshold. They should be the same, right?
I guess Some AggGroup could be larger than the result row.
Oh, do you mean min/max? Indeed.
Let's wait for the test first; we need to prove it's the cause of the OOM. If so, let's then pick a better value for stream_hash_agg_max_dirty_groups_heap_size.
The run didn't succeed; it still OOMed. But the parameters did take some effect (Grafana): the memory usage of HashAgg (as pointed out with the red box previously) disappeared.
1701861649-2023-12-06-11-20-48.auto.heap.collapsed.zip
Now, nearly all memory in the streaming part is used by state_table.
I will continue looking into this. (Perhaps caused by some bad query plan? 🤔...)
@st1page I'd like to decrease the default stream_hash_agg_max_dirty_groups_heap_size to around 2MB. What do you think?
Interesting observations on stream_source_output_rows_counts (total count, not rate())
stream_source_output_rows_counts{namespace=~"$namespace",risingwave_name=~"$instance",risingwave_component=~"$component",pod=~"$pod"}
stream_source_output_rows_counts of the stock table
stream_source_output_rows_counts of the rest of the tables
The rest of the tables didn't consume any changes until 19:20, and immediately after their consumption started, the compute node OOMed.
Running with 9 queries (q13, q14, q15, q16, q17, q18, q19, q20, q22) still got an OOM:
https://buildkite.com/risingwave-test/ch-benchmark-pg-cdc/builds/106#018c4211-6622-4016-8188-dee2aabb8ca6
There is a 1400x amplification in Q20.
CREATE MATERIALIZED VIEW ch_benchmark_q20 AS
SELECT
s_name,
s_address
FROM
supplier,
nation
WHERE
s_suppkey IN (
SELECT
mod(s_i_id * s_w_id, 10000)
FROM
stock,
order_line
WHERE
s_i_id IN (
SELECT
i_id
FROM
item
WHERE
i_data NOT LIKE 'co%'
)
AND ol_i_id = s_i_id
AND ol_delivery_d > '2010-05-23 12:00:00'
GROUP BY
s_i_id,
s_w_id,
s_quantity
HAVING
200 * s_quantity > sum(ol_quantity)
)
AND s_nationkey = n_nationkey
Plan:
StreamMaterialize { columns: [s_name, s_address, supplier.s_suppkey(hidden), supplier.s_nationkey(hidden)], stream_key: [supplier.s_suppkey, supplier.s_nationkey], pk_columns: [supplier.s_suppkey, supplier.s_nationkey], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(supplier.s_suppkey, supplier.s_nationkey) }
└─StreamHashJoin { type: LeftSemi, predicate: supplier.s_suppkey = $expr1 }
├─StreamExchange { dist: HashShard(supplier.s_suppkey) }
│ └─StreamHashJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey }
│ ├─StreamExchange { dist: HashShard(supplier.s_nationkey) }
│ │ └─StreamTableScan { table: supplier, columns: [s_suppkey, s_name, s_address, s_nationkey] }
│ └─StreamExchange { dist: HashShard(nation.n_nationkey) }
│ └─StreamTableScan { table: nation, columns: [n_nationkey] }
└─StreamExchange { dist: HashShard($expr1) }
└─StreamProject { exprs: [((stock.s_i_id * stock.s_w_id) % 10000:Int32)::Int64 as $expr1, stock.s_i_id, stock.s_w_id, stock.s_quantity] }
└─StreamFilter { predicate: ((200:Int32 * stock.s_quantity) > sum(order_line.ol_quantity)) }
└─StreamProject { exprs: [stock.s_i_id, stock.s_w_id, stock.s_quantity, sum(order_line.ol_quantity)] }
└─StreamHashAgg { group_key: [stock.s_i_id, stock.s_w_id, stock.s_quantity], aggs: [sum(order_line.ol_quantity), count] }
└─StreamHashJoin { type: LeftSemi, predicate: stock.s_i_id = item.i_id }
├─StreamHashJoin { type: Inner, predicate: stock.s_i_id = order_line.ol_i_id } ❗️❗️❗️
│ ├─StreamExchange { dist: HashShard(stock.s_i_id) }
│ │ └─StreamTableScan { table: stock, columns: [s_i_id, s_w_id, s_quantity] }
│ └─StreamExchange { dist: HashShard(order_line.ol_i_id) }
│ └─StreamProject { exprs: [order_line.ol_i_id, order_line.ol_quantity, order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number] }
│ └─StreamFilter { predicate: (order_line.ol_delivery_d > '2010-05-23 12:00:00':Timestamp) }
│ └─StreamTableScan { table: order_line, columns: [ol_i_id, ol_quantity, ol_w_id, ol_d_id, ol_o_id, ol_number, ol_delivery_d] }
└─StreamExchange { dist: HashShard(item.i_id) }
└─StreamProject { exprs: [item.i_id] }
└─StreamFilter { predicate: Not(Like(item.i_data, 'co%':Varchar)) }
└─StreamTableScan { table: item, columns: [i_id, i_data] }
Notice the StreamHashJoin marked with ❗️❗️❗️: the join condition is stock.s_i_id = order_line.ol_i_id. However, the stock table is very large (around 140,000,000 rows) but has only 100,000 distinct values of s_i_id, so each new row in order_line will be amplified 1400x.
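The 1400x figure follows directly from the cardinalities; as a quick check:

```rust
fn main() {
    // stock has ~140M rows but only 100K distinct values of s_i_id,
    // so each order_line row matches ~1400 stock rows on average
    // in the inner join on s_i_id = ol_i_id.
    let stock_rows: u64 = 140_000_000;
    let distinct_s_i_id: u64 = 100_000;
    let amplification = stock_rows / distinct_s_i_id;
    assert_eq!(amplification, 1400);
    println!("each order_line row joins ~{amplification} stock rows");
}
```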
so each new line in order_line will be amplified 1400x
Is big amplification supposed to be solved by spill-anytime (https://github.com/risingwavelabs/risingwave/pull/12028)?
Partially. Big amplification may cause multiple problems. Even for OOM alone, there is more than one problem (data in the mem-table, data in the shuffle channel, data in streaming operators, etc.). But AFAIK the major one is caused by the mem-table and thus can be solved by spill-anytime.
Let me run the same setting again with nightly-20240102:
https://buildkite.com/risingwave-test/ch-benchmark-pg-cdc/builds/150#018cd321-b085-4136-ac9c-4327b8feb534
It OOMed: https://buildkite.com/risingwave-test/ch-benchmark-pg-cdc/builds/150#018cd321-b085-4136-ac9c-4327b8feb534
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&from=1704349538555&to=1704350593203&var-datasource=P2453400D1763B4D9&var-namespace=ch-benchmark-pg-cdc-20240104-062245&var-instance=benchmark-risingwave&var-pod=All&var-component=All&var-table=All
However, I didn't find the heap dumps on s3 while the other two executions of the same pipeline today do exist.
All 3 CNs OOMed at the same time. This is an all-in-one-machine deployment.
@huangjw806 is helping to check if the entire machine runs out of memory and thus kills the CNs.
However, I didn't find the heap dumps on s3 while the other two executions of the same pipeline today do exist.
You probably need:
ENABLE_MEMORY_PROFILING="true"
This is sometimes annoying...
Let me make it true by default, because we no longer do sampling.
@huangjw806 is helping to check if the entire machine runs out of memory and thus kills the CNs. And the entire machine's memory turns out to be enough for the workload: https://grafana.test.risingwave-cloud.xyz/d/LDL_mx9nz123/kubernetes-node-exporter?orgId=1&var-DS_PROMETHEUS=P2453400D1763B4D9&var-node=ip-10-0-41-244.ec2.internal&var-internal_ip=10.0.41.244&var-diskdevices=%5Ba-z%5D+%7Cnvme%5B0-9%5D+n%5B0-9%5D+%7Cmmcblk%5B0-9%5D+&from=1704346828944&to=1704352403138
The memory usage of the 3 CNs look similar to the one in https://github.com/risingwavelabs/risingwave/issues/14251
https://buildkite.com/risingwave-test/ch-benchmark-pg-cdc/builds/152#018cd34b-eaf3-4148-a74c-4da996384253:
I just analyzed run 149 (namespace: ch-benchmark-pg-cdc-20240104-061254) and the conclusion is the same as above: the prost message decoding used more than 6GB of memory.
1704349111-2024-01-04-06-18-30.auto.heap.collapsed.zip
The BarrierMutation messages consume 6GB:
message AddMutation {
// New dispatchers for each actor.
map<uint32, Dispatchers> actor_dispatchers = 1;
// All actors to be added (to the main connected component of the graph) in this update.
repeated uint32 added_actors = 3;
// We may embed a source change split mutation here.
// TODO: we may allow multiple mutations in a single barrier.
map<uint32, source.ConnectorSplits> actor_splits = 2;
// We may embed a pause mutation here.
// TODO: we may allow multiple mutations in a single barrier.
bool pause = 4;
}
message Dispatchers {
repeated Dispatcher dispatchers = 1;
}
message Dispatcher {
DispatcherType type = 1;
// Indices of the columns to be used for hashing.
// For dispatcher types other than HASH, this is ignored.
repeated uint32 dist_key_indices = 2;
// Indices of the columns to output.
// In most cases, this contains all columns in the input. But for some cases like MV on MV or
// schema change, we may only output a subset of the columns.
repeated uint32 output_indices = 6;
// The hash mapping for consistent hash.
// For dispatcher types other than HASH, this is ignored.
ActorMapping hash_mapping = 3;
// Dispatcher can be uniquely identified by a combination of actor id and dispatcher id.
// This is exactly the same as its downstream fragment id.
uint64 dispatcher_id = 4;
// Number of downstreams decides how many endpoints a dispatcher should dispatch.
repeated uint32 downstream_actor_id = 5;
}
std::mem::size_of::<risingwave_pb::stream_plan::Dispatcher>() = 136
Parallelism is 96 and there are only 3 CNs, so there will be 96/3 = 32 AddMutation messages in each CN.
Each AddMutation message has one entry per actor in the actor_dispatchers map, which is 96 (parallelism) * 10 (fragment count) entries.
Each Dispatchers has 96 elements (Dispatcher).
136 bytes * 96 * 96 * 10 * 32 = 382.5MB
There is still about a 10x amplification somewhere.
Is there any case that there can be more than one DDL running concurrently in the pipeline?
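Spelling out the estimate above as plain arithmetic (the 136-byte Dispatcher size comes from the size_of measurement quoted earlier):

```rust
fn main() {
    let dispatcher_size: u64 = 136; // std::mem::size_of::<Dispatcher>() per the measurement
    let parallelism: u64 = 96;
    let fragments: u64 = 10;
    let cn_count: u64 = 3;

    // AddMutation messages received per CN.
    let msgs_per_cn = parallelism / cn_count;
    assert_eq!(msgs_per_cn, 32);

    // Per message: one map entry per actor (96 * 10),
    // each entry holding a Dispatchers with 96 Dispatcher elements.
    let bytes_per_msg = dispatcher_size * parallelism * fragments * parallelism;
    let total = bytes_per_msg * msgs_per_cn;

    // 136B * 96 * 96 * 10 * 32 = 382.5 MiB (scaled by 10 to keep integers exact).
    assert_eq!(total * 10 / (1 << 20), 3825);
    println!("~{:.1} MiB per CN", total as f64 / (1 << 20) as f64);
}
```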
There is still about a 10x amplification somewhere. Is there any case that there can be more than one DDL running concurrently in the pipeline?
This is a direct CDC testing pipeline, does it matter?
also cc: @cyliu0 for the DDL question
Oh, there are 12 tables in the ch-benchmark, so "there will be 96/3 = 32 AddMutation messages in each CN" should instead be 12 (source count) * 96 (total parallelism) / 3 (CN count) = 384. The memory consumption is then 382.5MB * 12 = 4590MB, which makes sense given the heap number.
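The corrected count, spelled out:

```rust
fn main() {
    let dispatcher_size: u64 = 136;
    let parallelism: u64 = 96;
    let fragments: u64 = 10;
    let cn_count: u64 = 3;
    let sources: u64 = 12; // 12 tables in the ch-benchmark

    // One AddMutation per source, per actor on this CN.
    let msgs_per_cn = sources * parallelism / cn_count;
    assert_eq!(msgs_per_cn, 384);

    // Same per-message size as before: 96 * 10 map entries of 96 Dispatchers each.
    let bytes_per_msg = dispatcher_size * parallelism * fragments * parallelism;
    let total_mib = msgs_per_cn * bytes_per_msg / (1 << 20);

    // 382.5MiB * 12 = 4590MiB, consistent with the >6GB seen in the heap dump.
    assert_eq!(total_mib, 4590);
    println!("~{total_mib} MiB per CN");
}
```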