
Sort ClickBench data using 4GB on standard laptop (spilling)

Open alamb opened this issue 1 month ago • 7 comments

Is your feature request related to a problem or challenge?

While working with @zhuqi-lucas on https://github.com/apache/datafusion/pull/19042 we noticed it is not possible to sort the hits.parquet dataset with a 4 GB memory limit

Get the data

./benchmarks/bench.sh data clickbench_1

Try to re-sort it using 4 GB of memory (on a 20-core Mac M3 laptop):

datafusion-cli -m 4G -c "COPY (SELECT * FROM 'benchmarks/data/hits.parquet' ORDER BY \"EventTime\") TO 'hits_sorted.parquet' STORED AS PARQUET;"

Results in

DataFusion CLI v51.0.0
Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
caused by
Resources exhausted: Additional allocation failed for ExternalSorter[7] with top memory consumers (across reservations) as:
  ExternalSorterMerge[4]#11(can spill: false) consumed 883.8 MB, peak 883.8 MB,
  ExternalSorterMerge[1]#5(can spill: false) consumed 812.6 MB, peak 812.6 MB,
  ExternalSorterMerge[9]#21(can spill: false) consumed 764.8 MB, peak 764.8 MB.
Error: Failed to allocate additional 13.7 MB for ExternalSorter[7] with 0.0 B already allocated for this reservation - 1088.1 KB remain available for the total pool

As @2010YOUY01 has documented in https://datafusion.apache.org/user-guide/configs.html#memory-limited-queries, this query does run to completion with fewer target partitions, for example 1:

SET datafusion.execution.target_partitions = 1;

Then this succeeds

datafusion-cli -m 4G -c "SET datafusion.execution.target_partitions = 1; COPY (SELECT * FROM 'benchmarks/data/hits.parquet' ORDER BY \"EventTime\") TO 'hits_sorted.parquet' STORED AS PARQUET;"

However, 2 target partitions still fails:

andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion2$ datafusion-cli -m 4G -c "SET datafusion.execution.target_partitions = 2; COPY (SELECT * FROM 'benchmarks/data/hits.parquet' ORDER BY \"EventTime\") TO 'hits_sorted.parquet' STORED AS PARQUET;"
DataFusion CLI v51.0.0
0 row(s) fetched.
Elapsed 0.000 seconds.

Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
caused by
Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as:
  ExternalSorter[1]#5(can spill: true) consumed 3.2 GB, peak 3.5 GB,
  ExternalSorterMerge[1]#6(can spill: false) consumed 767.1 MB, peak 1773.9 MB,
  ExternalSorterMerge[0]#4(can spill: false) consumed 10.0 MB, peak 1679.3 MB.
Error: Failed to allocate additional 27.6 MB for ExternalSorter[0] with 0.0 B already allocated for this reservation - 20.7 MB remain available for the total pool

Describe the solution you'd like

I would like DataFusion to be able to complete such queries with a reasonable amount of RAM without having to tune the target partitions

Describe alternatives you've considered

Maybe there could be some "rule of thumb" for the required resources -- for example, perhaps we could make sure queries run with 1 GB of RAM per core (and adjust the batch size / target partitioning automatically if needed)
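That rule of thumb could be sketched as a planner-side heuristic. This is a hypothetical sketch only: `suggested_target_partitions` is an invented name, not an existing DataFusion API, and the 1 GB/partition constant is just the figure proposed above.

```rust
/// Hypothetical heuristic (not an existing DataFusion API): choose a target
/// partition count so each partition gets at least ~1 GB of the memory pool,
/// capped by the number of cores.
fn suggested_target_partitions(memory_limit_bytes: u64, num_cores: usize) -> usize {
    const BYTES_PER_PARTITION: u64 = 1 << 30; // the 1 GB per core rule of thumb
    let by_memory = (memory_limit_bytes / BYTES_PER_PARTITION).max(1) as usize;
    by_memory.min(num_cores)
}

fn main() {
    // 4 GB pool on a 20-core machine: run with 4 partitions instead of 20
    assert_eq!(suggested_target_partitions(4 * (1u64 << 30), 20), 4);
    // A large pool is capped by the core count
    assert_eq!(suggested_target_partitions(64 * (1u64 << 30), 20), 20);
    println!("ok");
}
```

For the failing run above (`-m 4G`, 20 cores), this heuristic would pick 4 target partitions rather than 20.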

Additional context

No response

alamb avatar Dec 08 '25 18:12 alamb

Hi @alamb @zhuqi-lucas , We are doing similar experiments to run clickbench queries with datafusion in lower memory instances.

Not sure if we have an EPIC to track all such issues in a common place.

What we noticed is that TopK doesn't spill, and hence ClickBench GROUP BY queries with ORDER BY + LIMIT, even with a single target partition, such as Q13

SELECT "SearchPhrase", COUNT(DISTINCT "UserID") AS u FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY u DESC LIMIT 10;

Q33

SELECT "URL", COUNT(*) AS c FROM hits GROUP BY "URL" ORDER BY c DESC LIMIT 10;

also fail with an out-of-memory error when < 8 GB of RAM is allocated in datafusion-cli. [ https://github.com/apache/datafusion/issues/9417 might be a relevant issue ]

@alchemist51 and I've been looking into improving queries in this area.

@alchemist51 is looking into reviving https://github.com/apache/datafusion/pull/15591 and

I was able to get a working spill in my fork for topK operator - https://github.com/bharath-techie/datafusion/tree/spilltest

Can you please share your views / suggestions on the same ?

bharath-techie avatar Dec 09 '25 17:12 bharath-techie

Can you please share your views / suggestions on the same ?

I am not sure what you are asking about

What we noticed is that TopK doesn't spill, and hence ClickBench GROUP BY queries with ORDER BY + LIMIT fail even with a single target partition, such as

SELECT "SearchPhrase", COUNT(DISTINCT "UserID") AS u FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY u DESC LIMIT 10;

Since this has a LIMIT 10, the topK only needs to hold the top 10 values

Therefore, I don't think the issue is related to TopK itself, but rather the memory usage of one of the GroupByHash aggregate operations that feeds the top k (in this case there are very many distinct "SearchPhrase" values)

The GroupByHash operator should also be able to spill when under memory pressure, so I don't know why it is failing
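For reference, the spill-under-pressure pattern described here can be sketched with a toy aggregation. This is an illustrative model only, not DataFusion's actual `GroupedHashAggregateStream`: the budget is counted in groups rather than bytes, and an in-memory `Vec` stands in for a sorted run on disk.

```rust
use std::collections::HashMap;

// Toy sketch of spilling under memory pressure (illustrative, not
// DataFusion's implementation): accumulate groups in a hash table; when the
// table exceeds the budget, emit the partial aggregates as a sorted "spill"
// run and start over. All runs are merged at the end.
fn group_count_with_spill(values: &[&str], budget_groups: usize) -> HashMap<String, u64> {
    let mut spills: Vec<Vec<(String, u64)>> = Vec::new();
    let mut table: HashMap<String, u64> = HashMap::new();

    for v in values {
        *table.entry((*v).to_string()).or_insert(0) += 1;
        if table.len() > budget_groups {
            // "Spill": move partial counts out of the in-memory table
            let mut run: Vec<(String, u64)> = table.drain().collect();
            run.sort();
            spills.push(run);
        }
    }

    // Final merge of the in-memory remainder with all spilled runs
    let mut merged: HashMap<String, u64> = table;
    for run in spills {
        for (k, c) in run {
            *merged.entry(k).or_insert(0) += c;
        }
    }
    merged
}

fn main() {
    let data = ["a", "b", "a", "c", "a", "b", "d"];
    let counts = group_count_with_spill(&data, 2);
    assert_eq!(counts["a"], 3);
    assert_eq!(counts["b"], 2);
    println!("{counts:?}");
}
```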

alamb avatar Dec 09 '25 22:12 alamb

So this is the error I get:

datafusion-cli -m 8G
DataFusion CLI v51.0.0
> SET datafusion.execution.target_partitions=1;
0 row(s) fetched. 
Elapsed 0.001 seconds.

> CREATE EXTERNAL TABLE hits 
STORED AS PARQUET 
LOCATION '/home/ec2-user/clickdata/partitioned/hits/*.parquet';
0 row(s) fetched. 
Elapsed 0.054 seconds.

> SELECT "URL", COUNT(*) AS c FROM hits GROUP BY "URL" ORDER BY c DESC LIMIT 10;
Resources exhausted: Additional allocation failed for TopK[0] with top memory consumers (across reservations) as:
  TopK[0]#4(can spill: false) consumed 7.8 GB, peak 7.8 GB,
  GroupedHashAggregateStream[0] (count(1))#3(can spill: true) consumed 80.4 KB, peak 4.6 GB,
  DataFusion-Cli#2(can spill: false) consumed 0.0 B, peak 0.0 B.
Error: Failed to allocate additional 3.9 GB for TopK[0] with 7.8 GB already allocated for this reservation - 187.0 MB remain available for the total pool
> 

Once i added spill to TopK, I was able to get results back with same memory config

explain analyze SELECT "URL", COUNT(*) AS c FROM hits GROUP BY "URL" ORDER BY c DESC LIMIT 10;
| Plan with Metrics | SortExec: TopK(fetch=10), expr=[c@1 DESC], preserve_partitioning=[false], filter=[c@1 IS NULL OR c@1 > 57340], metrics=[output_rows=10, elapsed_compute=81.54ms, output_bytes=50.0 MB, output_batches=1, spill_count=19, spilled_bytes=115.1 MB, spilled_rows=125, row_replacements=189] |
|                   |   ProjectionExec: expr=[URL@0 as URL, count(Int64(1))@1 as c], metrics=[output_rows=18.34 M, elapsed_compute=1.54ms, output_bytes=8.6 TB, output_batches=2.24 K] |
|                   |     AggregateExec: mode=Single, gby=[URL@0 as URL], aggr=[count(Int64(1))], metrics=[output_rows=18.34 M, elapsed_compute=14.45s, output_bytes=8.6 TB, output_batches=2.24 K, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, peak_mem_used=4.90 B, aggregate_arguments_time=38.36ms, aggregation_time=389.52ms, emitting_time=1.30ms, time_calculating_group_ids=13.93s] |
|                   |       DataSourceExec: file_groups={1 group: [[home/ec2-user/clickdata/partitioned/hits/hits_0.parquet:0..122446530, home/ec2-user/clickdata/partitioned/hits/hits_1.parquet:0..174965044, home/ec2-user/clickdata/partitioned/hits/hits_10.parquet:0..101513258, home/ec2-user/clickdata/partitioned/hits/hits_11.parquet:0..118419888, home/ec2-user/clickdata/partitioned/hits/hits_12.parquet:0..149514164, ...]]}, projection=[URL], file_type=parquet, metrics=[output_rows=100.00 M, elapsed_compute=1ns, output_bytes=20.0 GB, output_batches=12.31 K, files_ranges_pruned_statistics=100 total → 100 matched, row_groups_pruned_statistics=325 total → 325 matched, row_groups_pruned_bloom_filter=325 total → 325 matched, page_index_rows_pruned=0 total → 0 matched, batches_split=0, bytes_scanned=2.62 B, file_open_errors=0, file_scan_errors=0, num_predicate_creation_errors=0, predicate_cache_inner_records=0, predicate_cache_records=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, bloom_filter_eval_time=200ns, metadata_load_time=8.04ms, page_index_eval_time=200ns, row_pushdown_eval_time=200ns, statistics_eval_time=200ns, time_elapsed_opening=2.69ms, time_elapsed_processing=6.99s, time_elapsed_scanning_total=21.96s, time_elapsed_scanning_until_data=241.98ms, scan_efficiency_ratio=1600% (2.62 B/160.3 M)] |

What I was asking was whether there is anything I can do in the planning configuration to make this work without spilling, for example. I have all defaults. I tried reducing the batch size, since TopK holds 20 * batch_size rows before compacting, but it didn't seem to help.

bharath-techie avatar Dec 10 '25 03:12 bharath-techie

Therefore, I don't think the issue is related to TopK itself, but rather the memory usage of one of the GroupByHash aggregate

I get this point: for each batch that gets inserted and referenced in TopK, the entire batch size gets added during the size estimation. [ as mentioned in https://github.com/apache/datafusion/issues/9562 ]

Path: datafusion/physical-plan/src/topk/mod.rs

/// Insert a record batch entry into this store, tracking its
/// memory use, if it has any uses
pub fn insert(&mut self, entry: RecordBatchEntry) {
    // uses of 0 means that none of the rows in the batch were stored in the topk
    if entry.uses > 0 {
        let size = get_record_batch_memory_size(&entry.batch);
        self.batches_size += size;
        println!("size during insert : {}", size);
        self.batches.insert(entry.id, entry);
    }
}

/// returns the size of memory used by this store, including all
/// referenced `RecordBatch`es, in bytes
pub fn size(&self) -> usize {
    // original implementation:
    // size_of::<Self>()
    //     + self.batches.capacity() * (size_of::<u32>() + size_of::<RecordBatchEntry>())
    //     + self.batches_size

    let size_of_self = size_of::<Self>();
    let capacity = self.batches.capacity();
    let entry_size = size_of::<u32>() + size_of::<RecordBatchEntry>();
    let batches_size = self.batches_size;

    let size = size_of_self + capacity * entry_size + batches_size;
    println!(
        "self size : {} , capacity : {} , heap size : {}, batch size : {}",
        size_of_self, capacity, entry_size, batches_size
    );
    println!("size during get : {}", size);
    size
}

For the above URL query, we have ~ 4 GB record batch in groupByHashAggregate which gets counted for each record batch that got added to topK

size during insert : 4196909056
self size : 48 , capacity : 3 , heap size : 60, batch size : 4196909056
size during get : 4196909284

size during insert : 4196909056
self size : 48 , capacity : 3 , heap size : 60, batch size : 8393818112
size during get : 8393818340

size during insert : 4196909056
self size : 48 , capacity : 3 , heap size : 60, batch size : 12590727168
size during get : 12590727396
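The behavior in these logs reduces to a toy model. This is illustrative only: `BatchEntry`/`BatchStore` are stand-ins for the real `RecordBatchEntry` and its store, and a `Vec<u8>` stands in for a `RecordBatch`. Each insert charges the full size of the shared batch, so a handful of retained rows can pin gigabytes of accounted memory.

```rust
use std::sync::Arc;

// Stand-in for a RecordBatch plus the rows the TopK heap still uses
struct BatchEntry {
    batch: Arc<Vec<u8>>, // shared, possibly huge, source batch
    uses: usize,         // rows from this batch referenced by the heap
}

struct BatchStore {
    entries: Vec<BatchEntry>,
    batches_size: usize, // accounted bytes, mirroring `batches_size` above
}

impl BatchStore {
    fn insert(&mut self, entry: BatchEntry) {
        if entry.uses > 0 {
            // Charges the *entire* batch, not just the `uses` rows
            self.batches_size += entry.batch.len();
            self.entries.push(entry);
        }
    }
}

fn main() {
    let big = Arc::new(vec![0u8; 4_000_000]); // pretend this is a ~4 GB batch
    let mut store = BatchStore { entries: Vec::new(), batches_size: 0 };
    // Only 10 rows are retained, yet all 4,000,000 bytes are charged
    store.insert(BatchEntry { batch: big.clone(), uses: 10 });
    // A second entry (reusing the same batch here for brevity) adds
    // another full batch size, matching the doubling seen in the logs
    store.insert(BatchEntry { batch: big.clone(), uses: 3 });
    assert_eq!(store.batches_size, 8_000_000);
    println!("accounted: {} B", store.batches_size);
}
```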

By going through previous such issues :

  • I think @Dandandan mentioned force compaction when reaching the memory limit; should we try that?

  • https://github.com/apache/datafusion/pull/15591 could help as well, or do we have any newer issues that could help here?

Apologies if I'm polluting this issue, which is unrelated to TopK -- maybe we can discuss this in https://github.com/apache/datafusion/issues/9417 or in a new issue.

bharath-techie avatar Dec 10 '25 06:12 bharath-techie

For the above URL query, we have ~ 4 GB record batch in groupByHashAggregate which gets counted for each record batch that got added to topK

Ah. that makes sense. Nice find

I think @Dandandan mentioned force compaction when reaching the memory limit (https://github.com/apache/datafusion/issues/9417#issuecomment-2431943283), should we try that?

Yes, I think that would be a much better approach than trying to spill the entire 4 GB batch (the TopK operator is only keeping a small number of rows -- 10 -- so spilling 4 GB just to read it back seems very wasteful)
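The compaction alternative might look like the following toy sketch (an assumed design, not DataFusion code): copy just the rows the heap still references into a small owned buffer, then drop the `Arc`s to the large source batches so their memory can be freed and un-accounted, with no disk round trip.

```rust
use std::sync::Arc;

// Toy sketch of force compaction (assumed design, not DataFusion's code)
struct Store {
    // (shared source batch, indices of rows the TopK heap still needs)
    refs: Vec<(Arc<Vec<u64>>, Vec<usize>)>,
    compacted: Vec<u64>, // owned copies of just the needed rows
}

impl Store {
    fn accounted_bytes(&self) -> usize {
        let referenced: usize = self.refs.iter().map(|(b, _)| b.len() * 8).sum();
        referenced + self.compacted.len() * 8
    }

    /// Copy the needed rows out, then release the source batches.
    fn force_compact(&mut self) {
        for (batch, rows) in self.refs.drain(..) {
            for &r in &rows {
                self.compacted.push(batch[r]);
            }
        } // the Arcs are dropped here, so the large batches can be freed
    }
}

fn main() {
    let big = Arc::new(vec![7u64; 1_000_000]); // ~8 MB source batch
    let mut store = Store { refs: vec![(big, vec![0, 1, 2])], compacted: Vec::new() };
    assert_eq!(store.accounted_bytes(), 8_000_000);
    store.force_compact(); // keeps 3 rows, drops the 8 MB batch
    assert_eq!(store.accounted_bytes(), 24);
    println!("compacted to {} bytes", store.accounted_bytes());
}
```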

https://github.com/apache/datafusion/pull/15591 could help as well or do we have any latest issues which can help here ?

The issue that was tracking that is:

  • https://github.com/apache/datafusion/issues/7065

However, that issue focuses more on performance than on memory pressure from single large allocations. It is probably a good idea to file a new issue that focuses on the memory pressure side rather than performance

alamb avatar Dec 10 '25 11:12 alamb

@bharath-techie would you like to file the tickets (force compact in TopK as well as large single allocation in GroupBy)?

alamb avatar Dec 10 '25 11:12 alamb

Yes I think that would be a much better approach than trying to spill the entire 4GB batch

I'm just spilling the TopK state and freeing up the record batches, so it's quite efficient -- then I got to thinking: why do I need to spill at all, when I can just force compact?

| Plan with Metrics | SortExec: TopK(fetch=10), expr=[c@1 DESC], preserve_partitioning=[false], filter=[c@1 IS NULL OR c@1 > 57340], metrics=[output_rows=10, elapsed_compute=81.54ms, output_bytes=50.0 MB, output_batches=1, spill_count=19, spilled_bytes=115.1 MB, spilled_rows=125, row_replacements=189] |

I'll raise the issue to look more into force compact in TopK.

@alchemist51 please raise the issue for large single allocation in GroupBy.

bharath-techie avatar Dec 10 '25 12:12 bharath-techie