feat: make morsel size configurable to prevent morsels that are too large or too small
## Changes Made

## Related Issues

## Checklist
- [ ] Documented in API Docs (if applicable)
- [ ] Documented in User Guide (if applicable)
- [ ] If adding a new documentation page, doc is added to `docs/mkdocs.yml` navigation
- [ ] Documentation builds and is formatted properly (tag @/ccmao1130 for docs review)
Codecov Report
:x: Patch coverage is 84.65608% with 29 lines in your changes missing coverage. Please review.
:white_check_mark: Project coverage is 74.62%. Comparing base (10086b2) to head (a043353).
:warning: Report is 1 commit behind head on main.
Additional details and impacted files
```diff
@@            Coverage Diff             @@
##             main    #5250      +/-   ##
==========================================
+ Coverage   74.24%   74.62%   +0.38%
==========================================
  Files         973      973
  Lines      125213   124658     -555
==========================================
+ Hits        92959    93030      +71
+ Misses      32254    31628     -626
```
| Files with missing lines | Coverage Δ | |
|---|---|---|
| daft/context.py | 88.88% <ø> (ø) | |
| src/daft-local-execution/src/dispatcher.rs | 93.90% <100.00%> (-2.90%) | :arrow_down: |
| ...-execution/src/intermediate_ops/intermediate_op.rs | 91.41% <100.00%> (+0.22%) | :arrow_up: |
| ...rc/daft-local-execution/src/sinks/blocking_sink.rs | 90.05% <100.00%> (+0.21%) | :arrow_up: |
| src/daft-local-execution/src/sinks/write.rs | 90.69% <100.00%> (+0.07%) | :arrow_up: |
| ...rc/daft-local-execution/src/streaming_sink/base.rs | 79.80% <100.00%> (+0.39%) | :arrow_up: |
| src/common/daft-config/src/lib.rs | 69.91% <50.00%> (-2.98%) | :arrow_down: |
| src/common/daft-config/src/python.rs | 67.35% <56.52%> (-0.93%) | :arrow_down: |
| src/daft-local-execution/src/pipeline.rs | 81.13% <90.98%> (+1.18%) | :arrow_up: |
@colin-ho When you have time, please help review the changes in this part. The main issue is that the current sink operator cannot control the size of the written data, so here we first implement a workaround: making the upper and lower limits of the morsel size configurable.
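For context, a minimal sketch of what such knobs could look like from Python. The `min_morsel_size` / `max_morsel_size` parameter names below are hypothetical illustrations of the proposal, not the PR's final API; only the existing `default_morsel_size` parameter of `daft.set_execution_config` is known to exist today:

```python
import daft

# Hedged sketch: the parameter names are illustrative, not confirmed.
daft.set_execution_config(
    min_morsel_size=1_000,    # hypothetical lower bound, in rows
    max_morsel_size=128_000,  # hypothetical upper bound, in rows
)
```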
Have you tried using into_batches?
@colin-ho Do you mean that `repartition(1)` becomes `into_batches`?
I mean using `into_batches` to control the batch size. Since the main issue is that the current sink operator cannot control the size of the written data, perhaps `into_batches` can solve the problem.
It does work. `into_batches` indeed makes the data in each partition larger. I want to confirm: will a shuffle be performed here, or is it just a local exchange?
`into_batches` does not shuffle.
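For reference, a minimal sketch of the suggested approach, assuming `into_batches` takes a target row count (the exact signature may differ, and the paths here are made up):

```python
import daft

df = daft.read_parquet("input/")  # hypothetical input path
# Coalesce rows into larger batches before the write sink; per the
# discussion above this is a local exchange and does not shuffle.
df = df.into_batches(1_000_000)
df.write_parquet("output/")
```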
@colin-ho Thank you very much. In that case, can the logic that adds the configuration options still stay? I originally wanted to apply this configuration uniformly in `pipeline.rs`, letting any operator-specific requirement take priority when one exists. However, I found that doing so would affect the ordering of some window functions, so in the current PR only the `morsel_size` in `write.rs` has been modified.
The way morsel sizing currently works is that there is a default morsel size range of (0, ~128k rows]. This requirement is propagated top down until an operator with a required morsel size is reached, e.g. a UDF with a batch size; that batch size then becomes the new morsel size range.
See https://github.com/Eventual-Inc/Daft/pull/4894 for more details.
The benefit of this is memory: if the UDF requires a batch size of 100, then the upstream scan does not need to scan more than 100 rows at a time (see the sketch below).
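A small Python sketch of that example, assuming the `@daft.udf` decorator with a `batch_size` argument as described above (the data and column names here are made up):

```python
import daft
from daft import col, udf

# A UDF with an explicit batch size. Per the explanation above, this
# requirement propagates upstream, so the scan feeding this pipeline
# produces at most 100 rows per morsel.
@udf(return_dtype=daft.DataType.int64(), batch_size=100)
def add_one(x: daft.Series):
    return [v + 1 for v in x.to_pylist()]

df = daft.from_pylist([{"x": i} for i in range(1_000)])
df = df.with_column("y", add_one(col("x")))
df.explain(True)  # the physical plan shows each operator's batch size
```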
For this PR, let's keep the new configs for min/max morsel size that you have already added, and also add a deprecation warning for the existing `default_morsel_size` config in favor of the new min/max configs. However, we should not change the morsel size directly in `write.rs` or any other operator; `propagate_morsel_size_requirement` should take care of this.
I have made the modifications accordingly, and I have left some questions on the corresponding code sections; we can discuss them there.
@colin-ho I think it's useful to surface this:
- For example, suppose the code has two UDFs: `udf1` with `batch_size=15` and `udf2` also with `batch_size=15`. In that case, one would expect the results from `udf1` to be passed to `udf2` quickly.
- However, it seems that they get stuck in the intermediate queue. I understand this is the morsel size control at work, right?
```python
import daft
from daft import col

# Two chained UDFs (both with batch_size=15); udf2 consumes udf1's output.
df = daft.from_pylist(data)  # `data`, `udf1`, and `udf2` defined elsewhere
df = df.with_column("col1", udf1(col("data")))
df = df.with_column("col2", udf2(col("col1")))
```
> one would expect the results from `udf1` to be passed to `udf2` quickly
I understand; however, the way it works right now is that each UDF is always separated into its own operator, and the operators are separated by the intermediate queue.
You can see this if you do `.explain(True)`: the physical plan will show the operators and their respective batch sizes.
In the future we can implement operator fusion, where we combine UDFs, projections, filters, etc. into a single operator.
Hey @Jay-ju , are you able to make progress on this PR?