datafusion
Generate GroupByHash output in multiple RecordBatches
Which issue does this PR close?
Closes https://github.com/apache/datafusion/issues/9562
If the community thinks this PR is reasonable, I will continue the work:
- [ ] 1. Eliminate redundant code
- [ ] 2. Modify the code further as recommended by review
- [x] 3. Benchmark
In addition, we need to discuss whether we need to emit by batch_size when spill is true.
While running cargo test, I found that if we emit by batch_size when spill is true, some test cases such as aggregate_source_not_yielding_with_spill do not pass. Because the number of RecordBatches increases, the 'push_batch' calls on 'BatchBuilder' in update_merged_stream consume more memory.
https://github.com/apache/datafusion/blob/0f554fa12490ffa72c3c6ca42186c1ac9461bfa6/datafusion/physical-plan/src/sorts/builder.rs#L71-L80
Personally, I prefer to emit by batch_size when spill is true. Otherwise, the panic (https://github.com/apache/arrow-rs/issues/6112#issue-2429706033) is easily triggered by spill.
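As a rough illustration of what "emit by batch_size" means here, the following is a minimal sketch on plain vectors rather than Arrow RecordBatches. The function name `emit_in_batches` is hypothetical and is not part of DataFusion.

```rust
/// Splits `rows` into batches of at most `batch_size` rows each.
/// Hypothetical helper: plain vectors stand in for RecordBatches.
fn emit_in_batches<T: Clone>(rows: &[T], batch_size: usize) -> Vec<Vec<T>> {
    assert!(batch_size > 0, "batch_size must be non-zero");
    // `chunks` yields consecutive slices; only the last one may be shorter.
    rows.chunks(batch_size).map(|chunk| chunk.to_vec()).collect()
}
```

Each output batch is bounded in size, but there are more of them, which is why any per-batch bookkeeping downstream grows with the batch count.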
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?
Thank you @JasonLi-cn
I wonder if we have tested the performance of this branch? I worry that the incremental output generation will result in copying the values multiple times (as each emit_to will effectively copy the remaining elements "down").
If this turns out to be a large performance overhead, then I think we could look into updating the accumulators to remember where they have emitted to or something (or maybe add an EmitNext or something that could handle remembering the offset) 🤔
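To make the concern concrete, here is a minimal sketch that is not DataFusion code. `ShiftingState` and `OffsetState` are hypothetical stand-ins for an accumulator's per-group state: the first removes the first `n` values on every emit and shifts the remainder down, the second only remembers how far it has emitted.

```rust
/// Hypothetical accumulator state that emits by shifting values down.
struct ShiftingState {
    values: Vec<u64>,
    /// Total number of elements moved "down" across all emits.
    moved: usize,
}

impl ShiftingState {
    fn emit_first(&mut self, n: usize) -> Vec<u64> {
        let n = n.min(self.values.len());
        // `drain(..n)` removes the first n values and shifts the rest down.
        let out: Vec<u64> = self.values.drain(..n).collect();
        self.moved += self.values.len();
        out
    }
}

/// Hypothetical alternative that remembers where it has emitted to.
struct OffsetState {
    values: Vec<u64>,
    emitted: usize,
}

impl OffsetState {
    fn emit_next(&mut self, n: usize) -> &[u64] {
        let start = self.emitted;
        let end = (start + n).min(self.values.len());
        // No element is moved; only the offset advances.
        self.emitted = end;
        &self.values[start..end]
    }
}
```

With the shifting variant, emitting a large state in small batches moves the remaining tail once per emit, so the total copy work grows roughly quadratically with the number of groups. The offset variant avoids that at the cost of holding the emitted prefix until the state is dropped.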
Thank you @alamb . I'll run the benchmark of aggregate.
Benchmark(main VS this branch)
cargo bench --bench aggregate_query_sql
Gnuplot not found, using plotters backend
aggregate_query_no_group_by 15 12
time: [2.1563 ms 2.1686 ms 2.1824 ms]
change: [-0.1065% +1.0107% +2.0667%] (p = 0.07 > 0.05)
No change in performance detected.
Found 9 outliers among 100 measurements (9.00%)
7 (7.00%) high mild
2 (2.00%) high severe
aggregate_query_no_group_by_min_max_f64
time: [2.0533 ms 2.0655 ms 2.0782 ms]
change: [-0.2686% +0.5618% +1.4604%] (p = 0.21 > 0.05)
No change in performance detected.
Found 3 outliers among 100 measurements (3.00%)
3 (3.00%) high mild
aggregate_query_no_group_by_count_distinct_wide
time: [2.8570 ms 2.8720 ms 2.8884 ms]
change: [-1.1717% -0.2697% +0.6437%] (p = 0.57 > 0.05)
No change in performance detected.
Found 11 outliers among 100 measurements (11.00%)
6 (6.00%) high mild
5 (5.00%) high severe
aggregate_query_no_group_by_count_distinct_narrow
time: [2.6355 ms 2.6427 ms 2.6502 ms]
change: [-0.3158% +0.1024% +0.5289%] (p = 0.64 > 0.05)
No change in performance detected.
Found 4 outliers among 100 measurements (4.00%)
1 (1.00%) low mild
3 (3.00%) high mild
aggregate_query_group_by
time: [3.1874 ms 3.2218 ms 3.2633 ms]
change: [+0.8142% +2.0424% +3.3431%] (p = 0.00 < 0.05)
Change within noise threshold.
Found 10 outliers among 100 measurements (10.00%)
1 (1.00%) low mild
2 (2.00%) high mild
7 (7.00%) high severe
aggregate_query_group_by_with_filter
time: [3.2268 ms 3.2375 ms 3.2499 ms]
change: [-0.8765% -0.3444% +0.1488%] (p = 0.19 > 0.05)
No change in performance detected.
Found 5 outliers among 100 measurements (5.00%)
3 (3.00%) high mild
2 (2.00%) high severe
aggregate_query_group_by_u64 15 12
time: [2.9613 ms 2.9724 ms 2.9841 ms]
change: [-1.6244% -1.0331% -0.4294%] (p = 0.00 < 0.05)
Change within noise threshold.
Found 2 outliers among 100 measurements (2.00%)
2 (2.00%) high mild
aggregate_query_group_by_with_filter_u64 15 12
time: [3.1977 ms 3.2092 ms 3.2215 ms]
change: [-0.5511% -0.0732% +0.4691%] (p = 0.77 > 0.05)
No change in performance detected.
Found 10 outliers among 100 measurements (10.00%)
1 (1.00%) low mild
5 (5.00%) high mild
4 (4.00%) high severe
aggregate_query_group_by_u64_multiple_keys
time: [6.4762 ms 6.5397 ms 6.6054 ms]
change: [+0.9528% +2.4183% +3.9063%] (p = 0.00 < 0.05)
Change within noise threshold.
Found 1 outliers among 100 measurements (1.00%)
1 (1.00%) high mild
aggregate_query_approx_percentile_cont_on_u64
time: [6.2755 ms 6.3326 ms 6.3915 ms]
change: [-4.7095% -2.6423% -0.8346%] (p = 0.01 < 0.05)
Change within noise threshold.
Found 1 outliers among 100 measurements (1.00%)
1 (1.00%) high mild
aggregate_query_approx_percentile_cont_on_f32
time: [5.4463 ms 5.4856 ms 5.5275 ms]
change: [-1.1171% -0.1131% +0.9433%] (p = 0.83 > 0.05)
No change in performance detected.
Found 2 outliers among 100 measurements (2.00%)
2 (2.00%) high mild
aggregate_query_distinct_median
time: [4.1459 ms 4.1710 ms 4.1962 ms]
change: [-1.0176% -0.1285% +0.7244%] (p = 0.78 > 0.05)
No change in performance detected.
The performance of aggregate_query_group_by and aggregate_query_group_by_u64_multiple_keys deteriorates slightly. What they have in common is the use of GroupValuesRows, so I guess GroupValuesRows is the cause of the deterioration (I think calling `let mut output = self.row_converter.convert_rows(groups_rows)?;` multiple times adds some overhead because it results in more memory allocations).
In addition, because the other benchmarks haven't changed much, I think the performance overhead caused by accumulators is small.
# aggregate_query_group_by
SELECT utf8, MIN(f64), AVG(f64), COUNT(f64) FROM t GROUP BY utf8
# aggregate_query_group_by_u64_multiple_keys
SELECT u64_wide, utf8, MIN(f64), AVG(f64), COUNT(f64) FROM t GROUP BY u64_wide, utf8
This test may be incomplete. @alamb, do you have any better test suggestions? 🤔
Hi @JasonLi-cn -- yes I think we should run the ClickBench and TPCH benchmarks using the script here https://github.com/apache/datafusion/tree/main/benchmarks
I am doing so now and will report results here
I hit a bug https://github.com/apache/datafusion/pull/11833 that has been fixed on main when trying to run the benchmarks on this branch:
Query 19 avg time: 161.22 ms
Error: External(ArrowError(InvalidArgumentError("column types must match schema types, expected Decimal128(25, 2) but found Decimal128(38, 10) at column index 2"), None))
Thus I am going to merge up from main and try again
🤔
Query 17 iteration 4 took 6144.7 ms and returned 10 rows
Q18: SELECT "UserID", extract(minute FROM to_timestamp_seconds("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT\
10;
./bench.sh: line 410: 1005229 Killed $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries\
/clickbench/queries.sql" -o ${RESULTS_FILE}
alamb@aal-dev:~/datafusion-benchmarking$
It appears on this branch the kernel killed the process due to out of memory (which does not happen on main)
alamb@aal-dev:~/datafusion-benchmarking$sudo dmesg | tail
[262830.944639] [1003196] 0 1003196 4230 1440 69632 0 0 sshd
[262830.944642] [1003273] 1001 1003273 4305 1341 69632 0 0 sshd
[262830.944644] [1003274] 1001 1003274 2308 1152 53248 0 0 bash
[262830.944646] [1003284] 1001 1003284 2080 864 57344 0 0 tmux: client
[262830.944648] [1003287] 1001 1003287 2765 1230 61440 0 0 bash
[262830.944650] [1005223] 1001 1005223 2007 768 57344 0 0 bash
[262830.944652] [1005229] 1001 1005229 25851352 16101822 170872832 0 0 dfbench
[262830.944654] oom-kill:constraint=CONSTRAINT_NONE,nodemask=(null),cpuset=user.slice,mems_allowed=0,global_oom,task_memcg=/user.slice/user-1001.slice/session-14.scope,task=dfbench,pi\
d=1005229,uid=1001
[262830.944733] Out of memory: Killed process 1005229 (dfbench) total-vm:103405408kB, anon-rss:64403832kB, file-rss:3456kB, shmem-rss:0kB, UID:1001 pgtables:166868kB oom_score_adj:0
[262833.485378] oom_reaper: reaped process 1005229 (dfbench), now anon-rss:0kB, file-rss:1140kB, shmem-rss:0kB
alamb@aal-dev:~/datafusion-benchmarking$
Thank you @alamb 🙏. Let me analyze it further 🤔
In order to actually generate the output in multiple batches and gain performance, I think we would need to change:
- The `GroupValues` storage (so that it never creates a large contiguous range)
- The `GroupsAccumulators` likewise to manage the internal state as multiple chunks and not as single chunks
This would likely require some sort of API change to the accumulators / etc
I wonder if we could find some way to do the implementation incrementally
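One way to picture the chunked storage described above is the sketch below. It is an illustration under stated assumptions, not the DataFusion design: `BlockedValues`, its methods, and the use of `u64` values are all hypothetical.

```rust
use std::collections::VecDeque;

/// Hypothetical chunked storage: values live in fixed-size blocks, so no
/// single large contiguous allocation is ever created.
struct BlockedValues {
    block_size: usize,
    blocks: VecDeque<Vec<u64>>,
}

impl BlockedValues {
    fn new(block_size: usize) -> Self {
        assert!(block_size > 0, "block_size must be non-zero");
        Self { block_size, blocks: VecDeque::new() }
    }

    /// Appends a value and returns its flat group index.
    fn push(&mut self, value: u64) -> usize {
        let needs_block = self
            .blocks
            .back()
            .map_or(true, |b| b.len() == self.block_size);
        if needs_block {
            self.blocks.push_back(Vec::with_capacity(self.block_size));
        }
        let block_idx = self.blocks.len() - 1;
        let block = &mut self.blocks[block_idx];
        block.push(value);
        block_idx * self.block_size + block.len() - 1
    }

    /// Looks up a value by flat group index.
    fn get(&self, group_index: usize) -> Option<u64> {
        let block = self.blocks.get(group_index / self.block_size)?;
        block.get(group_index % self.block_size).copied()
    }

    /// Emits the oldest block; values in the remaining blocks are not copied.
    fn emit_first_block(&mut self) -> Option<Vec<u64>> {
        self.blocks.pop_front()
    }
}
```

Emitting a block here hands over an existing allocation instead of slicing or shifting a large buffer. Group indexes of the remaining values still shift down by one block, so anything that caches indexes would need to account for that.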
I agree. Ultimately it should be a big change that switches the group values and related states to being managed by block, like DuckDB, and I am working on this (#11931).
But maybe just splitting the emit result still has benefits? It seems that it can avoid calling the slice function many times, which really costs CPU?
I personally suggest sketching out what this would look like in a first PR without worrying about getting all the tests passing / compiling etc.
If we try to port all code at once to being managed in blocks it is going to be a very large change
I am thinking maybe we can have an incremental approach (like for example separately adding the ability to do blocked emission for https://docs.rs/datafusion/latest/datafusion/logical_expr/trait.GroupsAccumulator.html and https://github.com/apache/datafusion/blob/fd237f8705b18fa089fdfb8dd5b04655ccb4d691/datafusion/physical-plan/src/aggregates/group_values/mod.rs#L37-L55)
If we can set this up so that we get the pattern and a few common implementations setup in the first PR then we can make subsequent PRs to port over the other parts of the aggregation 🤔
Yes, I want to do the similar things for
incremental approach
Yes... I tried to do something like it, but it is still not thorough enough. After I found the reason why some tests failed, I realized it is actually hard to support the exact `EmitTo::First` used by streaming aggregation, so we should actually disable the blocked mode in streaming aggregation.
I am planning to switch to the special blocked emission impl now... It seems we can do two things to introduce the new emit modes?
- Define a `support_blocked_emission` for `GroupValues` and `GroupAccumulator`.
- Add two new emit variants to the `EmitTo` enum
pub enum EmitTo {
    /// Emit all groups
    All,
    /// Emit only the first `n` groups and shift all existing group
    /// indexes down by `n`.
    ///
    /// For example, if `n=10`, group_index `0, 1, ... 9` are emitted
    /// and group indexes `10, 11, 12, ...` become `0, 1, 2, ...`.
    First(usize),
    AllBlocks,
    FirstBlocks(usize),
}
What do you think of this way to support the special blocked emission?
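A minimal sketch of how the two proposed variants might be handled is shown below. It assumes that `AllBlocks` emits every block and that `FirstBlocks(n)` emits the first `n` blocks; neither meaning is spelled out in the thread. The enum is a local copy for illustration and `BlockedState` is a hypothetical type.

```rust
use std::collections::VecDeque;

/// Local copy of the proposed enum, for illustration only.
#[derive(Debug, Clone, Copy)]
enum EmitTo {
    All,
    First(usize),
    AllBlocks,
    FirstBlocks(usize),
}

/// Hypothetical blocked state: each inner Vec is one block of group values.
struct BlockedState {
    blocks: VecDeque<Vec<u64>>,
}

impl BlockedState {
    /// Emits whole blocks for the blocked variants. Returns None for the
    /// flat variants, which this sketch leaves to the existing code path.
    fn emit(&mut self, emit_to: EmitTo) -> Option<Vec<Vec<u64>>> {
        match emit_to {
            EmitTo::AllBlocks => Some(self.blocks.drain(..).collect()),
            EmitTo::FirstBlocks(n) => {
                // Clamp so that asking for too many blocks does not panic.
                let n = n.min(self.blocks.len());
                Some(self.blocks.drain(..n).collect())
            }
            EmitTo::All | EmitTo::First(_) => None,
        }
    }
}
```

Keeping the flat and blocked variants in one enum means every implementation has to decide what to do with modes it does not support, which is where a capability check such as `support_blocked_emission` would come in.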
Do we need to put batch_size in AllBlocks and FirstBlocks 🤔 ?
And that's exactly what I was planning on doing. But I found that the changes to the code were very large.
@JasonLi-cn I think the `GroupValues` impls maybe should not care about the batch size? We can just do the split and merge work in `GroupedHashAggregateStream::poll` if, unfortunately, the batch size != block size (usually they will be equal).
Maybe we should implement the special block-based `GroupValues` impls like the following?
- We pass the block size when initializing it
- It manages the inner values block by block
- It returns all blocks with the internal block size
We can always make the block size == batch size, so we can totally avoid any split operations.
I am trying this in #11943, and have made some related code changes.
OK. How do we determine the value of block size?
I think maybe we make it equal to batch_size in most cases, so that we can avoid any split operations while producing output? And for corner cases, for example when the batch_size is too small, we can let it fall back to single block mode?
Yes I think this is a good strategy
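The agreed strategy could be sketched roughly as follows. Everything here is illustrative: `StorageMode`, `choose_storage_mode`, and the `MIN_BLOCK_SIZE` threshold are assumptions, since the thread does not say how small "too small" is.

```rust
/// How group values are stored; names are illustrative.
#[derive(Debug, PartialEq)]
enum StorageMode {
    /// One contiguous buffer (the existing behaviour).
    SingleBlock,
    /// Fixed-size blocks, sized so that each block maps to one output batch.
    Blocked { block_size: usize },
}

/// Assumed threshold below which blocking is not considered worthwhile.
const MIN_BLOCK_SIZE: usize = 128;

fn choose_storage_mode(batch_size: usize, supports_blocked_emission: bool) -> StorageMode {
    if supports_blocked_emission && batch_size >= MIN_BLOCK_SIZE {
        // block size == batch size, so no split is needed when emitting.
        StorageMode::Blocked { block_size: batch_size }
    } else {
        // Fall back for tiny batch sizes or implementations without support.
        StorageMode::SingleBlock
    }
}
```

Tying the block size to batch_size keeps the output path free of slicing, while the fallback keeps very small batch sizes from producing a large number of tiny blocks.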
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.
I believe the plan here is that we will work to improve the coverage of aggregates and then revisit / revive this design
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.