Further refine the Top K sort operator

Open • gruuya opened this issue 11 months ago • 13 comments

Is your feature request related to a problem or challenge?

The Top-K operator was recently added for the specialized case where ORDER BY and LIMIT clauses appear together (#7250, #7721), as a way to reduce the memory usage of the sorting procedure.

Still, the present implementation keeps in memory every input record batch that holds a potential row candidate for the final K output rows. This means that in the pathological case there can be K batches in memory per TopK operator, and one TopK operator is spawned per input partition.

In particular, this leads to the following error for ClickBench query 19:
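To make the pathological case concrete, here is a minimal sketch of a simplified model (not DataFusion's actual code): each heap entry only records which batch its row came from, and a batch counts as retained while any heap entry still points into it. The function name `batches_retained` and the toy data are assumptions made up for illustration.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashSet};

/// Returns how many distinct input batches are still referenced by the
/// top-`k` rows (largest values first) once all batches are consumed.
/// Simplified model: a "batch" is a Vec of values, a "row" is one value.
fn batches_retained(batches: &[Vec<i64>], k: usize) -> usize {
    // Min-heap of (value, batch index): the smallest kept value is on top,
    // so it is the one evicted when a better row arrives.
    let mut heap: BinaryHeap<Reverse<(i64, usize)>> = BinaryHeap::new();
    for (batch_idx, batch) in batches.iter().enumerate() {
        for &value in batch {
            heap.push(Reverse((value, batch_idx)));
            if heap.len() > k {
                heap.pop(); // drop the current worst candidate
            }
        }
    }
    // A batch has to stay in memory while any heap row points into it.
    heap.iter()
        .map(|Reverse((_, idx))| *idx)
        .collect::<HashSet<usize>>()
        .len()
}

fn main() {
    let k = 4;
    // Pathological: every batch holds exactly one row that beats all earlier rows.
    let spread: Vec<Vec<i64>> = (0..8).map(|i| vec![0, 0, 100 + i]).collect();
    // Benign: all of the best rows arrive in the first batch.
    let mut clustered: Vec<Vec<i64>> = vec![vec![1000, 1001, 1002, 1003]];
    clustered.extend((0..7).map(|_| vec![0, 0, 0]));
    println!("spread: {} batches retained", batches_retained(&spread, k));
    println!("clustered: {} batches retained", batches_retained(&clustered, k));
}
```

With the spread input the K winning rows sit in K different batches, so K whole batches stay alive even though only K rows are needed; with the clustered input a single batch is enough.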

% datafusion-cli -m 8gb
DataFusion CLI v36.0.0
❯ CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '/path/to/hits.parquet';
0 rows in set. Query took 0.032 seconds.

❯ SELECT "UserID", extract(minute FROM to_timestamp("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;
Resources exhausted: Failed to allocate additional 220827928 bytes for TopK[3] with 883453086 bytes already allocated - maximum available is 150024911

In the above case I see 12 partitions × ~3.5 batches in memory per TopK operator × 223 MB per batch (which is kind of strange for 4 columns) = 9366 MB, which peaks above the configured memory limit of 8 GB.
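The back-of-the-envelope estimate can be reproduced with a few lines. This is only a sketch: the helper name `estimated_peak_mb` is made up, it assumes every partition holds the same average number of equally sized batches, and it assumes `-m 8gb` means 8 × 1024 MB.

```rust
/// Estimated peak memory in MB, assuming every partition's TopK operator
/// holds the same average number of equally sized batches.
fn estimated_peak_mb(partitions: u32, batches_per_topk: f64, batch_mb: f64) -> f64 {
    f64::from(partitions) * batches_per_topk * batch_mb
}

fn main() {
    // Figures taken from the observation in this issue.
    let peak_mb = estimated_peak_mb(12, 3.5, 223.0);
    // Assumption: `-m 8gb` is interpreted as 8 * 1024 MB.
    let limit_mb = 8.0 * 1024.0;
    println!("estimated peak: {} MB, limit: {} MB", peak_mb, limit_mb);
    println!("over limit: {}", peak_mb > limit_mb);
}
```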

Describe the solution you'd like

Ideally something that doesn't hurt performance but reduces the memory footprint even more. Failing that, something that perhaps hurts performance only once the memory limit threshold has been surpassed (e.g. by spilling), but without crashing the query.

Describe alternatives you've considered

Option 1

Increase the memory limit, or do not set one at all.

Option 2

Introduce spilling to disk for the TopK operator as a fallback when the memory limit is hit.

Option 3

Potentially something like converting the column arrays of the input record batch to rows, as is already done for the evaluated sort keys (https://github.com/apache/arrow-datafusion/blob/b2ff249bfb918ac6697dbc92b51262a7bdbb5971/datafusion/physical-plan/src/topk/mod.rs#L163), and then making TopKRow track the projected rows in addition to the sort keys, while comparing only on the sort key. This would enable the BinaryHeap to discard the unneeded rows.

Finally, one could use arrow_row::RowConverter::convert_rows to get back the columns when emitting.

However, this is almost guaranteed to lead to worse performance in the general case, due to all of the row conversion taking place.
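A rough, std-only sketch of the idea in Option 3 follows. It is not the DataFusion implementation: `OwnedTopKRow` and `OwnedTopK` are hypothetical names, and plain `Vec<u8>` values stand in for the row-encoded sort key and the row-encoded projected columns that a row converter would produce. It assumes the encoded key already reflects the requested sort direction, so the smallest key sorts first in the output.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

/// One candidate row that owns its bytes instead of pointing into a batch.
/// Both fields are stand-ins for row-encoded data (hypothetical layout).
struct OwnedTopKRow {
    sort_key: Vec<u8>,
    payload: Vec<u8>,
}

// Equality and ordering look at the sort key only; the payload is carried
// along but never compared.
impl PartialEq for OwnedTopKRow {
    fn eq(&self, other: &Self) -> bool {
        self.sort_key == other.sort_key
    }
}
impl Eq for OwnedTopKRow {}
impl PartialOrd for OwnedTopKRow {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for OwnedTopKRow {
    fn cmp(&self, other: &Self) -> Ordering {
        self.sort_key.cmp(&other.sort_key)
    }
}

/// Keeps the `k` rows with the smallest sort keys.
struct OwnedTopK {
    k: usize,
    heap: BinaryHeap<OwnedTopKRow>, // max-heap: the worst kept row is on top
}

impl OwnedTopK {
    fn new(k: usize) -> Self {
        OwnedTopK { k, heap: BinaryHeap::new() }
    }

    fn insert(&mut self, sort_key: Vec<u8>, payload: Vec<u8>) {
        if self.k == 0 {
            return;
        }
        if self.heap.len() == self.k {
            // Skip rows that do not beat the current worst candidate.
            if let Some(worst) = self.heap.peek() {
                if sort_key >= worst.sort_key {
                    return;
                }
            }
            // The evicted row's bytes are freed here; no batch is kept alive.
            self.heap.pop();
        }
        self.heap.push(OwnedTopKRow { sort_key, payload });
    }

    /// Returns the payloads in ascending sort-key order.
    fn emit(self) -> Vec<Vec<u8>> {
        self.heap
            .into_sorted_vec()
            .into_iter()
            .map(|row| row.payload)
            .collect()
    }
}

fn main() {
    let mut topk = OwnedTopK::new(2);
    for (key, name) in [(5u8, "e"), (1, "a"), (4, "d"), (2, "b")] {
        topk.insert(vec![key], name.as_bytes().to_vec());
    }
    for payload in topk.emit() {
        println!("{}", String::from_utf8_lossy(&payload));
    }
}
```

In this model memory is bounded by K rows rather than K batches, at the price of copying every accepted row into the heap, which is the conversion cost the paragraph above warns about.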

Additional context

Potentially relevant for #7195.

gruuya • Mar 01 '24 19:03