datafusion
datafusion copied to clipboard
Further refine the Top K sort operator
Is your feature request related to a problem or challenge?
The Top-K operator has recently been added for a specialized use case when encountering ORDER BY
and LIMIT
clauses together (#7250, #7721), as a way to optimize the memory usage of the sorting procedure.
Still the present implementation relies on keeping in memory the input record batches with potential row candidates for the final K output rows. This means that in the pathological case, there can be K batches in memory per the TopK operator, which are themselves spawned per input partition.
In particular this leads to the following error for ClickBench query 19:
% datafusion-cli -m 8gb
DataFusion CLI v36.0.0
❯ CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '/path/to/hits.parquet';
0 rows in set. Query took 0.032 seconds.
❯ SELECT "UserID", extract(minute FROM to_timestamp("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;
Resources exhausted: Failed to allocate additional 220827928 bytes for TopK[3] with 883453086 bytes already allocated - maximum available is 150024911
In the above case I see 12 partitions x ~3.5 batches per TopK operator in memory x 223 MB per batch (which is kind of strange for 4 columns) = 9366 MB, thus peaking above the set memory limit of 8GB.
Describe the solution you'd like
Ideally something that doesn't hurt performance but reduces the memory footprint even more. Failing that, something that perhaps hurts performance only once the memory limit threshold has been surpassed (e.g. by spilling), but without crashing the query.
Describe alternatives you've considered
Option 1
Increasing or not setting a memory limit.
Option 2
Introduce spilling to disk for the TopK operator as a fallback when the memory limit is hit.
Option 3
Potentially something like converting the column arrays of the input record batch to rows, like for the evaluated sort keys
https://github.com/apache/arrow-datafusion/blob/b2ff249bfb918ac6697dbc92b51262a7bdbb5971/datafusion/physical-plan/src/topk/mod.rs#L163
and then making TopKRow
track the projected rows, in addition to the sort keys, but compare only against the sort key. This would enable the BinaryHeap
to discard the unneeded rows.
Finally one could use arrow_row::RowConverter::convert_rows
to get back the columns when emit
ing.
However this is almost guaranteed to lead to worse performance in the general case due to all of the row-conversion taking place.
Additional context
Potentially relevant for #7195.