Support Top-K query optimization for `ORDER BY <EXPR> [ASC|DESC] LIMIT n`
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
As explained by @alamb in the user-defined plan example:
TopK Background: A "Top K" node is a common query optimization which is used for queries such as "find the top 3 customers by revenue". The (simplified) SQL for such a query might be:
CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT)
STORED AS CSV location 'tests/customer.csv';
SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
And a naive plan would be:
> explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+--------------+----------------------------------------+
| plan_type | plan |
+--------------+----------------------------------------+
| logical_plan | Limit: 3 |
| | Sort: #revenue DESC NULLS FIRST |
| | Projection: #customer_id, #revenue |
| | TableScan: sales |
+--------------+----------------------------------------+
While this plan produces the correct answer, the careful reader will note it fully sorts the input before discarding everything other than the top 3 elements.
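To make the cost concrete, here is a minimal sketch in plain Rust of the Top-K technique (illustrative only, not DataFusion code; `top_k` is a hypothetical helper): instead of sorting all n input rows, keep a min-heap of at most k elements, giving O(n log k) time and O(k) memory instead of a full O(n log n) sort.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Illustrative sketch (not DataFusion code): keep only the top `k`
/// values seen so far in a min-heap, so memory stays O(k) instead of
/// O(n) for a full sort.
fn top_k(values: impl IntoIterator<Item = i64>, k: usize) -> Vec<i64> {
    // `Reverse` turns Rust's max-heap into a min-heap, so the
    // smallest of the current top-k candidates sits at the root.
    let mut heap: BinaryHeap<Reverse<i64>> = BinaryHeap::with_capacity(k + 1);
    for v in values {
        heap.push(Reverse(v));
        if heap.len() > k {
            // Evict the smallest element; it can never be in the top k.
            heap.pop();
        }
    }
    // Drain and sort descending to produce the
    // `ORDER BY ... DESC LIMIT k` result.
    let mut out: Vec<i64> = heap.into_iter().map(|Reverse(v)| v).collect();
    out.sort_unstable_by(|a, b| b.cmp(a));
    out
}
```

For example, `top_k([5, 1, 9, 3, 7], 3)` returns `[9, 7, 5]` without ever holding more than four values in the heap.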
In the ClickBench benchmarks (https://github.com/ClickHouse/ClickBench) we can see many queries that might benefit from this optimization, but we can see they are converted to a "naive" plan. See for example query 9:
9. EXPLAIN SELECT "RegionID", COUNT(DISTINCT "UserID") AS u FROM hits GROUP BY "RegionID" ORDER BY u DESC LIMIT 10;
-----------------------------------------
DataFusion CLI v12.0.0
0 rows in set. Query took 0.062 seconds.
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Limit: skip=0, fetch=10 |
| | Sort: #u DESC NULLS FIRST |
| | Projection: #hits.RegionID, #COUNT(DISTINCT hits.UserID) AS u |
| | Projection: #group_alias_0 AS RegionID, #COUNT(alias1) AS COUNT(DISTINCT hits.UserID) |
| | Aggregate: groupBy=[[#group_alias_0]], aggr=[[COUNT(#alias1)]] |
| | Aggregate: groupBy=[[#hits.RegionID AS group_alias_0, #hits.UserID AS alias1]], aggr=[[]] |
| | TableScan: hits projection=[RegionID, UserID] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: [u@1 DESC] |
| | CoalescePartitionsExec |
| | ProjectionExec: expr=[RegionID@0 as RegionID, COUNT(DISTINCT hits.UserID)@1 as u] |
| | ProjectionExec: expr=[group_alias_0@0 as RegionID, COUNT(alias1)@1 as COUNT(DISTINCT hits.UserID)] |
| | AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | RepartitionExec: partitioning=Hash([Column { name: "group_alias_0", index: 0 }], 16) |
| | AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)] |
| | AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | RepartitionExec: partitioning=Hash([Column { name: "group_alias_0", index: 0 }, Column { name: "alias1", index: 1 }], 16) |
| | AggregateExec: mode=Partial, gby=[RegionID@0 as group_alias_0, UserID@1 as alias1], aggr=[] |
| | RepartitionExec: partitioning=RoundRobinBatch(16) |
| | ParquetExec: limit=None, partitions=[home/danielheres/Code/gdd/ClickBench/datafusion/hits.parquet], projection=[RegionID, UserID] |
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 rows in set. Query took 0.015 seconds.
Describe the solution you'd like: Implement this feature.
It basically means pushing the limit information down to SortPreservingMergeStream and keeping only the top N elements in its min-heap data structure.
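As a rough sketch of that idea (a hypothetical function, not the actual SortPreservingMergeStream API; it assumes each partition arrives already sorted descending, as SortExec would leave it): a limit-aware k-way merge can stop as soon as `fetch` rows have been emitted, so it never materializes the full sorted output.

```rust
use std::collections::BinaryHeap;

/// Hypothetical sketch of a limit-aware k-way merge over partitions
/// that are each already sorted descending (as SortExec would produce
/// for `ORDER BY revenue DESC`). Emission stops after `fetch` rows.
fn merge_sorted_desc_with_limit(partitions: Vec<Vec<i64>>, fetch: usize) -> Vec<i64> {
    // Heap entries: (value, partition index, offset within partition).
    // Rust's BinaryHeap is a max-heap, which matches DESC order here.
    let mut heap = BinaryHeap::new();
    for (p, part) in partitions.iter().enumerate() {
        if let Some(&v) = part.first() {
            heap.push((v, p, 0usize));
        }
    }
    let mut out = Vec::with_capacity(fetch);
    while out.len() < fetch {
        let Some((v, p, i)) = heap.pop() else { break };
        out.push(v);
        // Refill from the same partition, keeping one candidate per input.
        if let Some(&next) = partitions[p].get(i + 1) {
            heap.push((next, p, i + 1));
        }
    }
    out
}
```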
Describe alternatives you've considered: n/a
Additional context:
A min-heap is a reasonable data structure for solving this.
The TopK operator can also be parallelized the same way as a multi-stage grouping: run TopK on each of the input DataFusion partitions, union the results together, and then run a final TopK on the union.
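Reusing the hypothetical `top_k` helper from the earlier sketch, the multi-stage shape might look like this (again an illustration, not DataFusion code):

```rust
// Hypothetical multi-stage Top-K: run Top-K per partition, union the
// partial results, then take a final Top-K over the (much smaller) union.
fn parallel_top_k(partitions: Vec<Vec<i64>>, k: usize) -> Vec<i64> {
    let partials: Vec<i64> = partitions
        .into_iter()
        .flat_map(|p| top_k(p, k)) // local Top-K per partition
        .collect();                // "union" of the partial results
    top_k(partials, k)             // final Top-K over <= k * num_partitions values
}
```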
I think there are two items here:
- parallelizing `order by` followed by `limit n`
- implementing a TopK operator
Currently a plan looks like the following:
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: [COUNT(UInt8(1))@3 DESC] |
I think we should be translating it to:
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: [COUNT(UInt8(1))@3 DESC] |
| | CoalescePartitionsExec |
Already with the current implementation we should be able to rewrite it to:
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortPreservingMergeExec |
| | LocalLimitExec: skip=0, fetch=10 |
| | SortExec: [COUNT(UInt8(1))@3 DESC]
> Already with the current implementation we should be able to rewrite it to:

I agree this would help performance. However, it will become irrelevant if/when we implement a real TopK operator.
I imagine a similar optimization is still useful with a TopK operator. The example TopK operator in the tests still works on only a single partition and can't run in parallel. I think you'll still need a "nested" TopK operator, similar to the above plan, to achieve the parallelization. So the moment there is a TopK operator, we can change it to utilize this.
Of course the TopK operator will use far less memory - but in terms of performance for simple queries that can be executed in memory, the rewrite should already give a big benefit.
@Dandandan -- I had not thought of the "parallel sorts with limit" as a good intermediate state -- you are exactly right and that makes perfect sense
After playing with it a bit more, I think this issue boils down to pushing a limit down to SortPreservingMergeStream and using it there to keep only the top N items instead of the full sorted output. SortPreservingMergeStream already uses the heap data structure mentioned by @xudong963.
So it seems we mostly have the benefits of a TopK operator now by pushing down the limit to individual operations.
There are a couple of followups possible (will create some tickets for them and close this one):
- Avoiding spilling to disk by reinserting the to-spill data into a memory buffer
- Using the limit in `SortPreservingMergeExec`
- Keeping memory under control by merging buffered batches once in a while (see the sketch below)
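A minimal sketch of that last follow-up, under the simplifying assumption of a single descending i64 sort key (the `CompactingTopK` type is hypothetical): buffer incoming batches and, whenever the buffer grows past twice the limit, sort it and truncate back to the top `limit` rows. This bounds memory at O(limit) while amortizing the sort cost across batches.

```rust
/// Hypothetical sketch of "merging buffered batches once in a while":
/// buffer rows and compact back to the top `limit` whenever the buffer
/// exceeds twice the limit.
struct CompactingTopK {
    limit: usize,
    buffer: Vec<i64>,
}

impl CompactingTopK {
    fn new(limit: usize) -> Self {
        Self { limit, buffer: Vec::new() }
    }

    fn push_batch(&mut self, batch: &[i64]) {
        self.buffer.extend_from_slice(batch);
        // Compact only occasionally, not on every batch, so sorting
        // cost is amortized across inputs.
        if self.buffer.len() > 2 * self.limit {
            self.compact();
        }
    }

    fn compact(&mut self) {
        // Sort descending and keep only rows that can still be in the top-k.
        self.buffer.sort_unstable_by(|a, b| b.cmp(a));
        self.buffer.truncate(self.limit);
    }

    fn finish(mut self) -> Vec<i64> {
        self.compact();
        self.buffer
    }
}
```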
> There are a couple of followups possible (will create some tickets for them and close this one): Use limit in `SortPreservingMergeExec`
Is there an issue created for this follow-up? I didn't find one, so I created https://github.com/apache/arrow-datafusion/issues/6000. Feel free to close it if it is a duplicate.
Also, feel free to correct me if my understanding and expectation in #6000 are wrong.
I believe this is completed by the work above, and we should track any upcoming issues elsewhere.