incubator-uniffle
[#1717] improvement: Pick partitions instead of shuffles for flushing
What changes were proposed in this pull request?
Pick partitions instead of shuffles for flushing
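For illustration, here is a minimal sketch of what partition-level picking means; the class and method names (`PartitionPickSketch`, `pickBuffersToFlush`) are hypothetical and are not the actual `ShuffleBufferManager` code. Instead of flushing every buffer of the heaviest shuffles, the idea is to sort individual partition buffers by size and take the largest ones until enough memory would be released.

```java
// Illustrative sketch only (hypothetical names, not the actual Uniffle API):
// pick individual partition buffers, largest first, until the requested amount
// of memory would be released, instead of flushing whole shuffles.
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class PartitionPickSketch {
  static class PartitionBuffer {
    final String appId;
    final int shuffleId;
    final int partitionId;
    final long sizeInBytes;

    PartitionBuffer(String appId, int shuffleId, int partitionId, long sizeInBytes) {
      this.appId = appId;
      this.shuffleId = shuffleId;
      this.partitionId = partitionId;
      this.sizeInBytes = sizeInBytes;
    }
  }

  /** Picks the largest partition buffers until the requested bytes are covered. */
  static List<PartitionBuffer> pickBuffersToFlush(List<PartitionBuffer> all, long bytesToRelease) {
    List<PartitionBuffer> sorted = new ArrayList<>(all);
    sorted.sort(Comparator.comparingLong((PartitionBuffer b) -> b.sizeInBytes).reversed());

    List<PartitionBuffer> picked = new ArrayList<>();
    long released = 0L;
    for (PartitionBuffer buffer : sorted) {
      if (released >= bytesToRelease) {
        break;
      }
      picked.add(buffer);
      released += buffer.sizeInBytes;
    }
    return picked;
  }
}
```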
Why are the changes needed?
For better performance. Fix: #1717
Does this PR introduce any user-facing change?
No.
How was this patch tested?
UTs and manual testing
A simple test using TeraSort (4T):
The yellow line is the instance without this patch.
Test Results
2 433 files +28   2 433 suites +28   5h 0m 30s :stopwatch: +2m 12s
934 tests +3   933 :white_check_mark: +3   1 :zzz: ±0   0 :x: ±0
10 828 runs +37   10 814 :white_check_mark: +37   14 :zzz: ±0   0 :x: ±0
Results for commit 1feaa8e9. Comparison against base commit c3cbdec4.
This pull request removes 1 and adds 4 tests. Note that renamed tests count towards both.
org.apache.uniffle.test.CoordinatorAssignmentTest ‑ testReconfigureNodeMax
org.apache.spark.shuffle.writer.WriteBufferManagerTest ‑ testClearWithSpillRatio
org.apache.uniffle.common.ReconfigurableConfManagerTest ‑ test
org.apache.uniffle.common.ReconfigurableConfManagerTest ‑ testWithoutInitialization
org.apache.uniffle.test.CoordinatorReconfigureNodeMaxTest ‑ testReconfigureNodeMax
:recycle: This comment has been updated with latest results.
If the range of [low-watermark, high-watermark] is large, this problem with this patch should still exist, right?
If the range of [low-watermark, high-watermark] is large, this problem with this patch should still exist, right?
+1. I'm not sure how your strategy of choosing partitions over shuffles will improve performance: reduce GC or disk access?
If the range of [low-watermark, high-watermark] is large, this problem with this patch should still exist, right?
You are right, but if the range of [low-watermark, high-watermark] is not large and the size of a shuffle is large, this patch will work.
+1. I'm not sure how your strategy of choosing partitions over shuffles will improve performance: reduce GC or disk access?
The following configuration is what I used when testing:
rss.server.buffer.capacity=400g
rss.server.memory.shuffle.lowWaterMark.percentage=75
rss.server.memory.shuffle.highWaterMark.percentage=85
My goal is to make full use of the memory, so I hope it only writes 40g to disk when memory usage reaches 340g. But in practice it writes 340g. When a large number of buffers are written to disk, a large number of objects are generated and GC becomes very frequent. Moreover, when hundreds of GB of data are written to disk at the same time, the system load becomes very high, which slows GC down further.
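For reference, the watermark arithmetic behind this expectation, assuming the percentages above apply to `rss.server.buffer.capacity` (a minimal worked example, not Uniffle code):

```java
// Worked example for the configuration above (illustrative only).
public class WatermarkMath {
  public static void main(String[] args) {
    long capacityGb = 400;        // rss.server.buffer.capacity = 400g
    double highWaterMark = 0.85;  // rss.server.memory.shuffle.highWaterMark.percentage = 85
    double lowWaterMark = 0.75;   // rss.server.memory.shuffle.lowWaterMark.percentage = 75

    double flushTrigger = capacityGb * highWaterMark;   // 340 GB: flushing starts here
    double flushTarget = capacityGb * lowWaterMark;     // 300 GB: flushing may stop here
    double expectedFlush = flushTrigger - flushTarget;  // 40 GB expected per flush round

    System.out.printf("trigger=%.0fGB target=%.0fGB expected flush=%.0fGB%n",
        flushTrigger, flushTarget, expectedFlush);
  }
}
```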
@zuston @advancedxy
Codecov Report
Attention: Patch coverage is 96.29630% with 3 lines in your changes missing coverage. Please review.
Project coverage is 54.11%. Comparing base (6f6d35a) to head (d0dd4c0). Report is 25 commits behind head on master.
| Files | Patch % | Lines |
|---|---|---|
| ...he/uniffle/server/buffer/ShuffleBufferManager.java | 95.31% | 1 Missing and 2 partials :warning: |
Additional details and impacted files
@@ Coverage Diff @@
## master #1718 +/- ##
============================================
- Coverage 54.86% 54.11% -0.75%
- Complexity 2358 2767 +409
============================================
Files 368 418 +50
Lines 16379 21808 +5429
Branches 1504 2048 +544
============================================
+ Hits 8986 11802 +2816
- Misses 6862 9269 +2407
- Partials 531 737 +206
:umbrella: View full report in Codecov by Sentry.
Got it. Makes sense for your case.
My goal is to make full use of the memory, so I hope it only writes 40g to disk when memory usage reaches 340g. But in practice it writes 340g.
So, the reason one flush writes ~340GB of shuffle data is that there's one large shuffle (from one specific app) that occupies ~340GB of memory?
This is a valid use case and we should improve it. However, I'm a bit worried about the implications of the new approach:
- it might be quite expensive to maintain the top-N buffers, since there can be a lot of buffers across all the shuffles in one shuffle server
- unbalanced access to partition data between shuffle server memory and disk/HDFS: part of the data stays in memory and part is flushed for almost every shuffle.
How about we make an incremental improvement (a rough sketch follows after this list):
- pick the shuffles to flush
- if the flush size is way larger than (high_watermark - low_watermark) * buffer_capacity, pick the top-N buffers in the picked shuffles (which should be one or two).
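A rough sketch of that incremental idea, with hypothetical names and an assumed factor for "way larger" (this is not the actual `ShuffleBufferManager` code): shuffle-level picking stays the default, and partition-level (top-N buffer) picking only kicks in when the picked shuffles are much larger than the watermark gap.

```java
// Rough sketch of the incremental proposal above (hypothetical names, not
// the actual ShuffleBufferManager implementation).
import java.util.List;

class IncrementalFlushSketch {
  // Assumed threshold for "way larger"; not a real Uniffle config.
  static final double WAY_LARGER_FACTOR = 2.0;

  static boolean shouldPickTopBuffers(
      List<Long> pickedShuffleSizes,  // total in-memory size of each picked shuffle
      double highWatermarkRatio,      // e.g. 0.85
      double lowWatermarkRatio,       // e.g. 0.75
      long bufferCapacityBytes) {     // rss.server.buffer.capacity
    long pickedSize = pickedShuffleSizes.stream().mapToLong(Long::longValue).sum();
    long watermarkGap =
        (long) ((highWatermarkRatio - lowWatermarkRatio) * bufferCapacityBytes);
    // If the shuffles we would flush hold much more data than we actually need
    // to release, switch to picking top-N partition buffers inside those
    // shuffles (as in the earlier sketch) instead of flushing them wholesale.
    return pickedSize > WAY_LARGER_FACTOR * watermarkGap;
  }
}
```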
ok for me.
@xianjingfeng , when this is ready for review, you can ping me or @zuston.
It's ready for review. @advancedxy @jerqi @zuston
Did you test this PR and check that it works as expected?
I have only tested it with UT.
LGTM. And it would be great if we could verify that this PR works as expected.
@advancedxy
Merged. Thanks for your review. @zuston @advancedxy
BTW, I'm curious how much can the performance be improved?
If I understand this correctly, is this PR similar to https://github.com/apache/incubator-uniffle/pull/1670 on the client-side, but implemented on the server-side? They all "flush" partial data.
BTW, I'm curious how much can the performance be improved?
It is difficult to estimate. In theory, this PR will reduce the amount of data written to disk.
If I understand this correctly, is this PR similar to #1670 on the client-side, but implemented on the server-side? They all "flush" partial data.
#1670 is for reducing the number of small blocks. This PR is for reducing the amount of data written to disk.
This pr is for reducing the amount of data written to disk.
When the server's memory is insufficient, it always has to flush to disk. If the server's memory is not increased, then whether this PR flushes part of the data or the previous approach flushes a complete ShuffleBuffer, by the end of the job the total amount of data the server flushes to disk should be similar, right? Is it possible that flushing less each time reduces the total amount of data written to disk? To take an extreme case: if I have to write 100TB of shuffle data to the shuffle server and the server only has 300GB of memory, over 99TB of data ultimately has to be written to disk, regardless of whether this PR is used.
So, I don't understand what you mean by reducing the amount of data written to disk.
In our production environment, the maximum disk usage is about 5T per node, but we have 1.4T of memory per node. If a shuffle occupies a lot of memory but will complete quickly, there is no need to flush all of its data to disk.
I got your point; this PR will benefit your case. We don't know whether it will still benefit other cases, e.g. when the shuffle size is way larger than the server's memory.
In general, as long as it doesn't hurt flush performance, smaller granularity is better than larger granularity: it allows data to stay in memory longer and spreads disk IO pressure into relatively idle time segments.
OK, I'll test this in the future.