
Compute node OOM while loading ch-benchmark data with 18 modified MVs

Open cyliu0 opened this issue 1 year ago • 39 comments

Describe the bug

Run the ch-benchmark modified queries (except q3, q7, q8, q21) with 2800 warehouses. All the sources/MVs/sinks were created successfully. The compute node OOMed while loading data.

Namespace: ch-benchmark-pg-cdc-20231025-060139

https://buildkite.com/risingwave-test/ch-benchmark-pg-cdc/builds/46#018b62a2-cbcb-4636-a4af-8a2f712ca336

https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&from=1698213717228&to=1698218983113&var-datasource=P2453400D1763B4D9&var-namespace=ch-benchmark-pg-cdc-20231025-060139&var-instance=benchmark-risingwave&var-node=All&var-job=All&var-table=All

image

1698214041-2023-10-25-06-07-20.auto.heap.gz

Above is the first heap dump. Others are here https://s3.console.aws.amazon.com/s3/buckets/test-useast1-mgmt-bucket-archiver?region=us-east-1&prefix=k8s/ch-benchmark-pg-cdc-20231025-060139/benchmark-risingwave-compute-c-0/ccb93d72-e9cf-440f-9f21-485f1e21b8ae/2023-10-25/0/&showversions=false

Error message/log

No response

To Reproduce

No response

Expected behavior

No response

How did you deploy RisingWave?

No response

The version of RisingWave

nightly-20231024

Additional context

No response

cyliu0 avatar Oct 25 '23 14:10 cyliu0

@wcy-fdu is working on memtable spill. Hopefully it can solve this issue; if not, we'll revisit later.

fuyufjh avatar Nov 01 '23 04:11 fuyufjh

Hit another OOM with v1.5.0-rc https://buildkite.com/risingwave-test/ch-benchmark-pg-cdc/builds/101#018c335f-3db9-42c1-b483-13f8a37d369e

cyliu0 avatar Dec 04 '23 07:12 cyliu0

ENABLE_MEMORY_PROFILING was true in the last run. Let's analyze the heap dump to see what happened...

fuyufjh avatar Dec 04 '23 07:12 fuyufjh

1204-ch-benchmark-oom-cn-0.heap.collapsed.zip

Caused by the hash join executor: the join executor's state_table inserts and try_flush together take up a lot of memory (1.7 GB).

wcy-fdu avatar Dec 04 '23 09:12 wcy-fdu

Could we reduce the parallelism of each MV when too many MVs are created in one cluster? I think the meta service should scale in automatically.

Little-Wallace avatar Dec 05 '23 06:12 Little-Wallace

Ran again with memory-size-based back-pressure (#13775). Still OOMed... 🥹

https://buildkite.com/risingwave-test/ch-benchmark-pg-cdc/builds/103

1701841399-2023-12-06-05-43-18.auto.heap.collapsed.zip

fuyufjh avatar Dec 06 '23 07:12 fuyufjh

Ran again with memory-size-based back-pressure (#13775). Still OOMed... 🥹

1701841399-2023-12-06-05-43-18.auto.heap.collapsed.zip

This OOM is not caused by too many actor channel buffers, but the hash join executor's mem table is still somewhat large, since the workload involves so many joins. If we are in a hurry to solve this case, changing the flush threshold to 1 MB or smaller may work; if it's not urgent, just keep the current settings, because blindly lowering the threshold can indeed reduce OOM, but it also brings some overhead. I'm going to introduce global memory monitoring and dynamic threshold adjustment, which may be suitable for such cases.
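For context, here is a minimal, illustrative sketch of what size-threshold-based spilling looks like. The struct, field names, and flush path below are assumptions for illustration only, not RisingWave's actual StateTable/mem-table API; the idea is that the executor tracks an estimated heap size for its in-memory buffer and flushes early once the configured threshold is crossed, instead of waiting for the next barrier.

use std::collections::BTreeMap;

// Toy mem-table with a spill threshold (illustrative only).
struct MemTable {
    buffer: BTreeMap<Vec<u8>, Vec<u8>>,
    estimated_bytes: usize,
    spill_threshold: usize, // e.g. 1 MB, the value discussed above
}

impl MemTable {
    fn new(spill_threshold: usize) -> Self {
        Self { buffer: BTreeMap::new(), estimated_bytes: 0, spill_threshold }
    }

    fn insert(&mut self, key: Vec<u8>, value: Vec<u8>) {
        // Rough size accounting; a real implementation estimates heap size more carefully.
        self.estimated_bytes += key.len() + value.len();
        self.buffer.insert(key, value);
    }

    // Called opportunistically (e.g. after each chunk), not only on barriers,
    // so memory stays bounded between checkpoints.
    fn try_flush(&mut self, flush: impl FnOnce(&BTreeMap<Vec<u8>, Vec<u8>>)) {
        if self.estimated_bytes >= self.spill_threshold {
            flush(&self.buffer);
            self.buffer.clear();
            self.estimated_bytes = 0;
        }
    }
}

The trade-off discussed above follows directly from this shape: a smaller threshold bounds memory more tightly but causes more frequent small flushes to the state store.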

wcy-fdu avatar Dec 06 '23 07:12 wcy-fdu

changing the flush threshold to 1 MB or smaller may work

I don't think so. Searching for state_table, the results show that the usage is less than 2 GB, which is a reasonable number, I think.

Besides, this part looks like memory usage of HashAgg. Still thinking about the reason.

Screenshot 2023-12-06 at 15 22 59

fuyufjh avatar Dec 06 '23 07:12 fuyufjh

changing the flush threshold to 1 MB or smaller may work

I don't think so. Searching for state_table, the results show that the usage is less than 2 GB, which is a reasonable number, I think.

Besides, this part looks like memory usage of HashAgg. Still thinking about the reason.

Screenshot 2023-12-06 at 15 22 59

I see, so you suspect that what’s inside the red box above is some buffer? It is indeed difficult to analyze from the heap file.

wcy-fdu avatar Dec 06 '23 07:12 wcy-fdu

Besides, this part looks like memory usage of HashAgg. Still thinking about the reason.

  1. Parallelism amplification. Currently the memory-based Agg emit works at the executor level: if a HashAgg executor's dirty state exceeds 64 MB, it emits those states. So the upper bound of the memory here is 64 MB * 32 (parallelism) = 2 GB (see the worked calculation after this list).

  Points 2-4 come from analyzing the flamegraph:

  2. There is some memory (515 MB of 1956 MB) that we currently cannot track; it is the memory used by the HashMap itself.

  3. We estimate the key and value sizes in the hash map, but they are smaller than what the memory profile shows:

    • value (AggGroup::create) consumes 352 MB of 1956 MB
    • key (HashKey::deserialize) consumes 544 MB of 1956 MB. The expected estimated size should be 900 MB, but in the metrics RW only estimates 387 MB. It could be because the OOM comes too fast for the metrics to be reported. image

  4. There is still 544 MB of memory allocated by hash agg but not in the dirty groups.
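To make the bound in point 1 concrete, a quick worked calculation (the constants are the ones quoted in this thread; nothing here is measured):

fn main() {
    let dirty_state_limit_bytes: u64 = 64 * 1024 * 1024; // per-executor emit threshold
    let parallelism: u64 = 32;                           // HashAgg executors on this node
    let upper_bound = dirty_state_limit_bytes * parallelism;
    println!("{} MB", upper_bound / (1024 * 1024));      // 2048 MB ≈ 2 GB worst case
}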

c.c. @stdrc

st1page avatar Dec 06 '23 08:12 st1page

Running a test with stream_hash_agg_max_dirty_groups_heap_size = 2MB

https://buildkite.com/risingwave-test/ch-benchmark-pg-cdc/builds/104

I think it doesn't make sense that stream_hash_agg_max_dirty_groups_heap_size is much bigger than mem_table_spill_threshold. They should be the same, right?

fuyufjh avatar Dec 06 '23 09:12 fuyufjh

Running a test with stream_hash_agg_max_dirty_groups_heap_size = 2MB

https://buildkite.com/risingwave-test/ch-benchmark-pg-cdc/builds/104

I think it doesn't make sense that stream_hash_agg_max_dirty_groups_heap_size is much bigger than mem_table_spill_threshold. They should be the same, right?

I guess some AggGroup could be larger than the result row.

st1page avatar Dec 06 '23 10:12 st1page

I guess some AggGroup could be larger than the result row.

Oh, do you mean min/max? Indeed.

Let's wait for the test first. First we need to prove it's the cause of OOM. If so, let's then pick a better value for stream_hash_agg_max_dirty_groups_heap_size.

fuyufjh avatar Dec 06 '23 11:12 fuyufjh

Running a test with stream_hash_agg_max_dirty_groups_heap_size = 2MB

https://buildkite.com/risingwave-test/ch-benchmark-pg-cdc/builds/104

I think it doesn't make sense that stream_hash_agg_max_dirty_groups_heap_size is much bigger than mem_table_spill_threshold. They should be the same, right?

The run didn't succeed; it still OOMed. But the parameters did take some effect. Grafana

The memory usage of HashAgg (pointed out with the red box previously) disappeared.

1701861649-2023-12-06-11-20-48.auto.heap.collapsed.zip

Now, nearly all memory in the streaming part is used by state_table.

image

I will continue looking into this. (Perhaps caused by some bad query plan? 🤔...)

@st1page I'd like to decrease the default stream_hash_agg_max_dirty_groups_heap_size to around 2MB. What do you think?

fuyufjh avatar Dec 07 '23 03:12 fuyufjh

Interesting observations on stream_source_output_rows_counts (total count, not rate())

stream_source_output_rows_counts{namespace=~"$namespace",risingwave_name=~"$instance",risingwave_component=~"$component",pod=~"$pod"}

stream_source_output_rows_counts of the stock table

image

stream_source_output_rows_counts of the remaining tables

image

The remaining tables didn't consume any changes until 19:20, and immediately after they started, the compute node OOMed.

fuyufjh avatar Dec 07 '23 05:12 fuyufjh

Running with 9 queries (q13, q14, q15, q16, q17, q18, q19, q20, q22) still got OOM: https://buildkite.com/risingwave-test/ch-benchmark-pg-cdc/builds/106#018c4211-6622-4016-8188-dee2aabb8ca6

cyliu0 avatar Dec 07 '23 07:12 cyliu0

There is a 1400x amplification in Q20.

CREATE MATERIALIZED VIEW ch_benchmark_q20 AS
SELECT
    s_name,
    s_address
FROM
    supplier,
    nation
WHERE
    s_suppkey IN (
        SELECT
            mod(s_i_id * s_w_id, 10000)
        FROM
            stock,
            order_line
        WHERE
            s_i_id IN (
                SELECT
                    i_id
                FROM
                    item
                WHERE
                    i_data NOT LIKE 'co%'
            )
            AND ol_i_id = s_i_id
            AND ol_delivery_d > '2010-05-23 12:00:00'
        GROUP BY
            s_i_id,
            s_w_id,
            s_quantity
        HAVING
            200 * s_quantity > sum(ol_quantity)
    )
    AND s_nationkey = n_nationkey

Plan:

 StreamMaterialize { columns: [s_name, s_address, supplier.s_suppkey(hidden), supplier.s_nationkey(hidden)], stream_key: [supplier.s_suppkey, supplier.s_nationkey], pk_columns: [supplier.s_suppkey, supplier.s_nationkey], pk_conflict: NoCheck }
 └─StreamExchange { dist: HashShard(supplier.s_suppkey, supplier.s_nationkey) }
   └─StreamHashJoin { type: LeftSemi, predicate: supplier.s_suppkey = $expr1 }
     ├─StreamExchange { dist: HashShard(supplier.s_suppkey) }
     │ └─StreamHashJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey }
     │   ├─StreamExchange { dist: HashShard(supplier.s_nationkey) }
     │   │ └─StreamTableScan { table: supplier, columns: [s_suppkey, s_name, s_address, s_nationkey] }
     │   └─StreamExchange { dist: HashShard(nation.n_nationkey) }
     │     └─StreamTableScan { table: nation, columns: [n_nationkey] }
     └─StreamExchange { dist: HashShard($expr1) }
       └─StreamProject { exprs: [((stock.s_i_id * stock.s_w_id) % 10000:Int32)::Int64 as $expr1, stock.s_i_id, stock.s_w_id, stock.s_quantity] }
         └─StreamFilter { predicate: ((200:Int32 * stock.s_quantity) > sum(order_line.ol_quantity)) }
           └─StreamProject { exprs: [stock.s_i_id, stock.s_w_id, stock.s_quantity, sum(order_line.ol_quantity)] }
             └─StreamHashAgg { group_key: [stock.s_i_id, stock.s_w_id, stock.s_quantity], aggs: [sum(order_line.ol_quantity), count] }
               └─StreamHashJoin { type: LeftSemi, predicate: stock.s_i_id = item.i_id } 
                 ├─StreamHashJoin { type: Inner, predicate: stock.s_i_id = order_line.ol_i_id } ❗️❗️❗️ 
                 │ ├─StreamExchange { dist: HashShard(stock.s_i_id) }
                 │ │ └─StreamTableScan { table: stock, columns: [s_i_id, s_w_id, s_quantity] }
                 │ └─StreamExchange { dist: HashShard(order_line.ol_i_id) }
                 │   └─StreamProject { exprs: [order_line.ol_i_id, order_line.ol_quantity, order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number] }
                 │     └─StreamFilter { predicate: (order_line.ol_delivery_d > '2010-05-23 12:00:00':Timestamp) }
                 │       └─StreamTableScan { table: order_line, columns: [ol_i_id, ol_quantity, ol_w_id, ol_d_id, ol_o_id, ol_number, ol_delivery_d] }
                 └─StreamExchange { dist: HashShard(item.i_id) }
                   └─StreamProject { exprs: [item.i_id] }
                     └─StreamFilter { predicate: Not(Like(item.i_data, 'co%':Varchar)) }
                       └─StreamTableScan { table: item, columns: [i_id, i_data] }

Notice the StreamHashJoin marked with ❗️❗️❗️; the join condition is stock.s_i_id = order_line.ol_i_id.

However, the stock table is very large (around 140,000,000 rows) but has only 100,000 distinct values of s_i_id, so each new row in order_line will be amplified 1400x.

image image
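To spell out where the 1400x comes from, a rough back-of-the-envelope sketch (the row counts are the approximate figures above, not exact):

fn main() {
    let stock_rows: u64 = 140_000_000;   // ~140M rows in stock
    let distinct_s_i_id: u64 = 100_000;  // ~100K distinct s_i_id values
    // Each ol_i_id matches every stock row sharing that s_i_id value,
    // so one new order_line row produces roughly this many join outputs.
    println!("~{}x amplification", stock_rows / distinct_s_i_id); // ~1400
}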

fuyufjh avatar Dec 07 '23 10:12 fuyufjh

so each new row in order_line will be amplified 1400x

Is the big amplification supposed to be solved by spill-anytime (https://github.com/risingwavelabs/risingwave/pull/12028)?

lmatz avatar Jan 04 '24 06:01 lmatz

so each new row in order_line will be amplified 1400x

Is the big amplification supposed to be solved by spill-anytime (#12028)?

Partially. The big amplification may cause multiple problems. Even regarding OOM, there is more than one problem (data in the mem-table, data in the shuffle channel, data in streaming operators, etc.). But AFAIK the major one is caused by the mem-table and thus can be solved by spill-anytime.

fuyufjh avatar Jan 04 '24 06:01 fuyufjh

Let me run the same setting again with nightly-20240102: https://buildkite.com/risingwave-test/ch-benchmark-pg-cdc/builds/150#018cd321-b085-4136-ac9c-4327b8feb534

lmatz avatar Jan 04 '24 06:01 lmatz

It OOMed: https://buildkite.com/risingwave-test/ch-benchmark-pg-cdc/builds/150#018cd321-b085-4136-ac9c-4327b8feb534

https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&from=1704349538555&to=1704350593203&var-datasource=P2453400D1763B4D9&var-namespace=ch-benchmark-pg-cdc-20240104-062245&var-instance=benchmark-risingwave&var-pod=All&var-component=All&var-table=All SCR-20240104-ksk

However, I didn't find the heap dumps on S3, while those from the other two executions of the same pipeline today do exist.

SCR-20240104-knl SCR-20240104-knz SCR-20240104-kux

All 3 CNs OOMed at the same time. This is an all-in-one-machine deployment.

@huangjw806 is helping to check whether the entire machine ran out of memory and thus the CNs were killed.

lmatz avatar Jan 04 '24 07:01 lmatz

However, I didn't find the heap dumps on s3 while the other two executions of the same pipeline today do exist.

You probably need:

ENABLE_MEMORY_PROFILING="true"

This is sometimes annoying...

fuyufjh avatar Jan 04 '24 07:01 fuyufjh

Let me make it true by default, since we no longer do sampling.

@huangjw806 is helping to check whether the entire machine ran out of memory and thus the CNs were killed.

The entire machine's memory is enough for the workload: https://grafana.test.risingwave-cloud.xyz/d/LDL_mx9nz123/kubernetes-node-exporter?orgId=1&var-DS_PROMETHEUS=P2453400D1763B4D9&var-node=ip-10-0-41-244.ec2.internal&var-internal_ip=10.0.41.244&var-diskdevices=%5Ba-z%5D+%7Cnvme%5B0-9%5D+n%5B0-9%5D+%7Cmmcblk%5B0-9%5D+&from=1704346828944&to=1704352403138

SCR-20240104-l5g

lmatz avatar Jan 04 '24 07:01 lmatz

The memory usage of the 3 CNs looks similar to the one in https://github.com/risingwavelabs/risingwave/issues/14251

lmatz avatar Jan 04 '24 07:01 lmatz

https://buildkite.com/risingwave-test/ch-benchmark-pg-cdc/builds/152#018cd34b-eaf3-4148-a74c-4da996384253:

SCR-20240104-lxr SCR-20240104-ly3 SCR-20240104-ly9

collapsed.zip

lmatz avatar Jan 04 '24 07:01 lmatz

I just analyzed run 149 (namespace: ch-benchmark-pg-cdc-20240104-061254) and the conclusion is the same as above. The prost message decoding used more than 6 GB of memory. 1704349111-2024-01-04-06-18-30.auto.heap.collapsed.zip

fuyufjh avatar Jan 04 '24 08:01 fuyufjh

image

The BarrierMutation messages consume 6 GB.

st1page avatar Jan 04 '24 08:01 st1page

image

message AddMutation {
  // New dispatchers for each actor.
  map<uint32, Dispatchers> actor_dispatchers = 1;
  // All actors to be added (to the main connected component of the graph) in this update.
  repeated uint32 added_actors = 3;
  // We may embed a source change split mutation here.
  // TODO: we may allow multiple mutations in a single barrier.
  map<uint32, source.ConnectorSplits> actor_splits = 2;
  // We may embed a pause mutation here.
  // TODO: we may allow multiple mutations in a single barrier.
  bool pause = 4;
}

message Dispatchers {
  repeated Dispatcher dispatchers = 1;
}

message Dispatcher {
  DispatcherType type = 1;
  // Indices of the columns to be used for hashing.
  // For dispatcher types other than HASH, this is ignored.
  repeated uint32 dist_key_indices = 2;
  // Indices of the columns to output.
  // In most cases, this contains all columns in the input. But for some cases like MV on MV or
  // schema change, we may only output a subset of the columns.
  repeated uint32 output_indices = 6;
  // The hash mapping for consistent hash.
  // For dispatcher types other than HASH, this is ignored.
  ActorMapping hash_mapping = 3;
  // Dispatcher can be uniquely identified by a combination of actor id and dispatcher id.
  // This is exactly the same as its downstream fragment id.
  uint64 dispatcher_id = 4;
  // Number of downstreams decides how many endpoints a dispatcher should dispatch.
  repeated uint32 downstream_actor_id = 5;
}

std::mem::size_of::<risingwave_pb::stream_plan::Dispatcher>() = 136

Parallelism is 96 and there are only 3 CNs? Then there will be 96 / 3 = 32 AddMutation messages on each CN. Each AddMutation message has one entry in the actor_dispatchers hash map per actor, which is 96 (parallelism) * 10 (fragment count), and each Dispatchers holds 96 Dispatcher elements. 136 bytes * 96 * 96 * 10 * 32 = 382.5 MB. There is still about a 10x amplification somewhere. Is there any case where more than one DDL can be running concurrently in the pipeline?

st1page avatar Jan 04 '24 08:01 st1page

There is still about a 10x amplification somewhere. Is there any case where more than one DDL can be running concurrently in the pipeline?

This is a direct CDC testing pipeline, does it matter?

also cc: @cyliu0 for the DDL question

lmatz avatar Jan 04 '24 08:01 lmatz

Parallelism is 96 and there are only 3 CNs? Then there will be 96 / 3 = 32 AddMutation messages on each CN. Each AddMutation message has one entry in the actor_dispatchers hash map per actor, which is 96 (parallelism) * 10 (fragment count), and each Dispatchers holds 96 Dispatcher elements. 136 bytes * 96 * 96 * 10 * 32 = 382.5 MB. There is still about a 10x amplification somewhere. Is there any case where more than one DDL can be running concurrently in the pipeline?

Oh, there are 12 tables in the ch-benchmark, so

there will be 96 / 3 = 32 AddMutation messages on each CN.

should be 12 (source count) * 96 (total parallelism) / 3 (CN count). The memory consumption is then 382.5 MB * 12 = 4590 MB, which matches the number in the heap dump.
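Putting the estimate together, a small back-of-the-envelope sketch (all constants are the ones quoted in this thread: 96 total parallelism, roughly 10 fragments per job, 3 CNs, 12 source tables, 136 bytes per Dispatcher; the fragment count in particular is an approximation):

fn main() {
    let dispatcher_size: u64 = 136; // std::mem::size_of::<Dispatcher>() reported above
    let parallelism: u64 = 96;      // total actor parallelism
    let fragments: u64 = 10;        // fragments per streaming job (approximate)
    let cn_count: u64 = 3;          // compute nodes
    let sources: u64 = 12;          // tables in the ch-benchmark

    // AddMutation messages received by one CN for a single source's jobs.
    let mutations_per_cn = parallelism / cn_count; // 32

    // Each AddMutation carries an actor_dispatchers map with one entry per actor
    // (parallelism * fragments), and each entry holds `parallelism` Dispatchers.
    let bytes_per_mutation = parallelism * fragments * parallelism * dispatcher_size;

    let per_source = bytes_per_mutation * mutations_per_cn;
    let total = per_source * sources;

    println!("per source: {:.1} MB", per_source as f64 / 1048576.0); // ~382.5 MB
    println!("12 sources: {:.1} MB", total as f64 / 1048576.0);      // ~4590 MB
}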

st1page avatar Jan 04 '24 09:01 st1page