
Further refine the Top K sort operator

Open gruuya opened this issue 1 year ago • 13 comments

Is your feature request related to a problem or challenge?

The Top-K operator has recently been added for a specialized use case when encountering ORDER BY and LIMIT clauses together (#7250, #7721), as a way to optimize the memory usage of the sorting procedure.

Still, the present implementation relies on keeping the input record batches containing candidate rows for the final K output rows in memory. In the pathological case this means there can be K batches in memory per TopK operator, and one TopK operator is spawned per input partition.

In particular this leads to the following error for ClickBench query 19:

% datafusion-cli -m 8gb
DataFusion CLI v36.0.0
❯ CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '/path/to/hits.parquet';
0 rows in set. Query took 0.032 seconds.

❯ SELECT "UserID", extract(minute FROM to_timestamp("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;
Resources exhausted: Failed to allocate additional 220827928 bytes for TopK[3] with 883453086 bytes already allocated - maximum available is 150024911

In the above case I see 12 partitions x ~3.5 batches per TopK operator in memory x 223 MB per batch (which is oddly large for 4 columns) = 9366 MB, thus peaking above the configured memory limit of 8 GB.
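For reference, the back-of-envelope arithmetic behind that estimate (a trivial sketch for illustration, not DataFusion code):

```rust
// Peak memory estimate: partitions x batches retained per TopK x batch size.
fn estimated_peak_mb(partitions: f64, batches_per_topk: f64, mb_per_batch: f64) -> f64 {
    partitions * batches_per_topk * mb_per_batch
}
// estimated_peak_mb(12.0, 3.5, 223.0) = 9366.0 MB, which exceeds
// the 8 GB (8192 MB) limit passed via `datafusion-cli -m 8gb`.
```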

Describe the solution you'd like

Ideally something that doesn't hurt performance but reduces the memory footprint even more. Failing that, something that perhaps hurts performance only once the memory limit threshold has been surpassed (e.g. by spilling), but without crashing the query.

Describe alternatives you've considered

Option 1

Increasing or not setting a memory limit.

Option 2

Introduce spilling to disk for the TopK operator as a fallback when the memory limit is hit.

Option 3

Potentially something like converting the column arrays of the input record batch to rows, as is already done for the evaluated sort keys (https://github.com/apache/arrow-datafusion/blob/b2ff249bfb918ac6697dbc92b51262a7bdbb5971/datafusion/physical-plan/src/topk/mod.rs#L163), and then making TopKRow track the projected rows in addition to the sort keys, while still comparing only on the sort key. This would enable the BinaryHeap to discard the unneeded rows.

Finally, one could use arrow_row::RowConverter::convert_rows to get back the columns when emitting.

However this is almost guaranteed to lead to worse performance in the general case due to all of the row-conversion taking place.
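To illustrate the shape of this option, here is a minimal std-only sketch: the field layout of this TopKRow and the byte encodings are hypothetical stand-ins for arrow_row output, not the actual structs in datafusion/physical-plan/src/topk/mod.rs.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical stand-in for TopKRow: carries both the row-encoded sort key
/// and the row-encoded remaining columns, but orders on the key alone.
struct TopKRow {
    key: Vec<u8>,     // arrow_row-style encoded sort key
    payload: Vec<u8>, // row-encoded projection of the other columns
}

impl PartialEq for TopKRow {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}
impl Eq for TopKRow {}
impl Ord for TopKRow {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.key.cmp(&other.key) // payload never participates in ordering
    }
}
impl PartialOrd for TopKRow {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

/// Keep only the k rows with the largest keys. Evicted rows (and their
/// payloads) are dropped immediately, so no input batch must stay alive.
fn top_k(rows: impl Iterator<Item = TopKRow>, k: usize) -> Vec<TopKRow> {
    // Reverse turns the max-heap into a min-heap holding the current top k.
    let mut heap = BinaryHeap::new();
    for row in rows {
        heap.push(Reverse(row));
        if heap.len() > k {
            heap.pop(); // discard the smallest candidate and its payload
        }
    }
    // Note: result order is unspecified; a real emit step would sort and
    // run the rows back through RowConverter::convert_rows.
    heap.into_iter().map(|Reverse(r)| r).collect()
}
```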

Additional context

Potentially relevant for #7195.

gruuya avatar Mar 01 '24 19:03 gruuya

TL;DR: The issue is caused by "double" memory accounting for sliced batches in AggExec and TopkExec.


The primary cause of resource exhaustion is incorrect memory accounting for record batches stored in TopK's RecordBatchStore, as highlighted in the issue description (approximately 220MB per batch). Upon inspecting the memory size calculation output:

Getting mem size of batch in topk::insert with batch size: 8192
Column 0 mem: 37561184
Column 1 mem: 37561184
Column 2 mem: 78416312
Column 3 mem: 72507488
Inserting batch with mem size: 226046168

It becomes evident that the batch is a zero-copy slice of a larger batch, resulting in a discrepancy between the memory actually attributable to TopK and what one would expect for only 8192 rows.
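A toy model of why the reported numbers are inflated (this is not Arrow's actual struct layout; `slice_memory_size` here just mirrors the idea behind arrow-rs's get_slice_memory_size):

```rust
use std::sync::Arc;

// Toy stand-in for an Arrow array: a shared buffer plus offset/len.
// Slicing is zero-copy, so a slice keeps the whole parent buffer alive.
struct ArraySlice {
    buffer: Arc<Vec<u8>>,
    offset: usize,
    len: usize,
}

impl ArraySlice {
    fn slice(&self, offset: usize, len: usize) -> ArraySlice {
        ArraySlice {
            buffer: Arc::clone(&self.buffer), // no copy, just a new view
            offset: self.offset + offset,
            len,
        }
    }

    /// Naive accounting (what TopK effectively saw): the full backing buffer.
    fn array_memory_size(&self) -> usize {
        self.buffer.len()
    }

    /// Slice-aware accounting: only the bytes the slice actually references.
    fn slice_memory_size(&self) -> usize {
        self.len
    }
}
```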

Analyzing the physical plan:

| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
|               |   SortPreservingMergeExec: [COUNT(*)@3 DESC], fetch=10 |
|               |     SortExec: TopK(fetch=10), expr=[COUNT(*)@3 DESC] |
|               |       ProjectionExec: expr=[UserID@0 as UserID, date_part(Utf8("MINUTE"),to_timestamp(hits.EventTime))@1 as m, SearchPhrase@2 as SearchPhrase, COUNT(*)@3 as COUNT(*)] |
|               |         AggregateExec: mode=FinalPartitioned, gby=[UserID@0 as UserID, date_part(Utf8("MINUTE"),to_timestamp(hits.EventTime))@1 as date_part(Utf8("MINUTE"),to_timestamp(hits.EventTime)), SearchPhrase@2 as SearchPhrase], aggr=[COUNT(*)] |
|               |           CoalesceBatchesExec: target_batch_size=8192 |
|               |             RepartitionExec: partitioning=Hash([UserID@0, date_part(Utf8("MINUTE"),to_timestamp(hits.EventTime))@1, SearchPhrase@2], 12), input_partitions=12 |
|               |               AggregateExec: mode=Partial, gby=[UserID@1 as UserID, date_part(MINUTE, to_timestamp(EventTime@0)) as date_part(Utf8("MINUTE"),to_timestamp(hits.EventTime)), SearchPhrase@2 as SearchPhrase], aggr=[COUNT(*)] |
|               |                 ParquetExec: file_groups={12 groups: ....

and AggExec:

https://github.com/apache/arrow-datafusion/blob/d5b635945307d5c7fe6fa10d3f65ee1ba2d58a5a/datafusion/physical-plan/src/aggregates/row_hash.rs#L493-L497

For the current plan, we can see that each batch we insert into TopK is a slice of the Agg output batch, which AggExec should track. We need to avoid double memory accounting for sliced batches to fix this issue.

And for option3, there is maybe_compact in TopK serving a similar purpose, but still keeps relevant records in record batch.

yjshen avatar Mar 03 '24 00:03 yjshen

The issue is caused by "double" memory accounting for sliced batches in AggExec and TopkExec.

I thought something similar, namely that TopKExec makes some errors in memory accounting but then observing the actual memory usage seems to indicate that the peak is real.

  • just prior to executing the ClickBench query (memory-usage screenshot)

  • peak memory, just before the DataFusionError::ResourcesExhausted error is thrown (memory-usage screenshot)

gruuya avatar Mar 03 '24 07:03 gruuya

Note that in this query, AggregateExec needs a large amount of memory, as we need to keep all the data in memory before sorting/extracting the top K counts, so relatively high memory usage is to be expected (even though we probably can optimize it further).

Dandandan avatar Mar 03 '24 15:03 Dandandan

Note that in this query, AggregateExec needs a large amount of memory, as we need to keep all the data in memory before sorting/extracting the top K counts, so relatively high memory usage is to be expected (even though we probably can optimize it further).

Ah good point, the cardinality of the input grouped data is indeed very large (~17M).

Indeed, a quick google search brought up the following recent paper (citing DataFusion/your blog post) about a new high-cardinality top K aggregation technique: https://www.microsoft.com/en-us/research/publication/cache-efficient-top-k-aggregation-over-high-cardinality-large-datasets/

gruuya avatar Mar 03 '24 20:03 gruuya

From DataFusion's memory management perspective, I found that get_slice_memory_size, introduced in https://github.com/apache/arrow-rs/pull/3501, better serves our requirements.

I suggest we have RecordBatch::get_effective_memory_size() in DF and use get_slice_memory_size to account for memory usage. Thoughts?

yjshen avatar Mar 06 '24 18:03 yjshen

I agree with @Dandandan in https://github.com/apache/arrow-datafusion/issues/9417#issuecomment-1975197127 that the core problem is with accounting

  1. The AggregateExec generates one single (giant) RecordBatch on output (source)
  2. Which is then emitted in parts (via RecordBatch::slice(), which does not actually allocate any additional memory) (source) -- note this means no memory is freed until the GroupByHash has emitted all of its output
  3. The TopK operator, however, then treats each incoming RecordBatch as though it were an additional allocation that needs to be tracked (source)

If we had infinite time / engineering hours I think a better approach would actually be to change GroupByHash so it didn't create a single giant contiguous RecordBatch

Instead it would be better if GroupByHash produced a Vec<RecordBatch> and then incrementally fed those batches out

Doing this would allow the GroupByHash to release memory incrementally as it output. This is analogous to how @korowa made join output incremental in https://github.com/apache/arrow-datafusion/pull/8658
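Roughly, the difference in output shape would look like this (Batch is a stand-in for RecordBatch; this is an illustration of the idea, not the GroupByHash code):

```rust
/// Stand-in for RecordBatch: just owns some rows.
struct Batch(Vec<u64>);

/// Incremental output: build separate small batches and hand ownership of
/// each one downstream, so its memory can be released as soon as the
/// consumer drops it, instead of one giant contiguous batch (plus slices
/// into it) staying alive until the very last slice has been emitted.
fn emit_incremental(groups: Vec<u64>, batch_size: usize) -> impl Iterator<Item = Batch> {
    let mut batches = Vec::new();
    let mut it = groups.into_iter().peekable();
    while it.peek().is_some() {
        // Each chunk owns its own allocation, independent of the others.
        batches.push(Batch(it.by_ref().take(batch_size).collect()));
    }
    batches.into_iter()
}
```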

alamb avatar Mar 10 '24 11:03 alamb

If incremental output of Grouping sounds reasonable to people I can file a follow on ticket to track the work.

alamb avatar Mar 10 '24 11:03 alamb

I agree that the core problem for the issue is accounting, and that the worst over-reporting comes from slices of AggExec's mono output record batch. But I also believe there's a distinction between optimizing AggExec's output pattern and handling memory accounting.

To improve AggExec's mono output pattern, #7065 might be similar to the idea of incremental output.

Regarding the memory accounting side, I'm curious if you have considered alternatives that allow for more accurate accounting for different batches. The idea of having sliced batches not reporting their memory usage or using get_slice_memory_size for reporting is a good starting point. What do you think about this?

yjshen avatar Mar 10 '24 22:03 yjshen

If incremental output of Grouping sounds reasonable to people I can file a follow on ticket to track the work.

Yes, please do

ozankabak avatar Mar 11 '24 16:03 ozankabak

If incremental output of Grouping sounds reasonable to people I can file a follow on ticket to track the work.

Makes sense to me as well, thank you 🙏

Dandandan avatar Mar 11 '24 17:03 Dandandan

But I also believe there's a distinction between optimizing AggExec's output pattern and handling memory accounting.

I agree

Regarding the memory accounting side, I'm curious if you have considered alternatives that allow for more accurate accounting for different batches. The idea of having sliced batches not reporting their memory usage or using get_slice_memory_size for reporting is a good starting point. What do you think about this?

I think it is tricky business and depends on what we are using the memory accounting for

At the moment I think the memory accounting is mostly to prevent OOM kills (over-commit of memory): since memory for a sliced RecordBatch is not returned to the OS, a plan holding a 3-row slice of a 1M-row RecordBatch is still "using" all 1M rows from the OS's perspective

However, ensuring we don't double count is important too (e.g. two slices of the same 1M row RecordBatch will count as a total of 2M rows, even though there is only a single allocation).
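One possible shape for avoiding the double counting is to key charges on the underlying allocation. This is a toy sketch with hypothetical names, not DataFusion's MemoryReservation API:

```rust
use std::collections::HashSet;
use std::sync::Arc;

/// Toy accountant that charges each underlying allocation exactly once,
/// keyed by the buffer's data pointer, so two slices of the same batch
/// are not double counted.
#[derive(Default)]
struct Accountant {
    seen: HashSet<usize>, // data pointers of buffers already charged
    bytes: usize,
}

impl Accountant {
    fn charge(&mut self, buffer: &Arc<Vec<u8>>) {
        let ptr = buffer.as_ptr() as usize;
        if self.seen.insert(ptr) {
            self.bytes += buffer.len(); // first sighting: charge full allocation
        }
        // Later slices backed by the same buffer add nothing.
    }
}
```

This still over-reports relative to get_slice_memory_size (the whole buffer is charged even if only a few rows are referenced), but it matches the "what the OS sees" view described above.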

alamb avatar Mar 11 '24 20:03 alamb

Filed https://github.com/apache/arrow-datafusion/issues/9562 to track incremental group by output

alamb avatar Mar 11 '24 20:03 alamb

I think #10511 is related to this, except it's using ExternalSorterMerge which from the name has no "top K" behaviour, hence the very high memory footprint?

samuelcolvin avatar May 15 '24 22:05 samuelcolvin

Coming back to this, I guess we could implement another option without implementing spilling: force compaction once we hit the limit. This probably slows down some queries when the memory limit is hit, but the query would otherwise fail anyway.
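The fallback could look roughly like this (a sketch with placeholder names and a placeholder compaction effect; TopK already has a maybe_compact-style rewrite of its stored batches that a real implementation would call here):

```rust
/// Sketch of "force compaction instead of failing": when a reservation
/// would exceed the limit, compact first and retry, and only error out
/// if compaction did not free enough memory.
struct TopKState {
    used: usize,
    limit: usize,
}

impl TopKState {
    /// Placeholder for compacting stored batches down to only the rows
    /// still referenced by the heap; here we just pretend it halves usage.
    fn compact(&mut self) {
        self.used /= 2;
    }

    fn try_grow(&mut self, additional: usize) -> Result<(), String> {
        if self.used + additional > self.limit {
            self.compact(); // under pressure: pay CPU to avoid failing the query
        }
        if self.used + additional > self.limit {
            return Err("Resources exhausted".to_string());
        }
        self.used += additional;
        Ok(())
    }
}
```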

Dandandan avatar Oct 23 '24 12:10 Dandandan

Coming back to this, I guess we could implement another option without implementing spilling: force compaction once we hit the limit. This probably slows down some queries when the memory limit is hit, but the query would otherwise fail anyway.

I think this is a great idea

alamb avatar Oct 23 '24 15:10 alamb

Implementing this "reduce memory usage when under pressure" might be a more interesting general approach to improving DataFusion's performance under memory pressure (e.g. maybe we can trigger other operators, like partial aggregates, to clear memory when we hit memory pressure) 🤔

alamb avatar Oct 23 '24 15:10 alamb

Implementing this "reduce memory usage when under pressure" might be a more interesting general approach to improving DataFusion's performance under memory pressure (e.g. maybe we can trigger other operators, like partial aggregates, to clear memory when we hit memory pressure) 🤔

That's an interesting idea :)

Dandandan avatar Oct 23 '24 20:10 Dandandan