risingwave icon indicating copy to clipboard operation
risingwave copied to clipboard

storage: high memory usage of compactor

Open skyzh opened this issue 2 years ago • 49 comments

image

In TPC-H Q12 3-node benchmarks.

skyzh avatar Jul 15 '22 07:07 skyzh

After some offline discussion with Arne, here are some ideas:

  1. Do not load all SST blocks into memory at once. We just load 1 block per SST + prefetch next few blocks in fixed-size queue. (Isn't this streaming merge already implemented? In which case, this would have wrongly identified the issue as trying to merge all SSTs at once, as we would have constant memory for fixed number of SSTs merged.).
  2. Memory-limit k-way merge sort (funnel sort?). Limit to K SSTs merged at once. Needs multiple stages where intermediate results are written back to storage.
  3. Add some "slack" to each SST so that merging into it will not result to a cascade of writes (probably does not help that much, as we could expect quite a lot of cross-SST merges)
  4. Interval scheduling of SSTs so that we do not have an all-or-nothing approach to merging if there are any non-overlapping ranges.

@ALeitert

jon-chuang avatar Jul 20 '22 07:07 jon-chuang

The ideas look good to me. cc @hzxa21

skyzh avatar Jul 20 '22 07:07 skyzh

What's "slack"? 🤔

lmatz avatar Jul 20 '22 07:07 lmatz

Well its an idea (credits to @ALeitert) where if we are merging one small SST (RHS) into a bunch of SSTs with non-overlapping ranges (LHS). If |RHS| << |LHS|, as each LHS SST is filled, the leftover values are pushed onto another SST, and this would result in a cascade of writes on the LHS. If we instead did not have full SSTs, the new writes can be absorbed by the "slack" region at the end of each LHS SST.

I guess the idea is like a B-Tree. However, I guess the reason why this may not be so effective is that RHS is a big batch, so we will likely be touching most of LHS SSTs anyway, assuming some uniform distribution of keys, and that is the point of LSM - we are batching inserts to postpone non-sequential writes to compaction stage.

jon-chuang avatar Jul 20 '22 07:07 jon-chuang

It was an idea I had when thinking about the tables and doing some test on my machine. I noticed that the compactor is often loading and rebuilding lots of tables and compacting them even if there is only a small change. And, as mentioned, a small change at the beginning, has a cascading effect to all following. Extreme case would be that we add one KV pair in the very first table. The idea is not fully developed yet, and I would also not give it priority. I also do not know how likely such cases are for real-life data.

ALeitert avatar Jul 20 '22 07:07 ALeitert

After some offline discussion with Arne, here are some ideas:

  1. Do not load all SST blocks into memory at once. We just load 1 block per SST + prefetch next few blocks in fixed-size queue. (Isn't this streaming merge already implemented? In which case, this would have wrongly identified the issue as trying to merge all SSTs at once, as we would have constant memory for fixed number of SSTs merged.).
  2. Memory-limit k-way merge sort (funnel sort?). Limit to K SSTs merged at once. Needs multiple stages where intermediate results are written back to storage.
  3. Add some "slack" to each SST so that merging into it will not result to a cascade of writes (probably does not help that much, as we could expect quite a lot of cross-SST merges)
  4. Interval scheduling of SSTs so that we do not have an all-or-nothing approach to merging if there are any non-overlapping ranges.

@ALeitert

  1. sounds good to me. For 2)-4), I am not sure how much we can get with the added complexity so we need to bench first.

In most cases, we use dedicated compactor nodes for compaction so I think we can assume there is no memory resource competition between compactor and other components. The goal of bounding compactor memory consumption is to avoid OOM. Ideally, we should bound the working set of compactor to avoid OOM but try to prefetch as much as possible.

hzxa21 avatar Jul 20 '22 09:07 hzxa21

In terms of implementation complexity, @ALeitert points out that 4. it is as simple as organizing the SSTables to be compacted in a given task into groups of non-overlapping ranges. Then, we merge over each of these ConcatIterators (which lazily load SSTables into memory) in the MergeIterator.

If we have a small number of groups relative to total number of tables being merged, we would be successful in reducing the memory usage at any given point in time while improving our yield for large number of SSTables that have been merged into a collection of non-overlapping SSTs.

So I would disagree about added complexity for 4.


However, we do need to validate if this helps our workloads. If our workload looks like merging a large number of non-overlapping SSTs with a small number of overlapping SSTs, then it will help. If it looks like merging a large number of overlapping ranges, then it will not help, and we need to rely on:

  1. Idea 1 (stream blocks for SSTableIterator instead of loading them eagerly)
  2. Limiting number of SSTs we merge at once
  3. Reduce parallelism for compaction (not ideal?)

Arne is working on validating what kind of workload we have by measuring whether interval scheduling can result in small number of groups relative to number of sstables in the task and whether the maximum memory used for storing SSTables in a task at any point of time is reduced.


Generally, I think that 2 and 3 are poor compromises. We want a higher yield (large collections of SSTs with non-overlapping ranges per second) for fixed amount of memory. 2 results in same number of SSTs yielded, but results in more than one collection of non-overlapping SSTs. 3 results in fewer SSTs yielded per second.


As for 1. I'm not sure how good of an idea it is, as we can save costs from fetching a large contiguous SST from memory (single S3 get request). So I would be hesistant about the prefetch/stream approach.

Edit: this is erroneous. We can issue a single get from S3 and stream the resultant bytes. HTTP handles flow control so we only pull data as needed to be buffered for consumption.


  1. The final idea that has not been covered so far is improving the way in which scheduling is performed. To my understanding, one possible ideal scenario is:

A small number N of SSTable with large range (RHS) are assigned to one task, to be merged with a large number M of non-overlapping SSTables each with small range (LHS). Then, the memory usage is N + 1 which is << N + M.

This is complementary to idea 4. We require idea 4 to make this idea work. This idea will make our ability to apply idea 4 more likely.

5 can be implemented by performing interval scheduling at the meta node. We add groups to a task randomly to have a more uniform workload across tasks (tasks have a balance of groups with many SSTs and few SSTs, and those in between).

We add groups up to a max_overlapping_threshold.


So we should investigate as:

  • 4 (interval scheduling at task level + lazy loading for each group of non-overlapping SSTs) + final idea 5 (interval scheduling at meta level)
  • 1 (lazy loading of blocks on the SST level + prefetching)

jon-chuang avatar Jul 20 '22 09:07 jon-chuang

We check the compactor log and find out that the compaction task itself is not huge but the compactor node is running 6-7 compaction tasks concurrently. I think we should limit the number of in-flight compaction tasks per compactor node first.

hzxa21 avatar Jul 20 '22 10:07 hzxa21

compaction task itself is not huge but the compactor node is running 6-7 compaction tasks concurrently.

But I think in the approaches above, we are trying to make each compaction task itself more efficient, allowing for more compacition tasks to be run concurrently on the same memory budget, resulting in higher SST yield.

Reducing number of tasks can prevent OOM but reduces yield.

compaction task itself is not huge

Not huge does not mean not higher than it needs to be.

jon-chuang avatar Jul 20 '22 10:07 jon-chuang

@ALeitert points out that 4. it is as simple as organizing the SSTables to be compacted in a given task into groups of non-overlapping ranges. Then, we merge over each of these ConcatIterators (which lazily load SSTables into memory) in the MergeIterator.

Only Level0 has overlapping SSTs in our design so we should always use ConcatIterators for the compaction input SSTs from L1-N. IIUC, this proposal is trying to divide n L0 SSTs into k groups with each group containing non-overlapping SSTs so that we can have a MergeIterator on top of k ConcatIterator instead of a MergeIterator on top of n SSTableIterator. This is a valid optimization and worth trying. My guess is we won't benefit much from it because 1) it only applies to L0->L1 compaction; 2) SSTs in L0 normally has a very wide key range.

FYI, Sub-level introduced in #3111 can also help to make compaction more efficient by reudcing overlapping SSTs in a compaction task.

Back to the problem described in this issue, I still think controlling compaction parallelism (i.e. number of running compaction taska) in a compactor node is a simpler and more effective solution because 1) compaction task is preferred to be kept small in many cases; 2) the root cause of this issue is due to too many not-too-big compaction tasks running in one compactor node at the same time. We should try to scale compactor nodes if all availale compactor nodes hit the per node compaction parallelism. limit

hzxa21 avatar Jul 20 '22 13:07 hzxa21

Interval scheduling of SSTs so that we do not have an all-or-nothing approach to merging if there are any non-overlapping ranges.

@jon-chuang I have implement it called trivial move in https://github.com/singularity-data/risingwave/pull/3111/files . If some SST does not overlap with anyone in the target level, it would be moved to the position in target level without any compaction task in compactor.

Little-Wallace avatar Jul 20 '22 15:07 Little-Wallace

root cause of this issue is due to too many not-too-big compaction tasks running in one compactor node at the same time. We should try to scale compactor nodes if all availale compactor nodes hit the per node compaction parallelism. limit

I think rather than limit parallelism per se, we may be better off limiting the expected memory consumption. This way, we can set the target memory usage based on node's memory, whether total or available (where for the latter it is assumed we are sharing the node with other processes).

We can assume that N SSTs from L0 in the task results in N * L0 size bytes consumed, and any SST from L1 and above results in 1 * L? size bytes consumed.

Since we store the resultant SST in memory until upload, we can also add the target level's size bytes to the budget consumed.

Or, I guess, parallelism might be fixed (by number of cores) and we adjust the max number of L0 SSTs we try to merge in one task according to target_memory_usage / parallelism

jon-chuang avatar Jul 20 '22 17:07 jon-chuang

Indeed, 1 would only benefit L0->L0 or L0->L1 compaction involving more than one SST from L0. I guess to improve things in light of sub-level compaction, we should try to find concat iterators in each non-overlapping sub-level. So that only highest L0 sub level will need an SSTableIterator

jon-chuang avatar Jul 21 '22 01:07 jon-chuang

Actually, considering that currently, the largest SST file can be 2GB, streaming SSTable iter approach does seem reasonable. Perhaps, fetching blocks up to a buffer of 32MBs makes sense.

However, note that those would be from levels with non-overlapping SST files. So we would have at most one large SST at a time. On the other hand, we store the SSTs resulting from compaction in memory for the duration of creation and upload as well.

Since the target level is >= any of the inputs, the memory usage from keeping these files in memory instead of a streaming upload is already at least the size of largest input SST file. In other words, we may not be able to significantly reduce memory usage unless we also allow streaming upload of blocks of an SST prior to it being sealed...

Details:

S3 sdk requires a ByteStream implementing object, not necessarily Bytes like we provide now. So if we change the object store interface to accept some sort of byte stream instead of bytes, perhaps we can achieve a streaming upload. Specifically, in the S3 case, the way to achieve this would be to have smithy::ByteStream::from(SdkBody::from(hyper::Body::channel().0) as the upload parameter.

In our case, our object store interface could have an upload_stream(path, byte_stream) interface where byte_stream is a generic bytestream implementing Stream trait. The s3 object store would take data from this bytestream and dump into the hyper byte stream (or simply use a hyper::Body's wrap_stream interface). However, we should investigate Hyper's stream buffer size.

Likewise, object store should have a read_stream interface that allows reading dyn Stream object. Then, we can have a BlockStream and SstableStream objects that stream the bytes and produces the next block by calling next().await. etc.

jon-chuang avatar Jul 21 '22 05:07 jon-chuang

In the origin implement we will read only one block and hold it in block-cache it means that we can control the memory of compactor precisely. But there is a problem that we must pay AWS per IO requests. If one object is 32MB and the size of each of block is 64KB, it means we must pay 500 times money than main branch.

Little-Wallace avatar Jul 21 '22 11:07 Little-Wallace

Actually, considering that currently, the largest SST file can be 2GB, streaming SSTable iter approach does seem reasonable. Perhaps, fetching blocks up to a buffer of 32MBs makes sense.

Exactly it could only reach 256MB and I will change it to 128MB. So I do not think it is the most important thing to reduce the memory usage for us. We only need to control the memory not exceed the upper limit and that is enough. The cost of IO for S3 is much expensive.

Little-Wallace avatar Jul 21 '22 11:07 Little-Wallace

🤔 In fact, I should prioritize identifying the various components of the memory occupy.

Little-Wallace avatar Jul 21 '22 13:07 Little-Wallace

Actually, considering that currently, the largest SST file can be 2GB, streaming SSTable iter approach does seem reasonable. Perhaps, fetching blocks up to a buffer of 32MBs makes sense.

However, note that those would be from levels with non-overlapping SST files. So we would have at most one large SST at a time. On the other hand, we store the SSTs resulting from compaction in memory for the duration of creation and upload as well.

Since the target level is >= any of the inputs, the memory usage from keeping these files in memory instead of a streaming upload is already at least the size of largest input SST file. In other words, we may not be able to significantly reduce memory usage unless we also allow streaming upload of blocks of an SST prior to it being sealed...

Details:

S3 sdk requires a ByteStream implementing object, not necessarily Bytes like we provide now. So if we change the object store interface to accept some sort of byte stream instead of bytes, perhaps we can achieve a streaming upload. Specifically, in the S3 case, the way to achieve this would be to have smithy::ByteStream::from(SdkBody::from(hyper::Body::channel().0) as the upload parameter.

In our case, our object store interface could have an upload_stream(path, byte_stream) interface where byte_stream is a generic bytestream implementing Stream trait. The s3 object store would take data from this bytestream and dump into the hyper byte stream (or simply use a hyper::Body's wrap_stream interface). However, we should investigate Hyper's stream buffer size.

Likewise, object store should have a read_stream interface that allows reading dyn Stream object. Then, we can have a BlockStream and SstableStream objects that stream the bytes and produces the next block by calling next().await. etc.

@wenym1 has investigated streaming upload to S3 (see #1368) before. In short, it is not possible to achieve streaming upload via providing ByteStream to s3 sdk because s3 only supports http1. Yiming have plan to investigate multi-part upload in this Q and we will see how much it can help.

We only need to control the memory not exceed the upper limit and that is enough

+1

hzxa21 avatar Jul 21 '22 14:07 hzxa21

In the origin implement we will read only one block and hold it in block-cache it means that we can control the memory of compactor precisely. But there is a problem that we must pay AWS per IO requests. If one object is 32MB and the size of each of block is 64KB, it means we must pay 500 times money than main branch.

Well, I was thinking that single get request would be streamed over TCP which has flow control, allowing us to control application buffer size and hence memory usage.

jon-chuang avatar Jul 21 '22 14:07 jon-chuang

Exactly it could only reach 256MB and I will change it to 128MB.

Oh, hold on, I thought that L0 file size is

const DEFAULT_TARGET_FILE_SIZE_BASE: u64 = 32 * 1024 * 1024; // 32MB

While due to

input.target_file_size = self.config.target_file_size_base
    << (input.target_level.level_idx as usize - base_level);

L6 file size is 32MB * 2^6 = 2GB?

jon-chuang avatar Jul 21 '22 14:07 jon-chuang

We only need to control the memory not exceed the upper limit and that is enough.

If we can achieve stream without additional S3 get request, I think we get a zero-cost memory usage reduction.

But it may not be worth it if we cannot do streamed or multi-part upload to S3 as then we get at most factor of 2 reduction in memory usage, instead of possibly factor > 10.


Due to the potential to achieve quite a good memory efficiency, allowing us to potentially run a lot more compaction tasks on a single node (up to limit imposed by compute requirement of each task), or on a node with smaller amount of memory, thus saving users cost, I believe it may be worth looking into continuing investigation of https://github.com/singularity-data/risingwave/issues/1368 via S3's multipart upload API.

Currently, the Rust SDK actually already seems to support this: https://github.com/awslabs/aws-sdk-rust/issues/494.

Btw, to my understanding, there is no cost to multiple uploads to S3, multipart upload API or otherwise. (https://aws.amazon.com/s3/pricing/)

jon-chuang avatar Jul 21 '22 14:07 jon-chuang

S3 sdk requires a ByteStream implementing object, not necessarily Bytes like we provide now. So if we change the object store interface to accept some sort of byte stream instead of bytes, perhaps we can achieve a streaming upload. Specifically, in the S3 case, the way to achieve this would be to have smithy::ByteStream::from(SdkBody::from(hyper::Body::channel().0) as the upload parameter.

This requires us to provide body size in advance. So we still need to generate the full SST before uploading.

skyzh avatar Jul 21 '22 14:07 skyzh

This requires us to provide body size in advance. So we still need to generate the full SST before uploading.

Yes, so I'm suggesting to use multipart upload API.

jon-chuang avatar Jul 21 '22 15:07 jon-chuang

Yes, so I'm suggesting to use multipart upload API.

Sure I think we can do multipart upload.

I had some rough idea before about providing a new upload API for object store to accept a stream like input and upload the whole SST portion by portion (ideally block by block). The similar idea will also be helpful to save memory for spill to disk, since a file handle can write like a stream instead of writing the whole object.

I plan to support this feature soon after I do some experiment about multipart upload on both S3 and MinIO. Besides, it seems that there is a limit on the minimum part size for multipart upload, so we need to handle the upload in a tricky way.

wenym1 avatar Jul 21 '22 15:07 wenym1

so we need to handle the upload in a tricky way.

I guess one requirement is that parts are uploaded sequentially after previous one is complete.

I think from our perspective, object store interface implemented for S3 should pull bytes from input stream. If input stream not exhausted, add to buffer of fixed size (min multipart part size, or first multiple of block size greater than that). Once full, do upload of part. Else, if stream is exhausted, just upload remainder. So its probably not too tricky.

jon-chuang avatar Jul 21 '22 15:07 jon-chuang

L6 file size is 32MB * 2^6 = 2GB?

No. in src/storage/src/hummock/compactor.rs we can limit it by

        let target_file_size = std::cmp::min(
            self.compact_task.target_file_size as usize,
            max_target_file_size,
        );

Little-Wallace avatar Jul 21 '22 15:07 Little-Wallace

So I do not support uploading data by streaming API because it is not necessary. Hummock is a LSM database engine instead of FileSystem. We can limit file size no more than 100MB. And according suggestion from AWS, we only need multipart uploading for large file more than 100MB.

Little-Wallace avatar Jul 21 '22 15:07 Little-Wallace

I think from our perspective, object store interface implemented for S3 should pull bytes from input stream. If input stream not exhausted, add to buffer of fixed size (min multipart part size, or first multiple of block size greater than that). Once full, do upload of part. Else, if stream is exhausted, just upload remainder. So its probably not too tricky.

It's not as simple as this solution.

When we upload the remainder, the remainder is also a part, but we cannot ensure that this part is greater than the minimum part size, which may cause the last upload part to fail (not sure about this yet, we may need some experiment on S3 and MinIO). A solution to this is, we always ensure the multipart upload buffer to have more than one part size data, and we will need a buffer of size of two times the minimum part size. When the buffer size exceeds two times the minimum part size, we upload the first minimum part size, and the remaining buffer size is still greater than the minimum part size. In this way, we can make sure the last part can be greater than the minimum part size. If the whole data is smaller than the minimum part size, just do a simple upload.

wenym1 avatar Jul 21 '22 16:07 wenym1

So I do not support uploading data by streaming API because it is not necessary.

Indeed, if we limit the size to ~100 MB, there is a reduced incentive to do streaming download or multipart upload, at least with saving memory as a motivation.

However, I guess we don't want to have too small file sizes, then we have to do more get requests for both metadata and data and maintain more metadata.

Further, there are other advantages to streaming:

  1. Performance - According to https://singularity-data.quip.com/RalDAVhdy5EI/S3-performance#eCFACA0rGWu, download of 128MB file takes on average 1.6s. Upload 128MB takes 1.28s, and 0.69s for multipart. note: If task and compaction share same thread, streaming will not improve performance. We need concurrent download, compaction and upload to achieve better per-task completion time.

If we can reduce memory cost significantly for 128MB * 10 input files, we can save ~ 1GB per task. So even with reduced incentive, there may be a strong incentive.


In terms of prioritizing streaming download v.s. streaming upload, given that when file size is large (~100MB), input file size == output file size, it may be worth spending more time optimizing the download rather than upload. Since for level multiplier = 10, input SSTs would be 10x output, the effectiveness of streaming the download is 10x of streaming the upload. So perhaps we should deprioritize streaming upload as it is also significantly more complicated (including storing output in block cache for local compaction).

jon-chuang avatar Jul 22 '22 02:07 jon-chuang

@jon-chuang I do not understand why you think the speed of download could be improved by 10x by streaming

Little-Wallace avatar Jul 22 '22 02:07 Little-Wallace