
feat(sst): support streaming upload for sst

Gun9niR opened this issue 2 years ago • 11 comments

I hereby agree to the terms of the Singularity Data, Inc. Contributor License Agreement.

What's changed and what's your intention?

This PR implements streaming upload for SSTs, utilizing the multipart upload API of S3. We believe this functionality can accelerate uploading through parallelism.

  • Refactor
    • Add SstableWriter as a hook into SstableBuilder to consume SST block data
    • Add SstableWriterBuilder and SstableBuilderSealer as two hooks into CapacitySplitTableBuilder to decouple SST building from uploading.
  • Streaming Upload
    • Add streaming upload interfaces for ObjectStore and SstableStore. Starting a streaming upload for an SST returns an uploader, on which upload_block and upload_size_footer can be called. The uploader is then passed back to SstableStore to finish the upload (a sketch of this shape follows the list).
    • For the S3 implementation of streaming upload, the part size is set to 16 MiB based on the benchmark results. If the object is smaller than the minimum part size requirement, we fall back to a normal put.
      • Some trivial micro-benchmarks (not included in the PR because they were written very casually) show that the streaming upload implementation is slower for small objects (< 5 MiB) and around 1.5-2x faster for large objects (> 50 MiB). Since uploaded data is discarded as soon as it is sent, memory consumption is also lower.
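
A minimal sketch of the intended API shape (hypothetical signatures; only the names upload_block and upload_size_footer are from this PR, while the trait names, error types, and byte types here are assumptions):

```rust
use bytes::Bytes;

/// Handle for uploading one SST as it is built. Part ids mirror S3
/// multipart upload, so blocks need not arrive in order.
pub trait SstStreamingUploader: Send {
    /// Upload one SST block under the given part id.
    fn upload_block(&mut self, part_id: u64, block: Bytes) -> std::io::Result<()>;

    /// Upload the size footer once all blocks have been written.
    fn upload_size_footer(&mut self, footer: Bytes) -> std::io::Result<()>;
}

pub trait SstableStoreExt {
    /// Begin a streaming upload for the SST with the given id.
    fn start_streaming_upload(&self, sst_id: u64) -> std::io::Result<Box<dyn SstStreamingUploader>>;

    /// Complete the upload, making the SST visible in the store.
    fn finish_upload(&self, uploader: Box<dyn SstStreamingUploader>) -> std::io::Result<()>;
}
```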

Checklist

  • [x] I have written necessary rustdoc comments
  • [x] I have added necessary unit tests and integration tests
  • [x] All checks passed in ./risedev check (or alias, ./risedev c)

Documentation

If your pull request contains user-facing changes, please specify the types of the changes, and create a release note. Otherwise, please feel free to remove this section.

Types of user-facing changes

Please keep the types that apply to your changes, and remove those that do not apply.

  • Installation and deployment
  • Connector (sources & sinks)
  • SQL commands, functions, and operators
  • RisingWave cluster configuration changes
  • Other (please specify in the release note below)

Release note

Please create a release note for your changes. In the release note, focus on the impact on users, and mention the environment or conditions where the impact may occur.

Refer to a related PR or issue link (optional)

#1368

Gun9niR avatar Aug 04 '22 15:08 Gun9niR

Codecov Report

Merging #4454 (530dfd8) into main (14b8a0f) will decrease coverage by 0.19%. The diff coverage is 78.85%.

@@            Coverage Diff             @@
##             main    #4454      +/-   ##
==========================================
- Coverage   74.36%   74.16%   -0.20%     
==========================================
  Files         853      855       +2     
  Lines      126671   127839    +1168     
==========================================
+ Hits        94198    94813     +615     
- Misses      32473    33026     +553     
Flag   Coverage Δ
rust   74.16% <78.85%> (-0.20%) ⬇️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
src/common/common_service/src/observer_manager.rs 0.00% <0.00%> (ø)
src/common/src/monitor/mod.rs 0.00% <0.00%> (ø)
...c/compute/src/compute_observer/observer_manager.rs 0.00% <0.00%> (ø)
src/compute/src/lib.rs 3.03% <0.00%> (-0.10%) ⬇️
src/compute/src/server.rs 0.00% <0.00%> (ø)
src/ctl/src/cmd_impl/hummock/sst_dump.rs 0.00% <0.00%> (ø)
src/ctl/src/cmd_impl/stream/trace.rs 0.00% <0.00%> (ø)
src/ctl/src/cmd_impl/table/scan.rs 0.00% <0.00%> (ø)
src/ctl/src/common/hummock_service.rs 0.00% <0.00%> (ø)
src/expr/src/expr/expr_binary_nonnull.rs 84.00% <ø> (ø)
... and 146 more


codecov[bot] avatar Aug 04 '22 16:08 codecov[bot]

Instead of exposing this as multipart upload, I would prefer to expose it as streaming_upload with write_bytes and finish methods. The multipart upload implementation detail will be hidden inside the streaming uploader returned by the S3 object store. In case some object stores do not support multipart upload, they can still implement streaming upload by saving all the written data in a buffer and uploading it all at once on finish.
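
A rough sketch of the proposed shape (write_bytes and finish are from this comment; everything else, including the signatures and error types, is assumed):

```rust
use bytes::Bytes;

/// Ordered, append-only uploader: bytes are written in order and the
/// object becomes visible only after `finish` succeeds.
pub trait StreamingUploader: Send {
    fn write_bytes(&mut self, data: Bytes) -> std::io::Result<()>;
    fn finish(self: Box<Self>) -> std::io::Result<()>;
}

pub trait ObjectStore {
    /// An S3-backed store can implement this with multipart upload; a store
    /// without multipart support can buffer and upload once in `finish`.
    fn streaming_upload(&self, path: &str) -> std::io::Result<Box<dyn StreamingUploader>>;
}
```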

wenym1 avatar Aug 05 '22 07:08 wenym1

> Instead of exposing this as multipart upload, I would prefer to expose it as streaming_upload with write_bytes and finish methods. The multipart upload implementation detail will be hidden inside the streaming uploader returned by the S3 object store. In case some object stores do not support multipart upload, they can still implement streaming upload by saving all the written data in a buffer and uploading it all at once on finish.

I think we should distinguish the multipart upload in ObjectStore's interface from S3's multipart upload. In our code, multipart upload simply means "you can upload an object part by part and it will finally appear as a single object" (which is pretty much interchangeable with the streaming_upload you mention, since it might not be truly "streaming"). It doesn't bear any indication of how it is implemented, be it streaming upload, appending to a buffer, linking non-consecutive data blocks, etc. Essentially, multipart upload here is a broader concept than streaming upload, and I think it's really just a matter of naming.

I borrowed the part id concept from S3 so that data doesn't have to arrive in order. If we want to support ordered upload, we can define another trait and return a dedicated ordered upload handle.

I understand that this PR is about streaming, so we must emphasize the concept of streaming. But I think the ObjectStore layer should still use a more general and flexible abstraction, and it would be more appropriate to adopt the concept of "streaming" from SstableStore and above.

Gun9niR avatar Aug 06 '22 04:08 Gun9niR

> > Instead of exposing this as multipart upload, I would prefer to expose it as streaming_upload with write_bytes and finish methods. The multipart upload implementation detail will be hidden inside the streaming uploader returned by the S3 object store. In case some object stores do not support multipart upload, they can still implement streaming upload by saving all the written data in a buffer and uploading it all at once on finish.
>
> I think we should distinguish the multipart upload in ObjectStore's interface from S3's multipart upload. In our code, multipart upload simply means "you can upload an object part by part and it will finally appear as a single object" (which is pretty much interchangeable with the streaming_upload you mention, since it might not be truly "streaming"). It doesn't bear any indication of how it is implemented, be it streaming upload, appending to a buffer, linking non-consecutive data blocks, etc. Essentially, multipart upload here is a broader concept than streaming upload, and I think it's really just a matter of naming.
>
> I borrowed the part id concept from S3 so that data doesn't have to arrive in order. If we want to support ordered upload, we can define another trait and return a dedicated ordered upload handle.
>
> I understand that this PR is about streaming, so we must emphasize the concept of streaming. But I think the ObjectStore layer should still use a more general and flexible abstraction, and it would be more appropriate to adopt the concept of "streaming" from SstableStore and above.

I think a streaming upload interface is more generic than the part-based upload interface. The reasons are:

  • We will have at least two upload interfaces in ObjectStore: one for part-based upload and one for full-object upload. If ObjectStore instead provides a single streaming upload interface, we can unify them.
  • If the underlying object store doesn't support part-based upload, the part-based interface cannot be used. That means a layer above ObjectStore (e.g. SstableStore) needs to know the concrete type of ObjectStore and switch between upload interfaces in its streaming upload implementation. This leaks ObjectStore's implementation details and makes the code more complex (see the sketch after this list).
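
For illustration, a store without part-based upload could satisfy the same StreamingUploader trait sketched earlier by buffering (hypothetical code; put_object stands in for the store's existing full-object upload):

```rust
use bytes::{Bytes, BytesMut};

/// Accumulate all writes in memory and issue one full-object put at the
/// end. Callers above ObjectStore never see the difference.
struct BufferedUploader {
    path: String,
    buf: BytesMut,
}

impl StreamingUploader for BufferedUploader {
    fn write_bytes(&mut self, data: Bytes) -> std::io::Result<()> {
        self.buf.extend_from_slice(&data);
        Ok(())
    }

    fn finish(self: Box<Self>) -> std::io::Result<()> {
        let this = *self;
        // One full-object upload, e.g. for an in-memory store.
        put_object(&this.path, this.buf.freeze())
    }
}

fn put_object(_path: &str, _data: Bytes) -> std::io::Result<()> {
    Ok(())
}
```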

hzxa21 avatar Aug 07 '22 15:08 hzxa21

> It doesn't bear any indication of how it is implemented

Just to be clear, we do not make any attempt to chunk our multipart upload into specific sizes, for instance at block boundaries, correct? In that case, I agree a streaming upload interface seems more generic.

I think multipart is simply one implementation of that (specific to S3's interface).

And even if we want to align the part size to some chunk size or boundary, the S3 implementation can take care of that (perhaps with additional config).

jon-chuang avatar Aug 08 '22 03:08 jon-chuang

> I think we should distinguish the multipart upload in ObjectStore's interface from S3's multipart upload. In our code, multipart upload simply means "you can upload an object part by part and it will finally appear as a single object" (which is pretty much interchangeable with the streaming_upload you mention, since it might not be truly "streaming"). It doesn't bear any indication of how it is implemented, be it streaming upload, appending to a buffer, linking non-consecutive data blocks, etc. Essentially, multipart upload here is a broader concept than streaming upload, and I think it's really just a matter of naming.

It's not just a matter of naming. Uploading an object part by part differs from streaming upload in that the parts do not have to be sent in order: the data is received in no particular order, and we may write any part of the object at random. streaming_upload, by contrast, indicates that the data is received in order, so we only need to append new data to the previously uploaded data.

Currently all of our related potential use cases only need streaming_upload, and exposing the more generic part-by-part abstraction in ObjectStore would complicate usage in the upper layers. Besides, streaming_upload eases the implementation of InMemoryObjectStore and DiskObjectStore, since they can just keep appending newly added data. This knowledge is especially valuable for DiskObjectStore. With part-by-part upload, the offset of each part is unknown until we call finish and sort all parts by part number; until the offsets are known, we cannot even pre-write the data to disk with pwrite, which means we have to buffer all the data in memory until finish is called. With streaming_upload, we can call file.write whenever new data arrives, which saves memory, and since the writes are append-only we may also get better write performance. Considering all of the above, it's better to expose the streaming_upload abstraction in the object store, and unnecessary to expose part-by-part upload.

But we can still expose multi_part_upload as a special functionality of S3ObjectStore. Since the ObjectStoreImpl we pass around is an enum, we can check whether the implementation is S3ObjectStore and use this special multipart upload functionality in particular use cases, as sketched below.
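
A sketch of that enum dispatch, with all types stubbed out (the real ObjectStoreImpl has different variants and methods):

```rust
struct S3ObjectStore;
struct InMemObjectStore;
struct S3MultipartHandle;

impl S3ObjectStore {
    fn start_multipart_upload(&self, _path: &str) -> S3MultipartHandle {
        S3MultipartHandle
    }
}

enum ObjectStoreImpl {
    S3(S3ObjectStore),
    InMem(InMemObjectStore),
}

impl ObjectStoreImpl {
    /// Out-of-order multipart upload is offered only when the backing
    /// store is actually S3; all other stores use streaming_upload.
    fn try_multipart_upload(&self, path: &str) -> Option<S3MultipartHandle> {
        match self {
            ObjectStoreImpl::S3(s3) => Some(s3.start_multipart_upload(path)),
            _ => None,
        }
    }
}
```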

wenym1 avatar Aug 08 '22 03:08 wenym1

> Just to be clear, we do not make any attempt to chunk our multipart upload into specific sizes, for instance at block boundaries, correct? In that case, I agree a streaming upload interface seems more generic.

We do. According to the benchmark, it's better to upload SSTs in 8 MiB/16 MiB parts and to align each part to the beginning of a block. But no matter which interface we provide, this should be handled by upper-level code (see the sketch below).
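
For illustration only, the part-cutting in the upper-level code could look roughly like this (hypothetical type; 16 MiB target per the benchmark, with parts cut only between blocks):

```rust
use bytes::{Bytes, BytesMut};

const TARGET_PART_SIZE: usize = 16 * 1024 * 1024; // 16 MiB, per the benchmark

struct PartAligner {
    pending: BytesMut,
}

impl PartAligner {
    /// Called once per finished SST block, so a part boundary always
    /// coincides with a block boundary.
    fn add_block(&mut self, block: Bytes, mut upload_part: impl FnMut(Bytes)) {
        self.pending.extend_from_slice(&block);
        if self.pending.len() >= TARGET_PART_SIZE {
            // Cut the part here, at the end of a whole block.
            upload_part(self.pending.split().freeze());
        }
    }
}
```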

It seems I have put too much weight on generality by supporting out-of-order uploading. I'm switching to the streaming upload interface. But how can we unify the full-upload and streaming-upload interfaces, as @hzxa21 mentioned?

Gun9niR avatar Aug 08 '22 07:08 Gun9niR

I guess out-of-order upload would enable parallel upload? For now we probably don't need to support it, as parallelism can already be achieved within a task via "splits".

jon-chuang avatar Aug 08 '22 09:08 jon-chuang

> I guess out-of-order upload would enable parallel upload? For now we probably don't need to support it, as parallelism can already be achieved within a task via "splits".

What "split"?

Gun9niR avatar Aug 08 '22 09:08 Gun9niR

What "split"?

It’s just multiple ranges within the same compaction task.

> But how can we unify the full-upload and streaming-upload interfaces, as @hzxa21 mentioned?

I believe his point is actually that streaming upload is the interface that meets our requirements, meaning "full-upload" (out of order) is not required.

jon-chuang avatar Aug 08 '22 19:08 jon-chuang

CLA assistant check
All committers have signed the CLA.

CLAassistant avatar Aug 11 '22 04:08 CLAassistant