Sort ClickBench data using 4GB on standard laptop (spilling)
Is your feature request related to a problem or challenge?
While working with @zhuqi-lucas on https://github.com/apache/datafusion/pull/19042, we noticed it is not possible to sort the hits.parquet dataset with a 4 GB memory limit.
Get the data
./benchmarks/bench.sh data clickbench_1
Try to sort it using 4 GB of memory (on a 20-core Mac M3 laptop):
datafusion-cli -m 4G -c "COPY (SELECT * FROM 'benchmarks/data/hits.parquet' ORDER BY \"EventTime\") TO 'hits_sorted.parquet' STORED AS PARQUET;"
Results in
DataFusion CLI v51.0.0
Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
caused by
Resources exhausted: Additional allocation failed for ExternalSorter[7] with top memory consumers (across reservations) as:
ExternalSorterMerge[4]#11(can spill: false) consumed 883.8 MB, peak 883.8 MB,
ExternalSorterMerge[1]#5(can spill: false) consumed 812.6 MB, peak 812.6 MB,
ExternalSorterMerge[9]#21(can spill: false) consumed 764.8 MB, peak 764.8 MB.
Error: Failed to allocate additional 13.7 MB for ExternalSorter[7] with 0.0 B already allocated for this reservation - 1088.1 KB remain available for the total pool
As @2010YOUY01 has documented in https://datafusion.apache.org/user-guide/configs.html#memory-limited-queries, this query does run to completion with fewer target partitions, for example 1:
SET datafusion.execution.target_partitions = 1;
Then this succeeds
datafusion-cli -m 4G -c "SET datafusion.execution.target_partitions = 1; COPY (SELECT * FROM 'benchmarks/data/hits.parquet' ORDER BY \"EventTime\") TO 'hits_sorted.parquet' STORED AS PARQUET;"
However, with 2 target partitions it still fails:
andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion2$ datafusion-cli -m 4G -c "SET datafusion.execution.target_partitions = 2; COPY (SELECT * FROM 'benchmarks/data/hits.parquet' ORDER BY \"EventTime\") TO 'hits_sorted.parquet' STORED AS PARQUET;"
DataFusion CLI v51.0.0
0 row(s) fetched.
Elapsed 0.000 seconds.
Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
caused by
Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as:
ExternalSorter[1]#5(can spill: true) consumed 3.2 GB, peak 3.5 GB,
ExternalSorterMerge[1]#6(can spill: false) consumed 767.1 MB, peak 1773.9 MB,
ExternalSorterMerge[0]#4(can spill: false) consumed 10.0 MB, peak 1679.3 MB.
Error: Failed to allocate additional 27.6 MB for ExternalSorter[0] with 0.0 B already allocated for this reservation - 20.7 MB remain available for the total pool
Describe the solution you'd like
I would like DataFusion to be able to complete such queries with a reasonable amount of RAM without having to tune the number of target partitions.
Describe alternatives you've considered
Maybe there could be some "rule of thumb" for the required resources -- for example, perhaps we could make sure queries run with 1 GB of RAM per core (and adjust the batch size / target partitioning automatically if needed).
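For illustration, here is a minimal sketch of what such a rule of thumb could look like when building a SessionContext programmatically. The 1 GB-per-partition constant and the partitions_for_budget helper are hypothetical (not an existing DataFusion API); the RuntimeEnvBuilder / FairSpillPool / SessionConfig calls are my best recollection of the current Rust API:

use std::sync::Arc;

use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::*;

/// Hypothetical rule of thumb: give each partition roughly 1 GB of the
/// memory budget, capped at the number of cores.
fn partitions_for_budget(memory_limit_bytes: usize, num_cores: usize) -> usize {
    const BYTES_PER_PARTITION: usize = 1 << 30; // ~1 GB per partition
    (memory_limit_bytes / BYTES_PER_PARTITION).clamp(1, num_cores)
}

fn context_with_budget(
    memory_limit_bytes: usize,
    num_cores: usize,
) -> datafusion::error::Result<SessionContext> {
    // A fixed-size, spill-aware memory pool, like `datafusion-cli -m 4G`
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_pool(Arc::new(FairSpillPool::new(memory_limit_bytes)))
        .build_arc()?;
    let config = SessionConfig::new()
        .with_target_partitions(partitions_for_budget(memory_limit_bytes, num_cores));
    Ok(SessionContext::new_with_config_rt(config, runtime))
}

With a 4 GB budget on a 20-core machine this would pick 4 partitions instead of 20, which is in the spirit of the manual SET datafusion.execution.target_partitions workaround above.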
Additional context
No response
Hi @alamb @zhuqi-lucas, we are doing similar experiments running ClickBench queries with DataFusion on lower-memory instances.
Not sure if we have an EPIC to track all such issues in one common place.
What we noticed is that TopK doesn't spill, and hence ClickBench GROUP BY queries with ORDER BY + LIMIT, such as the following, fail even with a single target partition:
Q13
SELECT "SearchPhrase", COUNT(DISTINCT "UserID") AS u FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY u DESC LIMIT 10;
Q33
SELECT "URL", COUNT(*) AS c FROM hits GROUP BY "URL" ORDER BY c DESC LIMIT 10;
Both fail with an out-of-memory error when less than 8 GB of RAM is allocated in datafusion-cli. [https://github.com/apache/datafusion/issues/9417 might be a relevant issue]
@alchemist51 and I have been looking into improving queries in this area.
@alchemist51 is looking into reviving https://github.com/apache/datafusion/pull/15591 and
I was able to get working spilling for the TopK operator in my fork - https://github.com/bharath-techie/datafusion/tree/spilltest
Can you please share your views / suggestions on the same ?
Can you please share your views / suggestions on the same ?
I am not sure what you are asking about
What we noticed is that TopK doesn't spill, and hence ClickBench GROUP BY queries with ORDER BY + LIMIT fail even with a single target partition, such as
SELECT "SearchPhrase", COUNT(DISTINCT "UserID") AS u FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY u DESC LIMIT 10;
Since this has a LIMIT 10, the topK only needs to hold the top 10 values
Therefore, I don't think the issue is related to TopK itself, but rather the memory usage of one of the GroupByHash aggregate operations that feed the TopK (in this case there are very many distinct "SearchPhrase" values)
The GroupByHash operator should also be able to spill when under memory pressure, so I don't know why it is failing
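To illustrate why that should be: a top-k over any number of input rows only has to retain k entries, so its state is O(k). A simplified, standalone sketch of the idea (not DataFusion's TopK implementation, which additionally keeps the source RecordBatches it references alive):

use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Keep only the k largest values seen so far; memory stays O(k)
/// no matter how many rows flow through.
fn top_k(values: impl Iterator<Item = i64>, k: usize) -> Vec<i64> {
    // Min-heap of the current top k: the smallest kept value sits on top
    // and is evicted whenever a larger value arrives.
    let mut heap: BinaryHeap<Reverse<i64>> = BinaryHeap::with_capacity(k + 1);
    for v in values {
        heap.push(Reverse(v));
        if heap.len() > k {
            heap.pop(); // drop the smallest, keeping only k entries
        }
    }
    let mut result: Vec<i64> = heap.into_iter().map(|Reverse(v)| v).collect();
    result.sort_unstable_by(|a, b| b.cmp(a)); // descending, like ORDER BY c DESC LIMIT k
    result
}

As the discussion below found, the large memory use comes from how the batches backing those k rows are accounted for, not from the number of rows kept.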
So this is the error I get:
datafusion-cli -m 8G
DataFusion CLI v51.0.0
> SET datafusion.execution.target_partitions=1;
0 row(s) fetched.
Elapsed 0.001 seconds.
> CREATE EXTERNAL TABLE hits
STORED AS PARQUET
LOCATION '/home/ec2-user/clickdata/partitioned/hits/*.parquet';
0 row(s) fetched.
Elapsed 0.054 seconds.
> SELECT "URL", COUNT(*) AS c FROM hits GROUP BY "URL" ORDER BY c DESC LIMIT 10;
Resources exhausted: Additional allocation failed for TopK[0] with top memory consumers (across reservations) as:
TopK[0]#4(can spill: false) consumed 7.8 GB, peak 7.8 GB,
GroupedHashAggregateStream[0] (count(1))#3(can spill: true) consumed 80.4 KB, peak 4.6 GB,
DataFusion-Cli#2(can spill: false) consumed 0.0 B, peak 0.0 B.
Error: Failed to allocate additional 3.9 GB for TopK[0] with 7.8 GB already allocated for this reservation - 187.0 MB remain available for the total pool
>
Once I added spilling to TopK, I was able to get results back with the same memory config:
explain analyze SELECT "URL", COUNT(*) AS c FROM hits GROUP BY "URL" ORDER BY c DESC LIMIT 10;
+-------------------
| Plan with Metrics | SortExec: TopK(fetch=10), expr=[c@1 DESC], preserve_partitioning=[false], filter=[c@1 IS NULL OR c@1 > 57340], metrics=[output_rows=10, elapsed_compute=81.54ms, output_bytes=50.0 MB, output_batches=1, spill_count=19, spilled_bytes=115.1 MB, spilled_rows=125, row_replacements=189] |
| | ProjectionExec: expr=[URL@0 as URL, count(Int64(1))@1 as c], metrics=[output_rows=18.34 M, elapsed_compute=1.54ms, output_bytes=8.6 TB, output_batches=2.24 K] |
| | AggregateExec: mode=Single, gby=[URL@0 as URL], aggr=[count(Int64(1))], metrics=[output_rows=18.34 M, elapsed_compute=14.45s, output_bytes=8.6 TB, output_batches=2.24 K, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, peak_mem_used=4.90 B, aggregate_arguments_time=38.36ms, aggregation_time=389.52ms, emitting_time=1.30ms, time_calculating_group_ids=13.93s] |
| | DataSourceExec: file_groups={1 group: [[home/ec2-user/clickdata/partitioned/hits/hits_0.parquet:0..122446530, home/ec2-user/clickdata/partitioned/hits/hits_1.parquet:0..174965044, home/ec2-user/clickdata/partitioned/hits/hits_10.parquet:0..101513258, home/ec2-user/clickdata/partitioned/hits/hits_11.parquet:0..118419888, home/ec2-user/clickdata/partitioned/hits/hits_12.parquet:0..149514164, ...]]}, projection=[URL], file_type=parquet, metrics=[output_rows=100.00 M, elapsed_compute=1ns, output_bytes=20.0 GB, output_batches=12.31 K, files_ranges_pruned_statistics=100 total → 100 matched, row_groups_pruned_statistics=325 total → 325 matched, row_groups_pruned_bloom_filter=325 total → 325 matched, page_index_rows_pruned=0 total → 0 matched, batches_split=0, bytes_scanned=2.62 B, file_open_errors=0, file_scan_errors=0, num_predicate_creation_errors=0, predicate_cache_inner_records=0, predicate_cache_records=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, bloom_filter_eval_time=200ns, metadata_load_time=8.04ms, page_index_eval_time=200ns, row_pushdown_eval_time=200ns, statistics_eval_time=200ns, time_elapsed_opening=2.69ms, time_elapsed_processing=6.99s, time_elapsed_scanning_total=21.96s, time_elapsed_scanning_until_data=241.98ms, scan_efficiency_ratio=1600% (2.62 B/160.3 M)] |
| |
The suggestion I was asking for was whether there is anything I can do in the planner configuration to make this work without spilling, for example. I have all defaults. I tried reducing the batch size, since TopK holds 20 * batch_size rows before compacting, but it didn't seem to help.
Therefore, I don't think the issue is related to TopK itself, but rather the memory usage of one of the GroupByHash aggregate
I get this point: for each batch that gets inserted and referenced in TopK, the entire batch size gets added during the size estimation. [as mentioned in https://github.com/apache/datafusion/issues/9562]
Path: datafusion/physical-plan/src/topk/mod.rs
/// Insert a record batch entry into this store, tracking its
/// memory use, if it has any uses
pub fn insert(&mut self, entry: RecordBatchEntry) {
    // uses of 0 means that none of the rows in the batch were stored in the topk
    if entry.uses > 0 {
        let size = get_record_batch_memory_size(&entry.batch);
        self.batches_size += size;
        println!("size during insert : {}", size);
        self.batches.insert(entry.id, entry);
    }
}
/// returns the size of memory used by this store, including all
/// referenced `RecordBatch`es, in bytes
pub fn size(&self) -> usize {
    // original implementation:
    // size_of::<Self>()
    //     + self.batches.capacity() * (size_of::<u32>() + size_of::<RecordBatchEntry>())
    //     + self.batches_size
    let size_of_self = size_of::<Self>();
    let capacity = self.batches.capacity();
    let entry_size = size_of::<u32>() + size_of::<RecordBatchEntry>();
    let batches_size = self.batches_size;
    let size = size_of_self + capacity * entry_size + batches_size;
    println!(
        "self size : {} , capacity : {} , heap size : {}, batch size : {}",
        size_of_self, capacity, entry_size, batches_size
    );
    println!("size during get : {}", size);
    size
}
For the above URL query, we have a ~4 GB record batch in GroupByHashAggregate which gets counted for each record batch added to TopK:
Record batch size during insert : 4196909056
self size : 48 , capacity : 3 , heap size : 60, batch size : 4196909056
size during get : 4196909284
Record batch size during insert : 4196909056
self size : 48 , capacity : 3 , heap size : 60, batch size : 8393818112
size during get : 8393818340
Record batch size during insert : 4196909056
self size : 48 , capacity : 3 , heap size : 60, batch size : 12590727168
size during get : 12590727396
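For reference, these numbers line up with the earlier error message: after two inserts the accounted size is 2 x 4,196,909,056 B ≈ 7.8 GB (the amount TopK[0] had already reserved), the third insert asks for another ~3.9 GB, and an 8 GB pool minus 7.8 GB leaves roughly 187 MB, which matches the "remain available" figure in the failure.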
By going through previous such issues:
- I think @Dandandan mentioned force compaction when reaching memory limit, should we try that?
- https://github.com/apache/datafusion/pull/15591 could help as well, or do we have any latest issues which can help here?
Apologies if I'm polluting this issue, which is unrelated to TopK - maybe we can discuss over in https://github.com/apache/datafusion/issues/9417 or in a new issue.
For the above URL query, we have a ~4 GB record batch in GroupByHashAggregate which gets counted for each record batch added to TopK
Ah, that makes sense. Nice find.
I think @Dandandan mentioned force compaction when reaching memory limit (https://github.com/apache/datafusion/issues/9417#issuecomment-2431943283), should we try that?
Yes, I think that would be a much better approach than trying to spill the entire 4 GB batch (because the TopK operator is only keeping a small number of rows -- 10 -- so spilling 4 GB just to read it back seems very wasteful).
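Roughly, the force-compaction idea would be: when the reservation cannot grow, copy only the rows the heap still references into a fresh, small batch and drop the original large batches. A hypothetical sketch using the arrow take kernel (not the actual TopK code):

use arrow::array::UInt32Array;
use arrow::compute::take;
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;

/// Hypothetical force-compaction: given a large source batch and the row
/// indices the top-k heap still points at, build a new batch containing
/// only those rows so the multi-GB source batch can be dropped.
fn compact_referenced_rows(batch: &RecordBatch, referenced_rows: &[u32]) -> Result<RecordBatch> {
    let indices = UInt32Array::from(referenced_rows.to_vec());
    let columns = batch
        .columns()
        .iter()
        .map(|col| Ok(take(col.as_ref(), &indices, None)?))
        .collect::<Result<Vec<_>>>()?;
    // The new batch has at most k rows, so its buffers are tiny compared to
    // the original; releasing the original frees the large allocation.
    Ok(RecordBatch::try_new(batch.schema(), columns)?)
}

Since the compacted batch has at most k rows (10 here), the multi-GB reservation can shrink back to a few KB without any disk round trip.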
https://github.com/apache/datafusion/pull/15591 could help as well or do we have any latest issues which can help here ?
The issue that was tracking that is:
- https://github.com/apache/datafusion/issues/7065
However, that issue focuses more on performance than on the memory pressure coming from single large allocations. It is probably a good idea to file a new issue that focuses on the memory pressure area rather than performance.
@bharath-techie would you like to file the tickets (force compact in TopK as well as large single allocation in GroupBy)?
Yes, I think that would be a much better approach than trying to spill the entire 4 GB batch
I'm just spilling the TopK state and freeing up the record batches, so it's quite efficient - then I got to thinking: why do I need to spill at all, when I can just force compact?
| Plan with Metrics | SortExec: TopK(fetch=10), expr=[c@1 DESC], preserve_partitioning=[false],
filter=[c@1 IS NULL OR c@1 > 57340], metrics=[output_rows=10,
elapsed_compute=81.54ms, output_bytes=50.0 MB,
output_batches=1, spill_count=19, spilled_bytes=115.1 MB, spilled_rows=125, row_replacements=189] |
I'll raise an issue to look more into force compaction in TopK.
@alchemist51, please raise the issue for the large single allocation in GroupBy.