
feat: make morsel size configurable to prevent it from being too big or small

Open · Jay-ju opened this pull request 3 months ago · 12 comments

Changes Made

Related Issues

Checklist

  • [ ] Documented in API Docs (if applicable)
  • [ ] Documented in User Guide (if applicable)
  • [ ] If adding a new documentation page, doc is added to docs/mkdocs.yml navigation
  • [ ] Documentation builds and is formatted properly (tag @ccmao1130 for docs review)

Jay-ju avatar Sep 21 '25 05:09 Jay-ju

Codecov Report

:x: Patch coverage is 84.65608% with 29 lines in your changes missing coverage. Please review.
:white_check_mark: Project coverage is 74.62%. Comparing base (10086b2) to head (a043353).
:warning: Report is 1 commit behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| src/daft-local-execution/src/pipeline.rs | 90.98% | 11 Missing :warning: |
| src/common/daft-config/src/python.rs | 56.52% | 10 Missing :warning: |
| src/common/daft-config/src/lib.rs | 50.00% | 8 Missing :warning: |
Additional details and impacted files


```
@@            Coverage Diff             @@
##             main    #5250      +/-   ##
==========================================
+ Coverage   74.24%   74.62%   +0.38%     
==========================================
  Files         973      973              
  Lines      125213   124658     -555     
==========================================
+ Hits        92959    93030      +71     
+ Misses      32254    31628     -626     
```
| Files with missing lines | Coverage Δ |
|---|---|
| daft/context.py | 88.88% <ø> (ø) |
| src/daft-local-execution/src/dispatcher.rs | 93.90% <100.00%> (-2.90%) :arrow_down: |
| ...-execution/src/intermediate_ops/intermediate_op.rs | 91.41% <100.00%> (+0.22%) :arrow_up: |
| ...rc/daft-local-execution/src/sinks/blocking_sink.rs | 90.05% <100.00%> (+0.21%) :arrow_up: |
| src/daft-local-execution/src/sinks/write.rs | 90.69% <100.00%> (+0.07%) :arrow_up: |
| ...rc/daft-local-execution/src/streaming_sink/base.rs | 79.80% <100.00%> (+0.39%) :arrow_up: |
| src/common/daft-config/src/lib.rs | 69.91% <50.00%> (-2.98%) :arrow_down: |
| src/common/daft-config/src/python.rs | 67.35% <56.52%> (-0.93%) :arrow_down: |
| src/daft-local-execution/src/pipeline.rs | 81.13% <90.98%> (+1.18%) :arrow_up: |

... and 16 files with indirect coverage changes


codecov[bot] avatar Sep 22 '25 13:09 codecov[bot]

@colin-ho When you have time, please help review the changes here. The main issue is that the current sink operator cannot control the size of the written data, so this PR implements a workaround first: making the upper and lower limits of the morsel size configurable.

Jay-ju avatar Sep 22 '25 14:09 Jay-ju

Have you tried using into_batches?

colin-ho avatar Sep 22 '25 17:09 colin-ho

Have you tried using into_batches?

@colin-ho Do you mean that repartition(1) becomes into_batches?

Jay-ju avatar Sep 23 '25 01:09 Jay-ju

I mean using into_batches to control the batch size. Since the main issue is that the current sink operator cannot control the size of the written data, perhaps into_batches can solve the problem.

colin-ho avatar Sep 23 '25 01:09 colin-ho

It does work; into_batches indeed makes the batches in the partition larger. I want to confirm: will a shuffle be performed here, or is it just a local exchange?

Jay-ju avatar Sep 23 '25 01:09 Jay-ju

into_batches does not shuffle

colin-ho avatar Sep 23 '25 01:09 colin-ho
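Conceptually, an into_batches pass like the one discussed above only re-chunks the local stream of batches to a target size; rows never move between partitions, so no shuffle is needed. A minimal pure-Python sketch of that idea (the `rebatch` helper is hypothetical, not Daft's implementation):

```python
def rebatch(batches, target_size):
    """Re-chunk a stream of row batches to a fixed target size.

    Purely local: rows are buffered and re-split in order, never
    exchanged between partitions (sketch, not Daft's actual code).
    """
    buffer = []
    for batch in batches:
        buffer.extend(batch)
        while len(buffer) >= target_size:
            yield buffer[:target_size]
            buffer = buffer[target_size:]
    if buffer:
        yield buffer  # final partial batch

# Uneven input batches come out re-chunked to size 4.
chunks = list(rebatch([[1, 2], [3, 4, 5, 6, 7], [8]], target_size=4))
```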

@colin-ho Thank you very much. In that case, can the new configuration options still be kept? I originally wanted to apply this configuration uniformly in pipeline.rs, deferring to any operator-specific requirements, but I found that doing so affected the sorting of some window functions, so the current PR only modifies the morsel_size in write.rs.

Jay-ju avatar Sep 23 '25 01:09 Jay-ju

The way morsel sizing currently works is that there is a default morsel size range of (0, ~128k rows]. This requirement is propagated top-down until an operator with a required morsel size is reached (e.g. a UDF with a batch size), at which point that batch size becomes the new morsel size range.

See https://github.com/Eventual-Inc/Daft/pull/4894 for more details.

The benefit of this is memory: if the UDF requires a batch size of 100, the upstream scan does not need to scan more than 100 rows at a time.

For this PR, let's keep the new configs for min/max morsel size that you have already added, and also add a deprecation warning for the existing default_morsel_size config in favor of the new min/max configs. However, we should not change the morsel size directly in write.rs or any other operator; propagate_morsel_size_requirement should take care of this.

colin-ho avatar Sep 23 '25 16:09 colin-ho

I have made the modifications accordingly, but I have left some questions on the corresponding code sections; we can discuss them there.

Jay-ju avatar Oct 11 '25 13:10 Jay-ju

@colin-ho I think it's worth surfacing this behavior.

  • For example, the code below has two UDFs: udf1 with batch_size=15 and udf2 also with batch_size=15. The expectation is that results from udf1 can be passed to udf2 quickly.
  • However, they seem to get stuck in the intermediate queue. My understanding is that this is the morsel size control at work, right?

```python
df = daft.from_pylist(...)
df = df.with_column("col1", udf1(df["data"]))
df = df.with_column("col2", udf2(df["col1"]))
```

Jay-ju avatar Oct 27 '25 07:10 Jay-ju

it is actually expected that the results from udf1 can be quickly passed to udf2.

I understand; however, the way it works right now is that each UDF is always separated into its own operator, and the operators are separated by an intermediate queue.

You can see this if you call .explain(True): the physical plan will show the operators and their respective batch sizes.

In the future we can implement operator fusion, where we combine UDFs, projections, filters, etc. into a single operator.

colin-ho avatar Oct 28 '25 04:10 colin-ho
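The operator fusion idea above can be sketched as composing per-batch operators into one, so each batch flows from udf1 straight into udf2 with no intermediate queue in between (the `fuse` helper is hypothetical, not Daft's API):

```python
def fuse(*fns):
    """Fuse a chain of per-batch operators into a single operator:
    a batch passes through every function before being emitted.
    (Sketch of operator fusion, not Daft's implementation.)"""
    def fused(batch):
        for fn in fns:
            batch = fn(batch)
        return batch
    return fused

# Two toy "UDFs" that each transform a whole batch.
udf1 = lambda batch: [x + 1 for x in batch]
udf2 = lambda batch: [x * 2 for x in batch]

pipeline = fuse(udf1, udf2)
result = pipeline([1, 2, 3])  # each x becomes (x + 1) * 2
```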

Hey @Jay-ju , are you able to make progress on this PR?

colin-ho avatar Dec 16 '25 21:12 colin-ho