
Support Top-K query optimization for `ORDER BY <EXPR> [ASC|DESC] LIMIT n`

Open · Dandandan opened this issue 3 years ago · 8 comments

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

As explained by @alamb in the user defined plan example:

TopK Background: A "Top K" node is a common query optimization which is used for queries such as "find the top 3 customers by revenue". The (simplified) SQL for such a query might be:

CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT)
 STORED AS CSV location 'tests/customer.csv';

SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;

And a naive plan would be:

> explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+--------------+----------------------------------------+
| plan_type    | plan                                   |
+--------------+----------------------------------------+
| logical_plan | Limit: 3                               |
|              |   Sort: #revenue DESC NULLS FIRST      |
|              |     Projection: #customer_id, #revenue |
|              |       TableScan: sales                 |
+--------------+----------------------------------------+

While this plan produces the correct answer, the careful reader will note it fully sorts the input before discarding everything other than the top 3 elements.

In the ClickBench benchmarks (https://github.com/ClickHouse/ClickBench) there are many queries that might benefit from this optimization, but they are currently converted to a "naive" plan. See for example query 9:

9. EXPLAIN SELECT "RegionID", COUNT(DISTINCT "UserID") AS u FROM hits GROUP BY "RegionID" ORDER BY u DESC LIMIT 10;

-----------------------------------------


DataFusion CLI v12.0.0
0 rows in set. Query took 0.062 seconds.
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                          |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Limit: skip=0, fetch=10                                                                                                                                       |
|               |   Sort: #u DESC NULLS FIRST                                                                                                                                   |
|               |     Projection: #hits.RegionID, #COUNT(DISTINCT hits.UserID) AS u                                                                                             |
|               |       Projection: #group_alias_0 AS RegionID, #COUNT(alias1) AS COUNT(DISTINCT hits.UserID)                                                                   |
|               |         Aggregate: groupBy=[[#group_alias_0]], aggr=[[COUNT(#alias1)]]                                                                                        |
|               |           Aggregate: groupBy=[[#hits.RegionID AS group_alias_0, #hits.UserID AS alias1]], aggr=[[]]                                                           |
|               |             TableScan: hits projection=[RegionID, UserID]                                                                                                     |
| physical_plan | GlobalLimitExec: skip=0, fetch=10                                                                                                                             |
|               |   SortExec: [u@1 DESC]                                                                                                                                        |
|               |     CoalescePartitionsExec                                                                                                                                    |
|               |       ProjectionExec: expr=[RegionID@0 as RegionID, COUNT(DISTINCT hits.UserID)@1 as u]                                                                       |
|               |         ProjectionExec: expr=[group_alias_0@0 as RegionID, COUNT(alias1)@1 as COUNT(DISTINCT hits.UserID)]                                                    |
|               |           AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]                                                  |
|               |             CoalesceBatchesExec: target_batch_size=4096                                                                                                       |
|               |               RepartitionExec: partitioning=Hash([Column { name: "group_alias_0", index: 0 }], 16)                                                            |
|               |                 AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]                                                     |
|               |                   AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]                                   |
|               |                     CoalesceBatchesExec: target_batch_size=4096                                                                                               |
|               |                       RepartitionExec: partitioning=Hash([Column { name: "group_alias_0", index: 0 }, Column { name: "alias1", index: 1 }], 16)               |
|               |                         AggregateExec: mode=Partial, gby=[RegionID@0 as group_alias_0, UserID@1 as alias1], aggr=[]                                           |
|               |                           RepartitionExec: partitioning=RoundRobinBatch(16)                                                                                   |
|               |                             ParquetExec: limit=None, partitions=[home/danielheres/Code/gdd/ClickBench/datafusion/hits.parquet], projection=[RegionID, UserID] |
|               |                                                                                                                                                               |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 rows in set. Query took 0.015 seconds.

Describe the solution you'd like

Implement this feature.

It basically means pushing the limit information down to SortPreservingMergeStream and keeping only the top N elements in the min_heap data structure.
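As a rough illustration of the idea, here is a minimal standalone sketch in plain Rust (not DataFusion's actual SortPreservingMergeStream code; the top_k function and the plain i64 sort keys are assumptions made for the example). Only the k best rows are kept in a min-heap, whose root is the worst retained value, so a better incoming row can replace it in O(log k):

use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Keep only the `k` largest values seen so far (ORDER BY x DESC LIMIT k).
// A min-heap of size `k` holds the current candidates; its root is the
// smallest retained value, so any incoming value larger than the root
// replaces it in O(log k).
fn top_k(values: impl IntoIterator<Item = i64>, k: usize) -> Vec<i64> {
    let mut heap: BinaryHeap<Reverse<i64>> = BinaryHeap::with_capacity(k);
    for v in values {
        if heap.len() < k {
            heap.push(Reverse(v));
        } else if let Some(&Reverse(min)) = heap.peek() {
            if v > min {
                heap.pop();
                heap.push(Reverse(v));
            }
        }
    }
    // Drain and return in descending order, as the query expects.
    let mut out: Vec<i64> = heap.into_iter().map(|Reverse(v)| v).collect();
    out.sort_unstable_by(|a, b| b.cmp(a));
    out
}

fn main() {
    let revenues = vec![10, 50, 20, 90, 70, 30];
    assert_eq!(top_k(revenues, 3), vec![90, 70, 50]);
}

With ORDER BY ... DESC LIMIT k this bounds the retained state to O(k) rows regardless of input size, instead of buffering the full sorted output.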

Describe alternatives you've considered

n/a

Additional context

Add any other context or screenshots about the feature request here.

Dandandan · Sep 16 '22 14:09

A min heap is a reasonable data structure to solve the issue.

xudong963 · Sep 16 '22 14:09

The TopK operator can also be parallelized in the same way as a multi-stage grouping.

For example: run TopK on each of the input DataFusion partitions, union them all together, and then run a final TopK on the result, as sketched below.
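A hedged sketch of that shape in plain Rust (not DataFusion code; local_top_k and parallel_top_k are hypothetical names, and a simple sort-and-truncate stands in for a real heap-based TopK):

// Descending sort, then keep only the k largest values of one partition.
fn local_top_k(mut values: Vec<i64>, k: usize) -> Vec<i64> {
    values.sort_unstable_by(|a, b| b.cmp(a));
    values.truncate(k);
    values
}

// Two-stage top-k, mirroring multi-stage grouping: a local top-k per
// partition (these could run in parallel), a union of the partial results,
// and a final top-k over at most `partitions * k` rows.
fn parallel_top_k(partitions: Vec<Vec<i64>>, k: usize) -> Vec<i64> {
    let partial: Vec<i64> = partitions
        .into_iter()
        .flat_map(|p| local_top_k(p, k))
        .collect();
    local_top_k(partial, k)
}

fn main() {
    let parts = vec![vec![5, 90, 10], vec![70, 20], vec![30, 50]];
    assert_eq!(parallel_top_k(parts, 3), vec![90, 70, 50]);
}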

alamb · Sep 16 '22 18:09

I think there are two items here:

  • parallelizing `ORDER BY` followed by `LIMIT n`
  • implementing a TopK operator

Currently a plan looks like the following:

| physical_plan | GlobalLimitExec: skip=0, fetch=10    |
|               |   SortExec: [COUNT(UInt8(1))@3 DESC] |

I think we should be translating it to:

| physical_plan | GlobalLimitExec: skip=0, fetch=10    |
|               |   SortExec: [COUNT(UInt8(1))@3 DESC] |
|               |     CoalescePartitionsExec           |

Already with the current implementation we should be able to rewrite it to:

| physical_plan | GlobalLimitExec: skip=0, fetch=10        |
|               |   SortPreservingMergeExec                |
|               |     LocalLimitExec: skip=0, fetch=10     |
|               |       SortExec: [COUNT(UInt8(1))@3 DESC] |

Dandandan · Sep 18 '22 11:09

> Already with the current implementation we should be able to rewrite it to:

I agree this would help performance. However, it will become irrelevant if/when we implement a real TopK operator

alamb · Sep 18 '22 11:09

> Already with the current implementation we should be able to rewrite it to:
>
> I agree this would help performance. However, it will become irrelevant if/when we implement a real TopK operator

I imagine a similar optimization is still useful with a TopK operator. The example TopK operator in the tests still works on only a single partition and can't run in parallel. I think you'll still need a "nested" TopK operator, similar to the above plan, to achieve the parallelization. So once there is a TopK operator, we can change it to utilize this.

Of course the TopK operator will use way less memory - but in terms of performance for simple queries that can be executed in memory, the rewrite should probably already have a big benefit.

Dandandan · Sep 18 '22 12:09

@Dandandan -- I had not thought of the "parallel sorts with limit" as a good intermediate state -- you are exactly right and that makes perfect sense

alamb · Sep 18 '22 14:09

After playing with it a bit more, I think this issue boils down to pushing a limit down to SortPreservingMergeStream and using it there to keep only the top N items instead of the full sorted output. The SortPreservingMergeStream already uses the heap data structure mentioned by @xudong963.

Dandandan · Sep 19 '22 09:09

So it seems we mostly have the benefits of a TopK operator now by pushing down the limit to individual operations.

There are a couple of followups possible (will create some tickets for them and close this one):

  • Avoiding spilling to disk by reinserting the data that would otherwise be spilled back into an in-memory buffer
  • Use the limit in SortPreservingMergeExec
  • Keeping memory under control by merging buffered batches once in a while (a rough sketch follows below)
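A rough sketch of that last idea, assuming plain Rust, a single descending sort column, and hypothetical names (TopKBuffer is not a DataFusion type): rows are buffered as they arrive, and once the buffer grows past twice the limit it is sorted and truncated back to the top n, so memory stays proportional to the limit rather than to the input size.

struct TopKBuffer {
    limit: usize,
    rows: Vec<i64>,
}

impl TopKBuffer {
    fn new(limit: usize) -> Self {
        Self { limit, rows: Vec::new() }
    }

    // Append a row; compact only occasionally so the amortized cost stays low.
    fn push(&mut self, value: i64) {
        self.rows.push(value);
        if self.rows.len() > 2 * self.limit {
            self.compact();
        }
    }

    // Sort descending and keep only the top `limit` rows.
    fn compact(&mut self) {
        self.rows.sort_unstable_by(|a, b| b.cmp(a));
        self.rows.truncate(self.limit);
    }

    fn finish(mut self) -> Vec<i64> {
        self.compact();
        self.rows
    }
}

fn main() {
    let mut buf = TopKBuffer::new(3);
    for v in [10, 50, 20, 90, 70, 30, 80] {
        buf.push(v);
    }
    assert_eq!(buf.finish(), vec![90, 80, 70]);
}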

Dandandan · Sep 21 '22 17:09

> There are a couple of followups possible (will create some tickets for them and close this one): Use limit in SortPreservingMergeExec

Is there an issue created for this follow-up? I didn't find one, so I created https://github.com/apache/arrow-datafusion/issues/6000. Feel free to close it if it's a duplicate.

Also, feel free to correct me if my understanding and expectation in #6000 are wrong.

jychen7 · Apr 14 '23 03:04

I believe this is completed by the work that has been done, and we should track any remaining issues elsewhere.

Dandandan · Jul 11 '23 17:07