feat(sst): support streaming upload for sst
I hereby agree to the terms of the Singularity Data, Inc. Contributor License Agreement.
What's changed and what's your intention?
This PR implements streaming upload for SSTs, utilizing the multipart upload API of S3. We believe this functionality can accelerate uploading through parallelism.
- Refactor
  - Add `SstableWriter` as a hook into `SstableBuilder` to consume SST block data.
  - Add `SstableWriterBuilder` and `SstableBuilderSealer` as two hooks into `CapacitySplitTableBuilder` to decouple SST building and uploading.
- Streaming Upload
  - Add streaming upload interfaces for `ObjectStore` and `SstableStore`. Starting a streaming upload for an SST returns an uploader, on which `upload_block` and `upload_size_footer` can be called. The uploader can then be passed back to `SstableStore` to finish the upload (a sketch follows this list).
  - For the S3 implementation of streaming upload, the part size is set to 16 MiB based on benchmark results. If the object is smaller than the minimum part size requirement, we fall back to a normal `put`.
  - Some trivial micro-benchmarks (not included in the PR because they were written very casually) show that the streaming upload implementation is slower for small objects (< 5 MiB) and around 1.5-2x faster for large objects (> 50 MiB). Uploaded data is thrown away immediately, so memory consumption is also better.
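To make the flow above concrete, here is a minimal Rust sketch of the uploader interface described in this PR. The method names (`upload_block`, `upload_size_footer`) come from the description; the exact signatures, parameter types, and error handling are assumptions for illustration, not the merged code.

```rust
/// A sketch of the uploader returned when a streaming upload is started.
/// Signatures are illustrative assumptions; see the PR diff for the real API.
pub type UploadResult = std::io::Result<()>;

pub trait SstableUploader {
    /// Consume one encoded SST block; blocks arrive in order.
    fn upload_block(&mut self, block: &[u8]) -> UploadResult;
    /// Write the size footer after the last block has been uploaded.
    fn upload_size_footer(&mut self, footer: &[u8]) -> UploadResult;
}
```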
Checklist
- [x] I have written necessary rustdoc comments
- [x] I have added necessary unit tests and integration tests
- [x] All checks passed in `./risedev check` (or its alias, `./risedev c`)
Documentation
If your pull request contains user-facing changes, please specify the types of the changes, and create a release note. Otherwise, please feel free to remove this section.
Types of user-facing changes
Please keep the types that apply to your changes, and remove those that do not apply.
- Installation and deployment
- Connector (sources & sinks)
- SQL commands, functions, and operators
- RisingWave cluster configuration changes
- Other (please specify in the release note below)
Release note
Please create a release note for your changes. In the release note, focus on the impact on users, and mention the environment or conditions where the impact may occur.
Refer to a related PR or issue link (optional)
#1368
Codecov Report
Merging #4454 (530dfd8) into main (14b8a0f) will decrease coverage by 0.19%. The diff coverage is 78.85%.
    @@            Coverage Diff             @@
    ##             main    #4454      +/-   ##
    ==========================================
    - Coverage   74.36%   74.16%     -0.20%
    ==========================================
      Files         853      855         +2
      Lines      126671   127839      +1168
    ==========================================
    + Hits        94198    94813       +615
    - Misses     32473    33026       +553
| Flag | Coverage Δ | |
|---|---|---|
| rust | 74.16% <78.85%> (-0.20%) | :arrow_down: |

Flags with carried forward coverage won't be shown.
| Impacted Files | Coverage Δ | |
|---|---|---|
| src/common/common_service/src/observer_manager.rs | 0.00% <0.00%> (ø) | |
| src/common/src/monitor/mod.rs | 0.00% <0.00%> (ø) | |
| ...c/compute/src/compute_observer/observer_manager.rs | 0.00% <0.00%> (ø) | |
| src/compute/src/lib.rs | 3.03% <0.00%> (-0.10%) | :arrow_down: |
| src/compute/src/server.rs | 0.00% <0.00%> (ø) | |
| src/ctl/src/cmd_impl/hummock/sst_dump.rs | 0.00% <0.00%> (ø) | |
| src/ctl/src/cmd_impl/stream/trace.rs | 0.00% <0.00%> (ø) | |
| src/ctl/src/cmd_impl/table/scan.rs | 0.00% <0.00%> (ø) | |
| src/ctl/src/common/hummock_service.rs | 0.00% <0.00%> (ø) | |
| src/expr/src/expr/expr_binary_nonnull.rs | 84.00% <ø> (ø) | |
| ... and 146 more | | |
Instead of exposing this as `multipart upload`, I would prefer to expose it as `streaming_upload` with `write_bytes` and `finish` methods. The multipart upload implementation detail will be hidden inside the streaming uploader returned from the S3 object store. In case some object stores do not support multipart upload, they can still implement streaming upload by saving all the written data in a buffer and uploading it all at once on `finish`.
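A sketch of this proposal, under assumed signatures (the comment names only `write_bytes` and `finish`): the uploader returned by the S3 object store would drive a multipart upload internally, while a store without multipart support can fall back to buffering, as shown below.

```rust
/// Sketch of the proposed streaming interface; signatures are assumptions.
pub trait StreamingUploader {
    /// Append bytes to the object being uploaded.
    fn write_bytes(&mut self, data: &[u8]) -> std::io::Result<()>;
    /// Finalize the object. S3 would complete its multipart upload here; a
    /// store without multipart support uploads its whole buffer in one `put`.
    fn finish(self: Box<Self>) -> std::io::Result<()>;
}

/// Fallback for object stores without multipart upload: buffer everything
/// written and upload it in a single request on `finish`.
#[derive(Default)]
struct BufferedUploader {
    buf: Vec<u8>,
}

impl StreamingUploader for BufferedUploader {
    fn write_bytes(&mut self, data: &[u8]) -> std::io::Result<()> {
        self.buf.extend_from_slice(data);
        Ok(())
    }
    fn finish(self: Box<Self>) -> std::io::Result<()> {
        // A real implementation would issue `put(path, self.buf)` here.
        Ok(())
    }
}
```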
> Instead of exposing as `multipart upload`, I will prefer to expose it as `streaming_upload` with `write_bytes` and `finish` method. [...]
I think we should distinguish the `multipart upload` in `ObjectStore`'s interface from S3's multipart upload. In our code, multipart upload simply means "you can upload an object part by part and it will finally appear as a single object" (which is pretty much interchangeable with the `streaming_upload` you mention, since it might not really be "streaming"). It doesn't bear any indication of how it is implemented, be it via streaming upload, appending to a buffer, linking non-consecutive data blocks, etc. Essentially, multipart upload here is a broader concept than streaming upload, and I think it's really just a matter of naming.

I borrowed the `part id` concept from S3 so that data doesn't have to arrive in order. If we want to support ordered upload, we can define another trait and return a dedicated ordered upload handle.

I understand that this PR is about streaming, so we must emphasize the concept of streaming. But I think the `ObjectStore` layer should still use a more general and flexible abstraction, and it would be more appropriate to adopt the concept of "streaming" from `SstableStore` and above.
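For contrast, a sketch of the part-id flavor argued for here (the names are assumptions): parts may arrive in any order and are stitched together by part number when the upload completes.

```rust
use std::collections::BTreeMap;

/// Sketch of a part-id-based uploader in the spirit of S3's API; the names
/// are illustrative. Parts may be uploaded out of order.
struct PartUploader {
    parts: BTreeMap<u64, Vec<u8>>,
}

impl PartUploader {
    fn upload_part(&mut self, part_id: u64, data: Vec<u8>) {
        self.parts.insert(part_id, data);
    }
    /// Assemble the final object; BTreeMap iteration is sorted by part id.
    fn finish(self) -> Vec<u8> {
        self.parts.into_values().flatten().collect()
    }
}
```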
> Instead of exposing as `multipart upload`, I will prefer to expose it as `streaming_upload` with `write_bytes` and `finish` method. [...]

> I think we should distinguish the `multipart upload` in `ObjectStore`'s interface from S3's multipart upload. [...]
I think a streaming upload interface is more generic than a part-based upload interface, for two reasons:
- We would need at least two upload interfaces in `ObjectStore`: one for part-based upload and one for full-object upload. If `ObjectStore` provides a streaming upload interface instead, the two can be unified (see the sketch after this list).
- If the underlying object store doesn't support part-based upload, the part-based interface cannot be used. A layer above `ObjectStore` (e.g. `SstableStore`) would then need to know the concrete type of the `ObjectStore` and switch between upload interfaces in its streaming upload implementation. This leaks `ObjectStore` implementation details and makes the code more complex.
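A sketch of the unification point, reusing the assumed `write_bytes`/`finish` shape from earlier in the thread: a full-object upload is just one write followed by `finish`, so no second entry point is needed.

```rust
/// Assumed streaming interface (same shape as sketched earlier in the thread).
trait StreamingUploader {
    fn write_bytes(&mut self, data: &[u8]) -> std::io::Result<()>;
    fn finish(self: Box<Self>) -> std::io::Result<()>;
}

/// A full-object upload expressed via streaming: one write, then finish.
fn upload_whole_object(
    mut uploader: Box<dyn StreamingUploader>,
    obj: &[u8],
) -> std::io::Result<()> {
    uploader.write_bytes(obj)?;
    uploader.finish()
}
```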
> It doesn't bear any indication of how it is implemented

Just to be clear, we do not make any attempt to chunk our multipart upload into specific sizes/chunks, for instance at block boundaries, correct? In that case, I agree a streaming upload interface seems more generic. I think multipart is simply one implementation of it (specific to S3's interface). And even if we want to align the part size to some chunk size or boundary, the S3 implementation can take care of that (perhaps with additional config).
> I think we should distinguish the `multipart upload` in `ObjectStore`'s interface from S3's multipart upload. [...]
It's not just a matter of naming. Uploading an object part by part and streaming upload differ: uploading part by part does not require that parts be sent in order, meaning data is received with no ordering and we may write any part of an object at random, while `streaming_upload` indicates that data is received in order and we only need to append new data to the previously uploaded data.

Currently all of our related potential use cases are `streaming_upload` only, and exposing the more generic part-by-part abstraction in `ObjectStore` would complicate usage in the upper layers. Besides, `streaming_upload` eases the implementation of `InMemoryObjectStore` and `DiskObjectStore`, since they can just keep appending newly arrived data. This matters most for `DiskObjectStore`: with part-by-part upload, the offset of each part is unknown until we call `finish` and sort all parts by their part numbers, and before knowing the offsets we cannot even pre-write the data to disk with `pwrite`, so we would have to buffer all the data in memory until `finish`. With `streaming_upload`, we can call `file.write` whenever new data arrives, which saves memory; the writes are also append-only, which may yield better write performance (see the sketch below). Considering all of the above, it's better to expose the `streaming_upload` abstraction in the object store, and unnecessary to expose uploading data part by part.
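A sketch of the `DiskObjectStore` point (the type name follows the comment; the code itself is an illustration): with streaming upload, each chunk can be appended to the file as it arrives, whereas part-by-part upload would force buffering all parts in memory because their offsets are unknown until `finish`.

```rust
use std::fs::File;
use std::io::Write;

/// Streaming upload to a local file: data arrives in order, so each chunk is
/// appended immediately instead of being buffered until `finish`.
struct DiskStreamingUploader {
    file: File,
}

impl DiskStreamingUploader {
    fn write_bytes(&mut self, data: &[u8]) -> std::io::Result<()> {
        // Append-only write; no need to know any part's final offset upfront.
        self.file.write_all(data)
    }
    fn finish(mut self) -> std::io::Result<()> {
        self.file.flush()
    }
}
```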
That said, we can still expose `multi_part_upload` as a special functionality of `S3ObjectStore`. Since the `ObjectStoreImpl` we pass around is an enum, we are able to check whether its implementation is `S3ObjectStore` and use this special multipart upload functionality in particular use cases.
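A sketch of the enum-dispatch idea; the variant and type names here are assumptions for illustration, not RisingWave's actual definitions.

```rust
// Placeholder store types for the sketch.
struct InMemObjectStore;
struct DiskObjectStore;
struct S3ObjectStore;

/// `ObjectStoreImpl` as an enum over concrete backends (names assumed).
enum ObjectStoreImpl {
    InMem(InMemObjectStore),
    Disk(DiskObjectStore),
    S3(S3ObjectStore),
}

/// Callers that want S3-specific multipart upload can match on the variant.
fn as_s3(store: &ObjectStoreImpl) -> Option<&S3ObjectStore> {
    match store {
        ObjectStoreImpl::S3(s3) => Some(s3),
        _ => None,
    }
}
```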
> Just to be clear, we do not make any attempt to chunk our multipart upload into specific sizes/chunks, for instance at block boundaries, correct?

We do. According to the benchmark, it's better to upload the SSTs in 8 MiB/16 MiB parts and align each part to the beginning of a block. But no matter which interface we provide, this should be handled by upper-level code.
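A sketch of how upper-level code might cut block-aligned parts, using the 16 MiB figure from the PR description; the `PartCutter` name and shape are hypothetical.

```rust
/// Accumulate whole blocks and emit an upload part only at a block boundary,
/// once the threshold is reached. 16 MiB follows the benchmark result above.
const PART_SIZE: usize = 16 * 1024 * 1024;

#[derive(Default)]
struct PartCutter {
    buf: Vec<u8>,
}

impl PartCutter {
    /// Add one block; returns a complete part when the threshold is crossed.
    fn add_block(&mut self, block: &[u8]) -> Option<Vec<u8>> {
        self.buf.extend_from_slice(block);
        (self.buf.len() >= PART_SIZE).then(|| std::mem::take(&mut self.buf))
    }
}
```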
It seems I put too much weight on generality by supporting out-of-order uploading. I'm switching to the streaming upload interface. But how can we unify the full-upload and streaming-upload interfaces, as @hzxa21 mentioned?
I guess out of order would support parallel upload? I guess for now we don't need to support it, as parallelism could be achieved within a task via "splits" already.
> I guess out of order would support parallel upload? I guess for now we don't need to support it, as parallelism could be achieved within a task via "splits" already.
What "split"?
What "split"?
It’s just multiple ranges within the same compaction task.
> But how can we unify the full-upload and streaming-upload interfaces, as @hzxa21 mentioned?
I believe his point is actually that streaming upload is the interface that meets our requirements, meaning "full-upload" (out of order) is not required.