Generate GroupByHash output in multiple RecordBatches

Open JasonLi-cn opened this issue 1 year ago • 18 comments

Which issue does this PR close?

Closes https://github.com/apache/datafusion/issues/9562

If the community thinks this PR is reasonable, I will continue the work:

  • [ ] 1. Eliminate redundant code
  • [ ] 2. Modify the code further as recommended by review
  • [x] 3. Benchmark

In addition, we need to discuss whether to emit by batch_size when spill is true. While running cargo test, I found that when emitting by batch_size with spill enabled, some test cases such as aggregate_source_not_yielding_with_spill fail. The number of RecordBatches increases, so the push_batch calls on BatchBuilder consume more memory in update_merged_stream.

https://github.com/apache/datafusion/blob/0f554fa12490ffa72c3c6ca42186c1ac9461bfa6/datafusion/physical-plan/src/sorts/builder.rs#L71-L80

Personally, I prefer to emit by batch_size when spill is true. Otherwise, the panic (https://github.com/apache/arrow-rs/issues/6112#issue-2429706033) is easily triggered by spill.

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

JasonLi-cn avatar Aug 01 '24 07:08 JasonLi-cn

Thank you @JasonLi-cn

I wonder if we have tested the performance of this branch? I worry that the incremental output generation will result in copying the values multiple times (as each emit_to will effectively copy the remaining elements "down").

If this turns out to be a large performance overhead, then I think we could look into updating the accumulators to remember where they have emitted to or something (or maybe add an EmitNext or something that could handle remembering the offset) 🤔
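To make the copy-down concern concrete, here is a minimal standalone sketch. It does not use DataFusion's real accumulator types: `ShiftingState`, `CursorState`, `emit_first`, and `emit_next` are hypothetical names, and the per-group state is assumed to be a plain `Vec<i64>`. The first type shifts the remaining values on every emit; the second only remembers an offset, in the spirit of the EmitNext idea.

```rust
/// Hypothetical per-group state (for example one running sum per group).
/// Illustration only; this is not DataFusion's accumulator layout.
pub struct ShiftingState {
    values: Vec<i64>,
}

impl ShiftingState {
    pub fn new(values: Vec<i64>) -> Self {
        Self { values }
    }

    /// Emit the first `n` groups. `Vec::drain` moves every remaining
    /// element "down" by `n`, so draining a long state in small chunks
    /// moves the tail again on every call.
    pub fn emit_first(&mut self, n: usize) -> Vec<i64> {
        let n = n.min(self.values.len());
        self.values.drain(..n).collect()
    }
}

/// Alternative sketch: remember how far we have emitted instead of shifting.
pub struct CursorState {
    values: Vec<i64>,
    /// Index of the first group that has not been emitted yet.
    emitted: usize,
}

impl CursorState {
    pub fn new(values: Vec<i64>) -> Self {
        Self { values, emitted: 0 }
    }

    /// Emit the next `n` groups by copying only those `n` values;
    /// the remaining elements stay where they are.
    pub fn emit_next(&mut self, n: usize) -> Vec<i64> {
        let end = (self.emitted + n).min(self.values.len());
        let out = self.values[self.emitted..end].to_vec();
        self.emitted = end;
        out
    }

    /// True once every group has been emitted.
    pub fn is_exhausted(&self) -> bool {
        self.emitted == self.values.len()
    }
}
```

Both types return the same chunks for the same sequence of calls. The cursor variant assumes no new groups arrive between emits (final output only), because group indexes are not renumbered.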

alamb avatar Aug 05 '24 20:08 alamb

Thank you @alamb . I'll run the benchmark of aggregate.

JasonLi-cn avatar Aug 06 '24 01:08 JasonLi-cn

Benchmark(main VS this branch)

cargo bench --bench aggregate_query_sql
Gnuplot not found, using plotters backend
aggregate_query_no_group_by 15 12
                        time:   [2.1563 ms 2.1686 ms 2.1824 ms]
                        change: [-0.1065% +1.0107% +2.0667%] (p = 0.07 > 0.05)
                        No change in performance detected.
Found 9 outliers among 100 measurements (9.00%)
  7 (7.00%) high mild
  2 (2.00%) high severe

aggregate_query_no_group_by_min_max_f64
                        time:   [2.0533 ms 2.0655 ms 2.0782 ms]
                        change: [-0.2686% +0.5618% +1.4604%] (p = 0.21 > 0.05)
                        No change in performance detected.
Found 3 outliers among 100 measurements (3.00%)
  3 (3.00%) high mild

aggregate_query_no_group_by_count_distinct_wide
                        time:   [2.8570 ms 2.8720 ms 2.8884 ms]
                        change: [-1.1717% -0.2697% +0.6437%] (p = 0.57 > 0.05)
                        No change in performance detected.
Found 11 outliers among 100 measurements (11.00%)
  6 (6.00%) high mild
  5 (5.00%) high severe

aggregate_query_no_group_by_count_distinct_narrow
                        time:   [2.6355 ms 2.6427 ms 2.6502 ms]
                        change: [-0.3158% +0.1024% +0.5289%] (p = 0.64 > 0.05)
                        No change in performance detected.
Found 4 outliers among 100 measurements (4.00%)
  1 (1.00%) low mild
  3 (3.00%) high mild

aggregate_query_group_by
                        time:   [3.1874 ms 3.2218 ms 3.2633 ms]
                        change: [+0.8142% +2.0424% +3.3431%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 10 outliers among 100 measurements (10.00%)
  1 (1.00%) low mild
  2 (2.00%) high mild
  7 (7.00%) high severe

aggregate_query_group_by_with_filter
                        time:   [3.2268 ms 3.2375 ms 3.2499 ms]
                        change: [-0.8765% -0.3444% +0.1488%] (p = 0.19 > 0.05)
                        No change in performance detected.
Found 5 outliers among 100 measurements (5.00%)
  3 (3.00%) high mild
  2 (2.00%) high severe

aggregate_query_group_by_u64 15 12
                        time:   [2.9613 ms 2.9724 ms 2.9841 ms]
                        change: [-1.6244% -1.0331% -0.4294%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild

aggregate_query_group_by_with_filter_u64 15 12
                        time:   [3.1977 ms 3.2092 ms 3.2215 ms]
                        change: [-0.5511% -0.0732% +0.4691%] (p = 0.77 > 0.05)
                        No change in performance detected.
Found 10 outliers among 100 measurements (10.00%)
  1 (1.00%) low mild
  5 (5.00%) high mild
  4 (4.00%) high severe

aggregate_query_group_by_u64_multiple_keys
                        time:   [6.4762 ms 6.5397 ms 6.6054 ms]
                        change: [+0.9528% +2.4183% +3.9063%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

aggregate_query_approx_percentile_cont_on_u64
                        time:   [6.2755 ms 6.3326 ms 6.3915 ms]
                        change: [-4.7095% -2.6423% -0.8346%] (p = 0.01 < 0.05)
                        Change within noise threshold.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

aggregate_query_approx_percentile_cont_on_f32
                        time:   [5.4463 ms 5.4856 ms 5.5275 ms]
                        change: [-1.1171% -0.1131% +0.9433%] (p = 0.83 > 0.05)
                        No change in performance detected.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild

aggregate_query_distinct_median
                        time:   [4.1459 ms 4.1710 ms 4.1962 ms]
                        change: [-1.0176% -0.1285% +0.7244%] (p = 0.78 > 0.05)
                        No change in performance detected.

The performance of aggregate_query_group_by and aggregate_query_group_by_u64_multiple_keys deteriorates slightly. What they have in common is the use of GroupValuesRows, so I guess GroupValuesRows is the cause of the deterioration. (I think calling let mut output = self.row_converter.convert_rows(groups_rows)?; multiple times adds some overhead because it results in more memory allocations.) In addition, because the other benchmarks haven't changed much, I think the performance overhead caused by the accumulators is small.

# aggregate_query_group_by
SELECT utf8, MIN(f64), AVG(f64), COUNT(f64) FROM t GROUP BY utf8

# aggregate_query_group_by_u64_multiple_keys
SELECT u64_wide, utf8, MIN(f64), AVG(f64), COUNT(f64) FROM t GROUP BY u64_wide, utf8

This test may be incomplete. @alamb, do you have any suggestions for better tests? 🤔

JasonLi-cn avatar Aug 06 '24 03:08 JasonLi-cn

Hi @JasonLi-cn -- yes I think we should run the ClickBench and TPCH benchmarks using the script here https://github.com/apache/datafusion/tree/main/benchmarks

I am doing so now and will report results here

alamb avatar Aug 08 '24 11:08 alamb

I hit a bug https://github.com/apache/datafusion/pull/11833 that has been fixed on main when trying to run the benchmarks on this branch:

Query 19 avg time: 161.22 ms
Error: External(ArrowError(InvalidArgumentError("column types must match schema types, expected Decimal128(25, 2) but found Decimal128(38, 10) at column index 2"), None))

Thus I am going to merge up from main and try again

alamb avatar Aug 08 '24 14:08 alamb

🤔

Query 17 iteration 4 took 6144.7 ms and returned 10 rows
Q18: SELECT "UserID", extract(minute FROM to_timestamp_seconds("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT\
 10;
./bench.sh: line 410: 1005229 Killed                  $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries\
/clickbench/queries.sql" -o ${RESULTS_FILE}
alamb@aal-dev:~/datafusion-benchmarking$

It appears on this branch the kernel killed the process due to out of memory (which does not happen on main)

alamb@aal-dev:~/datafusion-benchmarking$sudo dmesg | tail
[262830.944639] [1003196]     0 1003196     4230     1440    69632        0             0 sshd
[262830.944642] [1003273]  1001 1003273     4305     1341    69632        0             0 sshd
[262830.944644] [1003274]  1001 1003274     2308     1152    53248        0             0 bash
[262830.944646] [1003284]  1001 1003284     2080      864    57344        0             0 tmux: client
[262830.944648] [1003287]  1001 1003287     2765     1230    61440        0             0 bash
[262830.944650] [1005223]  1001 1005223     2007      768    57344        0             0 bash
[262830.944652] [1005229]  1001 1005229 25851352 16101822 170872832        0             0 dfbench
[262830.944654] oom-kill:constraint=CONSTRAINT_NONE,nodemask=(null),cpuset=user.slice,mems_allowed=0,global_oom,task_memcg=/user.slice/user-1001.slice/session-14.scope,task=dfbench,pi\
d=1005229,uid=1001
[262830.944733] Out of memory: Killed process 1005229 (dfbench) total-vm:103405408kB, anon-rss:64403832kB, file-rss:3456kB, shmem-rss:0kB, UID:1001 pgtables:166868kB oom_score_adj:0
[262833.485378] oom_reaper: reaped process 1005229 (dfbench), now anon-rss:0kB, file-rss:1140kB, shmem-rss:0kB
alamb@aal-dev:~/datafusion-benchmarking$

alamb avatar Aug 08 '24 15:08 alamb

Thank you @alamb 🙏. Let me analyze it further 🤔

JasonLi-cn avatar Aug 09 '24 01:08 JasonLi-cn

In order to actually generate the output in multiple batches and gain performance, I think we would need to change:

  1. The GroupValues storage (so that it never creates a large contiguous range)
  2. The GroupsAccumulators likewise, to manage the internal state as multiple chunks and not as a single chunk

This would likely require some sort of API change to the accumulators / etc

I wonder if we could find some way to do the implementation incrementally
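A rough sketch of what chunked accumulator state could look like, assuming a sum accumulator over `i64` and dense group indexes assigned in order. `BlockedSums`, `update`, and `emit_first_block` are hypothetical names for illustration, not an existing DataFusion API.

```rust
use std::collections::VecDeque;

/// Hypothetical blocked storage for per-group sums.
/// Names and layout are assumptions made for this sketch.
pub struct BlockedSums {
    block_size: usize,
    blocks: VecDeque<Vec<i64>>,
}

impl BlockedSums {
    pub fn new(block_size: usize) -> Self {
        assert!(block_size > 0, "block_size must be non-zero");
        Self {
            block_size,
            blocks: VecDeque::new(),
        }
    }

    /// Add `value` to the sum of group `group_index`. New fixed-size
    /// blocks are allocated on demand, so no single allocation ever
    /// has to hold all groups contiguously.
    pub fn update(&mut self, group_index: usize, value: i64) {
        let block = group_index / self.block_size;
        let offset = group_index % self.block_size;
        while self.blocks.len() <= block {
            self.blocks.push_back(Vec::with_capacity(self.block_size));
        }
        let slots = &mut self.blocks[block];
        if slots.len() <= offset {
            slots.resize(offset + 1, 0);
        }
        slots[offset] += value;
    }

    /// Hand out the oldest block as is. The values in the other blocks
    /// are neither copied nor shifted. This sketch assumes emission only
    /// starts after all input has been consumed.
    pub fn emit_first_block(&mut self) -> Option<Vec<i64>> {
        self.blocks.pop_front()
    }
}
```

With `block_size` equal to the output batch size, each emitted block can become one output batch without a slice or copy of the remaining state.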

alamb avatar Aug 10 '24 13:08 alamb

I agree. Ultimately this should be a big change that switches the group values and related states to being managed in blocks, like DuckDB, and I am working on this (#11931).

But maybe just splitting the emit result still has benefits? It seems that it can avoid calling the slice function many times, which really costs CPU.

Rachelint avatar Aug 10 '24 21:08 Rachelint

I personally suggest sketching out what this would look like in a first PR without worrying about getting all the tests passing / compiling etc.

If we try to port all code at once to being managed in blocks it is going to be a very large change

I am thinking maybe we can have an incremental approach (like for example separately adding the ability to do blocked emission for https://docs.rs/datafusion/latest/datafusion/logical_expr/trait.GroupsAccumulator.html and https://github.com/apache/datafusion/blob/fd237f8705b18fa089fdfb8dd5b04655ccb4d691/datafusion/physical-plan/src/aggregates/group_values/mod.rs#L37-L55)

If we can set this up so that we get the pattern and a few common implementations setup in the first PR then we can make subsequent PRs to port over the other parts of the aggregation 🤔

alamb avatar Aug 11 '24 15:08 alamb

Yes, I want to do the similar things for

incremental approach

Yes... I tried to do something like this, but it is still not thorough enough. After finding the reason why some tests failed, I found it is actually hard to support the exact EmitTo::First used by streaming aggregation. We should actually disable the blocked mode in streaming aggregation.

I am planning to switch to the special blocked emission impl now... It seems we can do two things to introduce the new emit modes?

  • Define a support_blocked_emission for GroupValues and GroupsAccumulator.
  • Add two emit enum variants
pub enum EmitTo {
    /// Emit all groups
    All,
    /// Emit only the first `n` groups and shift all existing group
    /// indexes down by `n`.
    ///
    /// For example, if `n=10`, group_index `0, 1, ... 9` are emitted
    /// and group indexes `10, 11, 12, ...` become `0, 1, 2, ...`.
    First(usize),

    AllBlocks,

    FirstBlocks(usize),
}

What do you think of this way to support the special blocked emission?
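One way the two proposed variants might be handled is sketched below. This is a guess at the semantics, not code from the PR: `BlockedGroups`, `from_blocks`, and `emit` are hypothetical, `AllBlocks` is assumed to return every block unchanged, `FirstBlocks(n)` is assumed to return the first `n` blocks, and `EmitTo::First` is rejected in blocked mode because of the difficulty described above.

```rust
use std::collections::VecDeque;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum EmitTo {
    All,
    First(usize),
    AllBlocks,
    FirstBlocks(usize),
}

/// Hypothetical group storage that keeps its values in blocks.
pub struct BlockedGroups {
    blocks: VecDeque<Vec<u64>>,
}

impl BlockedGroups {
    pub fn from_blocks(blocks: Vec<Vec<u64>>) -> Self {
        Self { blocks: VecDeque::from(blocks) }
    }

    /// Stand-in for the proposed `support_blocked_emission` capability check.
    pub fn supports_blocked_emission(&self) -> bool {
        true
    }

    /// Emit according to `emit_to`; each inner Vec is one output batch.
    pub fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<Vec<u64>>, String> {
        match emit_to {
            // Non-blocked semantics: concatenate everything into one batch.
            EmitTo::All => Ok(vec![self.blocks.drain(..).flatten().collect()]),
            // Assumed unsupported here: an exact `First(n)` would have to
            // split a block and renumber the remaining groups.
            EmitTo::First(_) => {
                Err("EmitTo::First is not supported in blocked mode".to_string())
            }
            // Assumed semantics: hand out every block as is.
            EmitTo::AllBlocks => Ok(self.blocks.drain(..).collect()),
            // Assumed semantics: hand out at most the first `n` blocks.
            EmitTo::FirstBlocks(n) => {
                let n = n.min(self.blocks.len());
                Ok(self.blocks.drain(..n).collect())
            }
        }
    }
}
```

A caller such as the streaming aggregation path could check `supports_blocked_emission()` first and keep using `All` / `First` when it returns false.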

Rachelint avatar Aug 11 '24 17:08 Rachelint

Do we need to put batch_size in AllBlocks and FirstBlocks 🤔 ? And that's exactly what I was planning on doing. But I found that the changes to the code were very large.

JasonLi-cn avatar Aug 12 '24 07:08 JasonLi-cn

@JasonLi-cn I think the GroupValues impls probably should not care about the batch size? We would just do the split and merge work in GroupedHashAggregateStream::poll if, unfortunately, the batch size != block size (usually they will be equal).

Maybe we should implement the special block-based GroupValues impls as follows?

  • We pass the block size when initializing it
  • It manages the inner values block by block
  • It returns all blocks with the internal block size

We can always make the block size == batch size, so we can totally avoid any split operations.

I am trying this out in #11943 and have made some related code changes.
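The three points above could be sketched roughly like this, using string group keys and a `HashMap` for lookup. `BlockedGroupValues`, `intern`, and `emit_all_blocks` are hypothetical names for this illustration and are not taken from #11943.

```rust
use std::collections::HashMap;

/// Hypothetical block-based group value store.
pub struct BlockedGroupValues {
    block_size: usize,
    /// Maps a group key to its dense group index.
    index: HashMap<String, usize>,
    /// Group keys in insertion order, `block_size` keys per block.
    blocks: Vec<Vec<String>>,
}

impl BlockedGroupValues {
    /// The block size is fixed when the store is created.
    pub fn new(block_size: usize) -> Self {
        assert!(block_size > 0, "block_size must be non-zero");
        Self {
            block_size,
            index: HashMap::new(),
            blocks: Vec::new(),
        }
    }

    /// Return the group index for `key`, appending it to the last block
    /// (or to a fresh block when the last one is full) if it is new.
    pub fn intern(&mut self, key: &str) -> usize {
        if let Some(&idx) = self.index.get(key) {
            return idx;
        }
        let idx = self.index.len();
        if self.blocks.last().map_or(true, |b| b.len() == self.block_size) {
            self.blocks.push(Vec::with_capacity(self.block_size));
        }
        self.blocks
            .last_mut()
            .expect("a block was just ensured")
            .push(key.to_string());
        self.index.insert(key.to_string(), idx);
        idx
    }

    /// Return all blocks at their internal block size and reset the store.
    /// When block size == batch size, each block is already one output batch.
    pub fn emit_all_blocks(&mut self) -> Vec<Vec<String>> {
        self.index.clear();
        std::mem::take(&mut self.blocks)
    }
}
```

Only the last block can be shorter than `block_size`, so with block size == batch size no split or merge step is needed on output.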

Rachelint avatar Aug 12 '24 09:08 Rachelint

OK. How do we determine the value of block size?

JasonLi-cn avatar Aug 12 '24 10:08 JasonLi-cn

I think we can make it equal to batch_size in most cases, so that we can avoid any split operations while producing output? For corner cases, for example when the batch_size is too small, we can let it fall back to single block mode?
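As a small sketch of that selection rule: the threshold below is a made-up placeholder, and `BlockMode`, `MIN_BLOCKED_BATCH_SIZE`, and `choose_block_mode` are hypothetical names. The discussion does not specify what counts as "too small".

```rust
/// How the aggregation state is laid out (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockMode {
    /// One contiguous state, as on main today.
    SingleBlock,
    /// State managed in blocks of `block_size` groups.
    Blocked { block_size: usize },
}

/// Placeholder threshold, assumed for illustration only.
pub const MIN_BLOCKED_BATCH_SIZE: usize = 128;

/// Use block size == batch size so output needs no split,
/// and fall back to single block mode for very small batch sizes.
pub fn choose_block_mode(batch_size: usize) -> BlockMode {
    if batch_size < MIN_BLOCKED_BATCH_SIZE {
        BlockMode::SingleBlock
    } else {
        BlockMode::Blocked { block_size: batch_size }
    }
}
```

Keeping the decision in one function like this would let the fallback rule change later without touching the GroupValues or accumulator implementations.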

Rachelint avatar Aug 12 '24 10:08 Rachelint

Yes I think this is a good strategy

alamb avatar Aug 14 '24 19:08 alamb

Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.

github-actions[bot] avatar Oct 14 '24 02:10 github-actions[bot]

I believe the plan here is that we will work to improve the coverage of aggregates and then revisit / revive this design

alamb avatar Oct 14 '24 17:10 alamb

Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.

github-actions[bot] avatar Dec 14 '24 02:12 github-actions[bot]