feat(sst): support streaming upload for sst
I hereby agree to the terms of the Singularity Data, Inc. Contributor License Agreement.
What's changed and what's your intention?
This PR implements streaming upload for SSTs, utilizing the S3 multipart upload API. We believe this functionality can accelerate uploading through parallelism.
- Refactor
  - Add `SstableWriter` as a hook into `SstableBuilder` to consume SST block data.
  - Add `SstableWriterBuilder` and `SstableBuilderSealer` as two hooks into `CapacitySplitTableBuilder` to decouple SST building and uploading (a rough sketch of these hooks follows this list).
- Streaming Upload
  - Add streaming upload interfaces for `ObjectStore` and `SstableStore`. Starting a streaming upload for an SST returns an uploader, on which `upload_block` and `upload_size_footer` can be called. The uploader can then be passed back to `SstableStore` to finish the upload.
  - For the S3 implementation of streaming upload, the part size is set to 16 MiB based on benchmark results. If the object is smaller than the minimum part size requirement, we fall back to a normal `put`.
  - Some trivial micro-benches (not included in the PR because they are very casually written) show that the streaming upload implementation is slower for small objects (< 5 MiB) and around 1.5-2x faster for large objects (> 50 MiB). We throw away data once it is uploaded, so memory consumption is also better.
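For illustration, here is a minimal sketch of how the two hooks named above might fit together. The trait and method names follow the description; all signatures are assumptions, not the PR's actual code:

```rust
// A minimal sketch (not the PR's exact code) of the hooks described above.
// All signatures here are assumptions for illustration.

/// Consumes SST block data as `SstableBuilder` produces it, so the builder
/// does not need to know whether blocks are buffered or streamed to S3.
pub trait SstableWriter: Send {
    /// Called once per finished block.
    fn write_block(&mut self, block: &[u8]) -> std::io::Result<()>;
    /// Called once at the end with the SST's size footer.
    fn finish(self: Box<Self>, size_footer: u32) -> std::io::Result<()>;
}

/// Lets `CapacitySplitTableBuilder` create a fresh writer for each SST it
/// splits off, decoupling SST building from uploading.
pub trait SstableWriterBuilder: Send + Sync {
    fn build(&self, sst_id: u64) -> std::io::Result<Box<dyn SstableWriter>>;
}
```

With this shape, an S3-backed writer can stream each block into a multipart upload as it is produced, while a simpler writer can just collect blocks into a buffer.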
Checklist
- [x] I have written necessary rustdoc comments
- [x] I have added necessary unit tests and integration tests
- [x] All checks passed in `./risedev check` (or alias, `./risedev c`)
Documentation
If your pull request contains user-facing changes, please specify the types of the changes, and create a release note. Otherwise, please feel free to remove this section.
Types of user-facing changes
Please keep the types that apply to your changes, and remove those that do not apply.
- Installation and deployment
- Connector (sources & sinks)
- SQL commands, functions, and operators
- RisingWave cluster configuration changes
- Other (please specify in the release note below)
Release note
Please create a release note for your changes. In the release note, focus on the impact on users, and mention the environment or conditions where the impact may occur.
Refer to a related PR or issue link (optional)
#1368
Codecov Report
Merging #4454 (530dfd8) into main (14b8a0f) will decrease coverage by 0.19%. The diff coverage is 78.85%.
```diff
@@            Coverage Diff             @@
##             main    #4454      +/-   ##
==========================================
- Coverage   74.36%   74.16%   -0.20%
==========================================
  Files         853      855       +2
  Lines      126671   127839    +1168
==========================================
+ Hits        94198    94813     +615
- Misses      32473    33026     +553
```
| Flag | Coverage Δ | |
|---|---|---|
| rust | 74.16% <78.85%> (-0.20%) | :arrow_down: |
Flags with carried forward coverage won't be shown.
| Impacted Files | Coverage Δ | |
|---|---|---|
| src/common/common_service/src/observer_manager.rs | 0.00% <0.00%> (ø) | |
| src/common/src/monitor/mod.rs | 0.00% <0.00%> (ø) | |
| ...c/compute/src/compute_observer/observer_manager.rs | 0.00% <0.00%> (ø) | |
| src/compute/src/lib.rs | 3.03% <0.00%> (-0.10%) | :arrow_down: |
| src/compute/src/server.rs | 0.00% <0.00%> (ø) | |
| src/ctl/src/cmd_impl/hummock/sst_dump.rs | 0.00% <0.00%> (ø) | |
| src/ctl/src/cmd_impl/stream/trace.rs | 0.00% <0.00%> (ø) | |
| src/ctl/src/cmd_impl/table/scan.rs | 0.00% <0.00%> (ø) | |
| src/ctl/src/common/hummock_service.rs | 0.00% <0.00%> (ø) | |
| src/expr/src/expr/expr_binary_nonnull.rs | 84.00% <ø> (ø) | |
| ... and 146 more | | |
Instead of exposing this as multipart upload, I would prefer to expose it as `streaming_upload` with `write_bytes` and `finish` methods. The multipart upload implementation detail will be hidden inside the streaming uploader returned from the S3 object store. In case some object stores do not support multipart upload, they can still implement streaming upload by saving all the written data in a buffer and uploading it all at once on `finish`.
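As a sketch of what this comment proposes (the method names `streaming_upload`, `write_bytes`, and `finish` come from the comment; the error type and async plumbing are assumed):

```rust
// A sketch of the proposed interface. Names follow the comment above;
// everything else is an assumption for illustration.
use async_trait::async_trait;

type ObjectResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;

#[async_trait]
pub trait StreamingUploader: Send {
    /// Append bytes to the object being uploaded; order is preserved.
    async fn write_bytes(&mut self, data: &[u8]) -> ObjectResult<()>;
    /// Flush anything still buffered and make the object visible.
    async fn finish(self: Box<Self>) -> ObjectResult<()>;
}

#[async_trait]
pub trait ObjectStore: Send + Sync {
    /// Begin a streaming upload. An S3 backend can hide multipart upload
    /// behind the returned uploader; a backend without multipart support
    /// can buffer internally and `put` everything on `finish`.
    async fn streaming_upload(&self, path: &str) -> ObjectResult<Box<dyn StreamingUploader>>;
}
```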
I think we should distinguish the multipart upload in `ObjectStore`'s interface from S3's multipart upload. In our code, multipart upload simply means "you can upload an object part by part and it will finally appear to be a single object" (which is pretty interchangeable with the `streaming_upload` that you mention, since it might not really be "streaming"). It doesn't bear any indication of how it is implemented, be it streaming upload, appending to a buffer, linking non-consecutive data blocks, etc. Essentially, multipart upload here is a broader concept than streaming upload, and I think it's really just a matter of naming.
I borrowed the part id concept from S3 so that data doesn't have to come in order. If we want to support ordered upload, we can define another trait and return a dedicated ordered upload handle.
I understand that this PR is about streaming, so we must emphasize the concept of streaming. But I think the `ObjectStore` layer should still use a more general and flexible abstraction, and it would be more appropriate to adopt the concept of "streaming" from `SstableStore` and above.
I think a streaming upload interface is more generic than the part-based upload interface. The reasons are:
- We will have at least two upload interfaces in `ObjectStore`: one for part-based upload and one for full-object upload. However, we can unify the upload interfaces if `ObjectStore` provides a streaming upload interface.
- If the underlying object store doesn't support part-based upload, the part-based upload interface cannot be used. That means a layer above `ObjectStore` (e.g. `SstableStore`) needs to understand the type of `ObjectStore` and have a switch in its streaming upload implementation to decide which upload interface to use. This leaks the implementation details of `ObjectStore` and makes the code more complex (see the buffering fallback sketched below).
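A hypothetical sketch of that buffering fallback, which is what lets a streaming interface unify both kinds of stores (`put` here is a stand-in closure for the store's existing full-object upload):

```rust
// Hypothetical fallback for a backend without native multipart support:
// keep all written bytes in memory and issue one full-object upload on
// `finish`. Callers see the same streaming interface either way.
pub struct BufferedStreamingUploader {
    path: String,
    buf: Vec<u8>,
}

impl BufferedStreamingUploader {
    pub fn new(path: impl Into<String>) -> Self {
        Self { path: path.into(), buf: Vec::new() }
    }

    /// "Streaming" writes just append to the in-memory buffer.
    pub fn write_bytes(&mut self, data: &[u8]) {
        self.buf.extend_from_slice(data);
    }

    /// Upload everything in one shot via the store's full-object `put`.
    pub fn finish(self, put: impl FnOnce(&str, Vec<u8>)) {
        put(&self.path, self.buf);
    }
}
```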
> It doesn't bear any indication of how it is implemented

Just to be clear, we do not make any attempt to chunk our multipart upload into specific sizes/chunks, for instance at block boundaries, correct? In that case, I agree a streaming upload interface seems more generic.
I think multipart is simply one implementation of that (specific to S3's interface).
And even if we want to align the part size to some chunk size or boundary, the S3 implementation can take care of that (perhaps with additional config).
> I think we should distinguish the multipart upload in `ObjectStore`'s interface from S3's multipart upload. In our code, multipart upload simply means "you can upload an object part by part and it will finally appear to be a single object" (which is pretty interchangeable with the `streaming_upload` that you mention, since it might not really be "streaming"). It doesn't bear any indication of how it is implemented, be it streaming upload, appending to a buffer, linking non-consecutive data blocks, etc. Essentially, multipart upload here is a broader concept than streaming upload, and I think it's really just a matter of naming.

It's not just a matter of naming. Uploading an object part by part differs from streaming upload in that the parts do not have to be sent in order: the data is received with no order and we may randomly write any part of an object. `streaming_upload`, by contrast, indicates that the data is received in order and we only need to append new data to the previously uploaded data.

Currently all our related potential use cases are `streaming_upload` only, and exposing the more generic part-by-part abstraction in `ObjectStore` would complicate upper-layer usage. Besides, `streaming_upload` eases the implementation of `InMemoryObjectStore` and `DiskObjectStore`, since they can just keep appending newly added data. This knowledge is especially essential for `DiskObjectStore`: with part-by-part upload, the offset of each part is unknown until we call `finish` and sort all parts by their part numbers, and before knowing the offsets we cannot even pre-write the data to disk with `pwrite`, which means we have to buffer all the data in memory until `finish` is called. With `streaming_upload`, instead, we can call `file.write` whenever we receive new data. This saves memory, and since the write is append-only, we may also get better write performance. So considering all of the above, it's better to expose the `streaming_upload` abstraction in the object store, and unnecessary to expose uploading data part by part.
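A minimal sketch of the `DiskObjectStore` point above, assuming a simple file-backed store: because streamed data arrives in order, each chunk is appended immediately, with no in-memory staging and no part offsets to resolve at `finish`.

```rust
// Sketch only: streamed chunks arrive in order, so a disk-backed store can
// append each one to the file as it arrives instead of buffering all parts
// in memory until their offsets are known.
use std::fs::File;
use std::io::{BufWriter, Result, Write};

pub struct DiskStreamingUploader {
    file: BufWriter<File>,
}

impl DiskStreamingUploader {
    pub fn create(path: &str) -> Result<Self> {
        Ok(Self { file: BufWriter::new(File::create(path)?) })
    }

    /// Append-only write as data arrives.
    pub fn write_bytes(&mut self, data: &[u8]) -> Result<()> {
        self.file.write_all(data)
    }

    /// Flush buffered bytes; the object is now complete on disk.
    pub fn finish(mut self) -> Result<()> {
        self.file.flush()
    }
}
```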
But we can still expose `multi_part_upload` as a special functionality of `S3ObjectStore`. Since the `ObjectStoreImpl` we pass in is an enum, we are able to check whether its implementation is `S3ObjectStore`, and use this special multipart upload functionality in some special use cases.
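For illustration, the enum check described above might look like this (the struct stubs are hypothetical stand-ins for the real store types):

```rust
// Hypothetical illustration of the enum-dispatch idea: code that really
// needs S3's native multipart upload can check the active variant.
pub struct InMemObjectStore;
pub struct DiskObjectStore;
pub struct S3ObjectStore;

pub enum ObjectStoreImpl {
    InMem(InMemObjectStore),
    Disk(DiskObjectStore),
    S3(S3ObjectStore),
}

impl ObjectStoreImpl {
    /// Expose the S3 backend only when it is the active implementation,
    /// so S3-specific multipart upload stays an opt-in special case.
    pub fn as_s3(&self) -> Option<&S3ObjectStore> {
        match self {
            ObjectStoreImpl::S3(s3) => Some(s3),
            _ => None,
        }
    }
}
```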
> Just to be clear, we do not make any attempt to chunk our multipart upload into specific sizes/chunks, for instance at block boundaries, correct?

We do. According to the benchmark, it's better to upload the SSTs in 8 MiB/16 MiB parts and align each part to the beginning of a block. But no matter which interface we provide, this should be handled by upper-level code.
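A sketch of that block-aligned flushing, assuming the upper layer feeds whole blocks to the uploader (`flush_part` is a hypothetical stand-in for the actual part upload; the 16 MiB constant follows the benchmark result mentioned in the PR description):

```rust
// Sketch of block-aligned part flushing: the uploader receives whole
// blocks, so a part boundary always coincides with a block boundary.
const PART_SIZE: usize = 16 * 1024 * 1024; // 16 MiB, per the benchmarks

pub struct BlockAlignedUploader {
    buf: Vec<u8>,
}

impl BlockAlignedUploader {
    pub fn new() -> Self {
        Self { buf: Vec::new() }
    }

    /// Called once per finished block; the buffer is flushed only between
    /// blocks, never in the middle of one.
    pub fn upload_block(&mut self, block: &[u8], mut flush_part: impl FnMut(Vec<u8>)) {
        self.buf.extend_from_slice(block);
        if self.buf.len() >= PART_SIZE {
            flush_part(std::mem::take(&mut self.buf));
        }
    }
}
```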
Seems I have put too much weight on generality by supporting out-of-order uploading. I'm switching to the streaming upload interface. But how can we unify the full-upload and streaming-upload interfaces, as @hzxa21 mentioned?
I guess out of order would support parallel upload? I guess for now we don't need to support it, as parallelism could be achieved within a task via "splits" already.
> I guess out of order would support parallel upload? I guess for now we don't need to support it, as parallelism could be achieved within a task via "splits" already.
What "split"?
> What "split"?
It’s just multiple ranges within the same compaction task.
> But how can we unify the full-upload and streaming-upload interfaces, as @hzxa21 mentioned?

I believe his point is actually that streaming upload is the interface that meets our requirements, meaning "full upload" (out of order) is not required.