optimize `order by sorted ASC, unsorted` and `order by DESC` cases
Below the line is the original PR comment. After the PR was opened, some things have changed. Specifically:
- The DESC case is also optimized
- The `partial-order-by` option, which enables or disables the optimization, is enabled by default
This is a quite large PR that should be marked as a Draft. The improvements included here are important, but I also had to change several APIs and there may be better ways to do it. In fact it is not mandatory to change these APIs, but most of these changes try to bring some order to the abusive usage of casting and sorting assumptions that were scattered in the code and that I found very confusing. I think the APIs are now clearer, but I understand that this is a subjective matter. The code is not properly tested and that is something I would like to solve before merging.
The current order-by code has some inefficiencies that may not be trivial to fix, but whose removal would significantly increase performance. I discovered that by reading #8837, but this PR doesn't fix the performance problem detected in that issue. Instead, it is focused on queries like:
SELECT whatever FROM table WHERE whatever ORDER BY <some_sorted_column> ASC, <some_unsorted_column_or_expression> LIMIT <some_value> OFFSET <some_other_value>
As shown by the included benchmark, the performance gain is huge:
| Benchmark | (_numRows) | (_partialOrderBy) | (_primaryRepetitions) | (_query) | (_scenario) | Mode | Cnt | Score | Error | Units |
|---|---|---|---|---|---|---|---|---|---|---|
| BenchmarkOrderByQueries.query | 1500000 | true | 1 | SELECT SORTED_COL FROM MyTable ORDER BY SORTED_COL, LOW_CARDINALITY_STRING_COL LIMIT 1052 | EXP(0.5) | avgt | 5 | 0.992 | ± 0.041 | ms/op |
| BenchmarkOrderByQueries.query | 1500000 | true | 1000 | SELECT SORTED_COL FROM MyTable ORDER BY SORTED_COL, LOW_CARDINALITY_STRING_COL LIMIT 1052 | EXP(0.5) | avgt | 5 | 1.588 | ± 0.128 | ms/op |
| BenchmarkOrderByQueries.query | 1500000 | false | 1 | SELECT SORTED_COL FROM MyTable ORDER BY SORTED_COL, LOW_CARDINALITY_STRING_COL LIMIT 1052 | EXP(0.5) | avgt | 5 | 62.553 | ± 0.606 | ms/op |
| BenchmarkOrderByQueries.query | 1500000 | false | 1000 | SELECT SORTED_COL FROM MyTable ORDER BY SORTED_COL, LOW_CARDINALITY_STRING_COL LIMIT 1052 | EXP(0.5) | avgt | 5 | 59.222 | ± 5.772 | ms/op |
Before this PR, this kind of query was executed in O(S log L), where S is the number of rows in the segment and L the effective limit (calculated as min(max_limit, offset + limit)). That is a waste, which is most evident in degraded scenarios such as a segment whose rows are all distinct in the sorting column. In that case, the query above should be executed in O(L): we only need to read the first L rows that match the predicate and return them, because they are already sorted. Of course that is not valid in the general case, but we can use the fact that the data is already partially sorted by <some_sorted_column>, so we don't need to read the whole segment.
What this PR does is determine how many already-sorted expressions form a prefix of the list of order-by expressions. In other words, this optimization is only applied if there is a number 0 < i < orderByExpr.length such that every orderByExpr[j] with j < i is sorted, where "sorted" means that the expression is either a constant (which is quite useless, but still) or a column that is physically sorted and ordered ASC. Pinot already has an optimization for the case where i is exactly equal to orderByExpr.length. This PR adds another one for 0 < i < orderByExpr.length.
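To make the prefix condition concrete, here is a minimal, hedged sketch in plain Java (not the PR's actual code; `OrderByExpr` and its accessors are hypothetical stand-ins for Pinot's `OrderByExpressionContext` plus the segment metadata checks):

```java
import java.util.List;

// Hedged sketch of the prefix check described above, not the PR's actual planner code.
final class SortedPrefix {

  // Hypothetical stand-in for an order-by expression plus the segment metadata it needs.
  record OrderByExpr(boolean isConstant, boolean isPhysicallySortedColumn, boolean isAsc) {
  }

  /** Returns i, the number of leading order-by expressions that are already sorted in the segment. */
  static int sortedPrefixLength(List<OrderByExpr> orderByExprs) {
    int i = 0;
    for (OrderByExpr expr : orderByExprs) {
      // "Sorted" means: a constant expression, or a physically sorted column ordered ASC.
      boolean alreadySorted = expr.isConstant() || (expr.isPhysicallySortedColumn() && expr.isAsc());
      if (!alreadySorted) {
        break;
      }
      i++;
    }
    // i == orderByExprs.size(): fully sorted order-by (the case master already optimizes).
    // 0 < i < orderByExprs.size(): the partial order-by case this PR targets.
    return i;
  }
}
```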
Conceptually, the PR partitions the segment into P mathematical partitions, where two rows are in the same partition if and only if the values of their prefix expressions are equal.
Since this prefix of expressions is sorted, we know that the rows of each partition are consecutive. The partitions themselves are also sorted, which means that partitions containing smaller elements (compared on the prefix of sorted order-by expressions) are found before any partition containing larger ones.
Therefore, we can add the results to an unbounded List and stop when:
- Either there are no more rows that match the predicate
- Or the List has more than L elements AND there are no more elements in the current partition.
As all elements in the same partition are consecutive, we can easily tell that a partition has finished when a value that doesn't belong to it is found.
Once we stop, we just need to sort the List (using all order-by expressions; its size will be lower than or equal to L plus the size of the last included partition) and return its first L elements.
There is another interesting property: as the elements of different partitions are already ordered relative to each other, we don't need to sort the whole List once we finish. We can simply sort each partition when a new one is found, reducing the cost from O(S log L) to O(p * P * log(P)), where P is the size of the largest partition included in the result and p the number of partitions included.
This optimization is ideal when the number of partitions is very high (close to the number of rows), as the cost then tends to O(L). If the number of partitions is very small, the cost tends to O(S log(L)), which is the same as before.
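A hedged sketch of the collection loop described above (generic Java, not the operator's actual code; `prefixKey`, the comparator, and the row iterator are hypothetical placeholders). It implements the two stop conditions and the per-partition sorting from the previous paragraphs:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

final class PartialOrderByCollector {

  /**
   * Collects matching rows (already ordered by the sorted prefix) until at least
   * {@code effectiveLimit} rows are gathered and the current partition is complete,
   * sorting each finished partition with the full order-by comparator.
   */
  static <R, K> List<R> collect(Iterator<R> matchingRows, Function<R, K> prefixKey,
      Comparator<R> fullOrderByComparator, int effectiveLimit) {
    List<R> rows = new ArrayList<>();
    int partitionStart = 0;     // index where the current partition begins
    K currentKey = null;        // prefix value of the current partition
    boolean stoppedEarly = false;
    while (matchingRows.hasNext()) {
      R row = matchingRows.next();
      K key = prefixKey.apply(row);
      if (currentKey != null && !key.equals(currentKey)) {
        // Partition boundary: rows of a partition are consecutive, so the previous one is complete.
        rows.subList(partitionStart, rows.size()).sort(fullOrderByComparator);
        if (rows.size() >= effectiveLimit) {
          stoppedEarly = true;  // enough rows collected and the last included partition is finished
          break;
        }
        partitionStart = rows.size();
      }
      currentKey = key;
      rows.add(row);
    }
    if (!stoppedEarly && partitionStart < rows.size()) {
      // Iterator exhausted: sort the last (possibly incomplete) partition as well.
      rows.subList(partitionStart, rows.size()).sort(fullOrderByComparator);
    }
    return rows.subList(0, Math.min(effectiveLimit, rows.size()));
  }
}
```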
Right now the optimization is disabled by default and has to be activated with a new option called partial-order-by, which is used in the benchmark to compare results.
Codecov Report
Merging #8979 (000d4d5) into master (0a442b9) will decrease coverage by 2.69%. The diff coverage is 83.23%.
| Coverage Diff | master | #8979 | +/- |
|---|---|---|---|
| Coverage | 70.03% | 67.34% | -2.70% |
| Complexity | 5288 | 5108 | -180 |
| Files | 1937 | 1443 | -494 |
| Lines | 103498 | 75570 | -27928 |
| Branches | 15715 | 12053 | -3662 |
| Hits | 72487 | 50892 | -21595 |
| Misses | 25907 | 21013 | -4894 |
| Partials | 5104 | 3665 | -1439 |
| Flag | Coverage Δ | |
|---|---|---|
| integration1 | ? | |
| integration2 | ? | |
| unittests1 | 67.34% <83.23%> (-0.07%) | :arrow_down: |
| unittests2 | ? | |

Flags with carried forward coverage won't be shown.
| Impacted Files | Coverage Δ | |
|---|---|---|
| ...mmon/request/context/OrderByExpressionContext.java | 58.82% <0.00%> (-3.68%) | :arrow_down: |
| ...operator/AcquireReleaseColumnsSegmentOperator.java | 0.00% <ø> (ø) | |
| ...he/pinot/core/operator/query/DistinctOperator.java | 96.15% <ø> (ø) | |
| ...not/core/operator/query/SelectionOnlyOperator.java | 98.18% <ø> (ø) | |
| ...not/segment/spi/datasource/DataSourceMetadata.java | 100.00% <ø> (ø) | |
| ...va/org/apache/pinot/spi/utils/CommonConstants.java | 24.00% <ø> (ø) | |
| ...nMaxValueBasedSelectionOrderByCombineOperator.java | 60.99% <72.72%> (-12.73%) | :arrow_down: |
| ...operator/query/LinearSelectionOrderByOperator.java | 76.15% <76.15%> (ø) | |
| ...uery/SelectionPartiallyOrderedByDescOperation.java | 83.33% <83.33%> (ø) | |
| .../org/apache/pinot/core/plan/SelectionPlanNode.java | 90.47% <88.88%> (-9.53%) | :arrow_down: |
| ... and 851 more | | |
I've added some changes to also optimize the `ORDER BY sorted_col DESC (, more columns)*` case. The results are even better in this case because the priority queue is much less effective in the DESC case: each of the N insertions has to move elements through the whole queue.
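For context, here is a minimal, self-contained illustration (not Pinot's combine/selection code) of that DESC worst case: when the segment is scanned in ascending order but the query sorts DESC, every scanned value beats the current worst element of the bounded heap, so all N rows pay a heap update.

```java
import java.util.PriorityQueue;

final class DescWorstCaseIllustration {
  public static void main(String[] args) {
    int numRows = 1_500_000;
    int limit = 1_052;
    // Min-heap keeping the L largest values seen so far (the classic top-L approach).
    PriorityQueue<Integer> topL = new PriorityQueue<>(limit);
    for (int value = 0; value < numRows; value++) {   // ascending scan order
      if (topL.size() < limit) {
        topL.offer(value);
      } else if (value > topL.peek()) {               // always true when values keep increasing
        topL.poll();                                  // every row evicts the current minimum...
        topL.offer(value);                            // ...so every row costs O(log L) heap work
      }
    }
    System.out.println("kept " + topL.size() + " values, smallest kept = " + topL.peek());
  }
}
```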
The option _partialOrderBy is now true by default, which helped me find some errors in the implementation by running the normal tests with the new version.
The benchmarks now are:
sortedAsc:
SELECT SORTED_COL
FROM MyTable
ORDER BY SORTED_COL ASC
LIMIT 1052
option(partial-order-by=<_partialOrderBy>)

sortedAscPartially:
SELECT SORTED_COL
FROM MyTable
ORDER BY SORTED_COL ASC, LOW_CARDINALITY_STRING_COL
LIMIT 1052
option(partial-order-by=<_partialOrderBy>)

sortedDesc:
SELECT SORTED_COL
FROM MyTable
ORDER BY SORTED_COL DESC
LIMIT 1052
option(partial-order-by=<_partialOrderBy>)

sortedDescPartially:
SELECT SORTED_COL
FROM MyTable
ORDER BY SORTED_COL DESC, LOW_CARDINALITY_STRING_COL
LIMIT 1052
option(partial-order-by=<_partialOrderBy>)
| Benchmark | (_numRows) | (_partialOrderBy) | (_primaryRepetitions) | (_scenario) | Mode | Cnt | Score | Error | Units |
|---|---|---|---|---|---|---|---|---|---|
| BenchmarkOrderByQueries.sortedAsc | 1500000 | true | 1 | EXP(0.5) | avgt | 5 | 1.071 | ± 0.085 | ms/op |
| BenchmarkOrderByQueries.sortedAsc | 1500000 | true | 1000 | EXP(0.5) | avgt | 5 | 0.339 | ± 0.125 | ms/op |
| BenchmarkOrderByQueries.sortedAsc | 1500000 | false | 1 | EXP(0.5) | avgt | 5 | 37.054 | ± 3.505 | ms/op |
| BenchmarkOrderByQueries.sortedAsc | 1500000 | false | 1000 | EXP(0.5) | avgt | 5 | 31.674 | ± 2.470 | ms/op |
| BenchmarkOrderByQueries.sortedAscPartially | 1500000 | true | 1 | EXP(0.5) | avgt | 5 | 1.369 | ± 0.336 | ms/op |
| BenchmarkOrderByQueries.sortedAscPartially | 1500000 | true | 1000 | EXP(0.5) | avgt | 5 | 2.802 | ± 0.161 | ms/op |
| BenchmarkOrderByQueries.sortedAscPartially | 1500000 | false | 1 | EXP(0.5) | avgt | 5 | 81.858 | ± 4.989 | ms/op |
| BenchmarkOrderByQueries.sortedAscPartially | 1500000 | false | 1000 | EXP(0.5) | avgt | 5 | 78.131 | ± 5.350 | ms/op |
| BenchmarkOrderByQueries.sortedDesc | 1500000 | true | 1 | EXP(0.5) | avgt | 5 | 4.052 | ± 2.156 | ms/op |
| BenchmarkOrderByQueries.sortedDesc | 1500000 | true | 1000 | EXP(0.5) | avgt | 5 | 1.841 | ± 0.382 | ms/op |
| BenchmarkOrderByQueries.sortedDesc | 1500000 | false | 1 | EXP(0.5) | avgt | 5 | 371.618 | ± 20.795 | ms/op |
| BenchmarkOrderByQueries.sortedDesc | 1500000 | false | 1000 | EXP(0.5) | avgt | 5 | 275.750 | ± 39.659 | ms/op |
| BenchmarkOrderByQueries.sortedDescPartially | 1500000 | true | 1 | EXP(0.5) | avgt | 5 | 3.973 | ± 3.464 | ms/op |
| BenchmarkOrderByQueries.sortedDescPartially | 1500000 | true | 1000 | EXP(0.5) | avgt | 5 | 7.004 | ± 0.825 | ms/op |
| BenchmarkOrderByQueries.sortedDescPartially | 1500000 | false | 1 | EXP(0.5) | avgt | 5 | 396.643 | ± 40.451 | ms/op |
| BenchmarkOrderByQueries.sortedDescPartially | 1500000 | false | 1000 | EXP(0.5) | avgt | 5 | 584.684 | ± 49.203 | ms/op |
I still need to add tests to verify the correctness, which is what I'm going to focus on early next week.
PS: Just to be clear, this optimization does not actually improve sortedAsc. The code in master is similar to running sortedAsc with _partialOrderBy = true. By changing the option to false we can now disable the optimization that master always applies in the sortedAsc case, which is what the benchmark does. All the other benchmarks are actual improvements of this PR.
I've fixed one bug detected by the integration tests (it is an error to create a PriorityQueue with 0 elements), but I wasn't able to fix the second one because I don't actually know why it times out.
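As a side note on the first bug: `java.util.PriorityQueue` rejects a non-positive initial capacity, so any code path that sizes the queue from an effective limit of 0 has to guard against it. A tiny, hedged illustration (not the PR's code):

```java
import java.util.PriorityQueue;

final class PriorityQueueCapacityCheck {
  public static void main(String[] args) {
    try {
      new PriorityQueue<Integer>(0);  // initial capacity must be >= 1
    } catch (IllegalArgumentException e) {
      System.out.println("PriorityQueue rejects an initial capacity of 0");
    }
    int effectiveLimit = 0;
    // One possible guard: clamp the capacity when the effective limit can be 0.
    PriorityQueue<Integer> queue = new PriorityQueue<>(Math.max(1, effectiveLimit));
    System.out.println("created queue with capacity " + Math.max(1, effectiveLimit));
  }
}
```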
I think I finally found why the test was failing. It was due to a `cmp >= 0` assertion that should have been `cmp <= 0`. What made the problem very hard to find is that `BaseCombineOperator` is not prepared to catch `Error`s, so the task at the server just failed without printing any log; as a result the broker didn't receive a response, and the error shown at the broker was that some server didn't answer.
Things to be aware of:
- Pinot may not log `AssertionError`s, so it is better not to use `assert` expressions.
- We may or may not want to log when we receive an `Error`. As I don't know what we want to do in that case, I've just changed my code to throw an `ArgumentException`.
> I think I finally found why the test was failing. It was due to a `cmp >= 0` assertion that should have been `cmp <= 0`. [...]
+1. can we file an issue for fixing the AssertException not surfacing problem?
> I think I finally found why the test was failing. It was due to a `cmp >= 0` assertion that should have been `cmp <= 0`. [...]
We use assert to ensure certain behavior while avoiding the overhead of the extra check. In a production environment, assert statements are ignored, which is the desired behavior. In performance-critical paths, we want to avoid the extra check if we know certain behavior is expected.
assert can be enabled by setting a JVM parameter: -ea
> assert can be enabled by setting a JVM parameter: `-ea`
Yeah, that is the behavior of assert. The problem is: in several parts of Pinot there are try-catch sections that catch `Exception` and do something (logging, returning something special, etc.). It is usually bad practice in Java to catch `Throwable` because most of the time the application cannot survive an `Error`, for example when we run out of memory or when an assertion fails.
But that usual good practice does not always apply. For example, I found that by not catching OOM errors in the ingestion jobs we are in fact not failing when one of them is thrown, and therefore we break consistency. In this PR I found a similar problem: as our code does not catch the `AssertionError` that `assert` throws, the integration tests were neither failing appropriately nor logging the problem. Instead I got a very random error and no logging, which made it very difficult to understand where the problem was.
We have two options here: either modify our codebase to catch all `Throwable`s at the root of the threads, or stop using assertions. What we cannot do is continue using `assert` when it is more problematic than useful.
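A hedged sketch of the first option (not `BaseCombineOperator`'s actual code): catching `Throwable` at the root of a worker task so that an `AssertionError` or `OutOfMemoryError` is at least logged before the task dies silently.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class LoggingTaskRunner {
  private static final Logger LOGGER = LoggerFactory.getLogger(LoggingTaskRunner.class);

  static void submit(ExecutorService executor, Runnable task) {
    executor.submit(() -> {
      try {
        task.run();
      } catch (Throwable t) {
        // Without this, an AssertionError thrown by an assert would escape the task with no
        // log line, and the broker would only see a missing server response.
        LOGGER.error("Query task failed", t);
        if (t instanceof Error) {
          throw (Error) t;  // still propagate fatal errors after logging them
        }
      }
    });
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    submit(executor, () -> {
      assert 1 > 2 : "fails when assertions are enabled with -ea";
    });
    executor.shutdown();
  }
}
```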
For descending order, ideally we should implement an iterator which can read rows in reverse order.
That was my first approach, and it would surely be better. But the cursor interface specifies that iteration must be done in ascending order. I tried to add a new reverse cursor that doesn't do that, but I wasn't sure about its implications: if there is code out there that assumes all cursor iterations are ascending, we may have problems. Therefore I prefer to start with an implementation that is more conservative.
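To make the discussion concrete, this is roughly the shape a reverse cursor could take (a hypothetical sketch; neither this class nor its contract exists in Pinot): it walks a doc-id range backwards, which is what the DESC case would ideally consume instead of scanning forward and re-sorting.

```java
// Hypothetical reverse iterator sketch, not an existing Pinot API.
final class ReverseRangeDocIdIterator {
  private final int _startDocId;  // inclusive lower bound of the range
  private int _currentDocId;      // walks downwards from endDocId - 1

  /** Iterates the doc ids of [startDocId, endDocId) in descending order. */
  ReverseRangeDocIdIterator(int startDocId, int endDocId) {
    _startDocId = startDocId;
    _currentDocId = endDocId - 1;
  }

  /** Returns the next doc id in descending order, or -1 when the range is exhausted. */
  int next() {
    return _currentDocId >= _startDocId ? _currentDocId-- : -1;
  }
}
```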
For the assert statement, personally I usually use it for documentation purposes, to indicate that a condition should always be met based on the calling tree (e.g. a condition is already checked before calling a method), especially in performance-critical paths. If a condition might not be met when the input is illegal, we can use Preconditions instead.
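A small illustration of that convention (hedged example code, not from this PR): `assert` documents an invariant the call tree already guarantees and costs nothing unless `-ea` is set, while Guava's `Preconditions` always validates input that could legitimately be wrong.

```java
import com.google.common.base.Preconditions;

final class CheckStyles {

  /** Callers are expected to pass a sorted array; the index, however, comes from user input. */
  static int pick(int[] sortedValues, int index) {
    // Documentation-style invariant: free in production, checked only with -ea.
    assert isSorted(sortedValues) : "caller must pass a sorted array";
    // Potentially illegal input: always validated, even in production.
    Preconditions.checkArgument(index >= 0 && index < sortedValues.length, "index out of range: %s", index);
    return sortedValues[index];
  }

  private static boolean isSorted(int[] values) {
    for (int i = 1; i < values.length; i++) {
      if (values[i - 1] > values[i]) {
        return false;
      }
    }
    return true;
  }
}
```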
For the descending order case, the final solution should be using the reverse iterator. I'd suggest modifying the current PR to optimize ascending order only, and then open a separate PR to add the reverse iterator and optimize the descending order case.
I applied several suggested changes, but there are a lot of merge conflicts. I'm planning to solve them tomorrow, and given that we decided not to apply my change on IntermediateResultsBlock, I think it may be easier to apply the operator changes in a new branch (which may be this PR, with some rewrite of the history).
These are the results I'm getting with the latest version. It seems that the descending version is slower than in the first attempts. It may be due to the incorrect behaviors that were detected by @Jackie-Jiang, but it may also be some implementation inefficiency. I'll try to study that a bit.
| Benchmark | (_numRows) | (_primaryRepetitions) | (_scenario) | (_skipOptimization) | Mode | Cnt | Score | Error | Units |
|---|---|---|---|---|---|---|---|---|---|
| BenchmarkOrderByQueries.sortedAsc | 1500000 | 1 | EXP(0.5) | true | avgt | 5 | 36.699 | ± 1.108 | ms/op |
| BenchmarkOrderByQueries.sortedAsc | 1500000 | 1 | EXP(0.5) | false | avgt | 5 | 1.349 | ± 0.091 | ms/op |
| BenchmarkOrderByQueries.sortedAsc | 1500000 | 1000 | EXP(0.5) | true | avgt | 5 | 29.210 | ± 1.966 | ms/op |
| BenchmarkOrderByQueries.sortedAsc | 1500000 | 1000 | EXP(0.5) | false | avgt | 5 | 0.421 | ± 0.032 | ms/op |
| BenchmarkOrderByQueries.sortedAscPartially | 1500000 | 1 | EXP(0.5) | true | avgt | 5 | 85.217 | ± 11.162 | ms/op |
| BenchmarkOrderByQueries.sortedAscPartially | 1500000 | 1 | EXP(0.5) | false | avgt | 5 | 1.460 | ± 0.261 | ms/op |
| BenchmarkOrderByQueries.sortedAscPartially | 1500000 | 1000 | EXP(0.5) | true | avgt | 5 | 81.292 | ± 25.354 | ms/op |
| BenchmarkOrderByQueries.sortedAscPartially | 1500000 | 1000 | EXP(0.5) | false | avgt | 5 | 2.086 | ± 0.500 | ms/op |
| BenchmarkOrderByQueries.sortedDesc | 1500000 | 1 | EXP(0.5) | true | avgt | 5 | 352.122 | ± 37.947 | ms/op |
| BenchmarkOrderByQueries.sortedDesc | 1500000 | 1 | EXP(0.5) | false | avgt | 5 | 14.572 | ± 0.771 | ms/op |
| BenchmarkOrderByQueries.sortedDesc | 1500000 | 1000 | EXP(0.5) | true | avgt | 5 | 283.754 | ± 12.982 | ms/op |
| BenchmarkOrderByQueries.sortedDesc | 1500000 | 1000 | EXP(0.5) | false | avgt | 5 | 9.702 | ± 2.364 | ms/op |
| BenchmarkOrderByQueries.sortedDescPartially | 1500000 | 1 | EXP(0.5) | true | avgt | 5 | 443.211 | ± 53.459 | ms/op |
| BenchmarkOrderByQueries.sortedDescPartially | 1500000 | 1 | EXP(0.5) | false | avgt | 5 | 71.102 | ± 64.952 | ms/op |
| BenchmarkOrderByQueries.sortedDescPartially | 1500000 | 1000 | EXP(0.5) | true | avgt | 5 | 623.616 | ± 45.834 | ms/op |
| BenchmarkOrderByQueries.sortedDescPartially | 1500000 | 1000 | EXP(0.5) | false | avgt | 5 | 92.627 | ± 9.508 | ms/op |
Seems like a great optimization. Any plans of resuming this ?
> Seems like a great optimization. Any plans of resuming this?
If I remember correctly there are two things we would need to improve:
- Disable the optimization when null handling is active
- Decide whether we should remove the descending optimization or not. As suggested by @Jackie-Jiang, it would make more sense (and be more efficient) to have descending operators. The current solution in this PR runs faster than the previous code, but given the complexity of the code, it is difficult to be sure the solution is always correct.
Do you think it makes sense to keep the descending optimization, or would it be better to just remove it and wait until we have descending iterators?
New results with the last improvements suggested by @Jackie-Jiang:
| Benchmark | (_limit) | (_numRows) | (_orderByAlgorithm) | (_primaryRepetitions) | Mode | Cnt | Score | Error | Units |
|---|---|---|---|---|---|---|---|---|---|
| BenchmarkOrderByDescQueries.sortedDesc | 101 | 1500000 | naive | 1 | avgt | 5 | 212.764 | ± 22.368 | ms/op |
| BenchmarkOrderByDescQueries.sortedDesc | 101 | 1500000 | naive | 1000 | avgt | 5 | 46.058 | ± 2.962 | ms/op |
| BenchmarkOrderByDescQueries.sortedDesc | 101 | 1500000 | naive | 5000 | avgt | 5 | 33.271 | ± 5.227 | ms/op |
| BenchmarkOrderByDescQueries.sortedDesc | 101 | 1500000 | null | 1 | avgt | 5 | 10.628 | ± 1.358 | ms/op |
| BenchmarkOrderByDescQueries.sortedDesc | 101 | 1500000 | null | 1000 | avgt | 5 | 7.170 | ± 0.565 | ms/op |
| BenchmarkOrderByDescQueries.sortedDesc | 101 | 1500000 | null | 5000 | avgt | 5 | 7.007 | ± 0.536 | ms/op |
| BenchmarkOrderByDescQueries.sortedDesc | 10100 | 1500000 | naive | 1 | avgt | 5 | 502.738 | ± 12.277 | ms/op |
| BenchmarkOrderByDescQueries.sortedDesc | 10100 | 1500000 | naive | 1000 | avgt | 5 | 461.401 | ± 19.661 | ms/op |
| BenchmarkOrderByDescQueries.sortedDesc | 10100 | 1500000 | naive | 5000 | avgt | 5 | 381.431 | ± 9.761 | ms/op |
| BenchmarkOrderByDescQueries.sortedDesc | 10100 | 1500000 | null | 1 | avgt | 5 | 50.491 | ± 6.376 | ms/op |
| BenchmarkOrderByDescQueries.sortedDesc | 10100 | 1500000 | null | 1000 | avgt | 5 | 38.933 | ± 5.553 | ms/op |
| BenchmarkOrderByDescQueries.sortedDesc | 10100 | 1500000 | null | 5000 | avgt | 5 | 33.065 | ± 1.795 | ms/op |
| BenchmarkOrderByDescQueries.sortedDesc | 101000 | 1500000 | naive | 1 | avgt | 5 | 637.033 | ± 11.965 | ms/op |
| BenchmarkOrderByDescQueries.sortedDesc | 101000 | 1500000 | naive | 1000 | avgt | 5 | 528.267 | ± 22.312 | ms/op |
| BenchmarkOrderByDescQueries.sortedDesc | 101000 | 1500000 | naive | 5000 | avgt | 5 | 527.354 | ± 45.383 | ms/op |
| BenchmarkOrderByDescQueries.sortedDesc | 101000 | 1500000 | null | 1 | avgt | 5 | 52.356 | ± 3.886 | ms/op |
| BenchmarkOrderByDescQueries.sortedDesc | 101000 | 1500000 | null | 1000 | avgt | 5 | 47.692 | ± 4.879 | ms/op |
| BenchmarkOrderByDescQueries.sortedDesc | 101000 | 1500000 | null | 5000 | avgt | 5 | 37.208 | ± 12.339 | ms/op |
| BenchmarkOrderByDescQueries.sortedDescPartially | 101 | 1500000 | naive | 1 | avgt | 5 | 265.343 | ± 16.922 | ms/op |
| BenchmarkOrderByDescQueries.sortedDescPartially | 101 | 1500000 | naive | 1000 | avgt | 5 | 184.912 | ± 8.529 | ms/op |
| BenchmarkOrderByDescQueries.sortedDescPartially | 101 | 1500000 | naive | 5000 | avgt | 5 | 105.262 | ± 5.114 | ms/op |
| BenchmarkOrderByDescQueries.sortedDescPartially | 101 | 1500000 | null | 1 | avgt | 5 | 53.949 | ± 4.434 | ms/op |
| BenchmarkOrderByDescQueries.sortedDescPartially | 101 | 1500000 | null | 1000 | avgt | 5 | 89.993 | ± 31.135 | ms/op |
| BenchmarkOrderByDescQueries.sortedDescPartially | 101 | 1500000 | null | 5000 | avgt | 5 | 81.264 | ± 5.733 | ms/op |
| BenchmarkOrderByDescQueries.sortedDescPartially | 10100 | 1500000 | naive | 1 | avgt | 5 | 472.145 | ± 16.579 | ms/op |
| BenchmarkOrderByDescQueries.sortedDescPartially | 10100 | 1500000 | naive | 1000 | avgt | 5 | 757.835 | ± 51.859 | ms/op |
| BenchmarkOrderByDescQueries.sortedDescPartially | 10100 | 1500000 | naive | 5000 | avgt | 5 | 762.778 | ± 15.560 | ms/op |
| BenchmarkOrderByDescQueries.sortedDescPartially | 10100 | 1500000 | null | 1 | avgt | 5 | 100.263 | ± 18.075 | ms/op |
| BenchmarkOrderByDescQueries.sortedDescPartially | 10100 | 1500000 | null | 1000 | avgt | 5 | 104.300 | ± 3.241 | ms/op |
| BenchmarkOrderByDescQueries.sortedDescPartially | 10100 | 1500000 | null | 5000 | avgt | 5 | 103.599 | ± 9.995 | ms/op |
| BenchmarkOrderByDescQueries.sortedDescPartially | 101000 | 1500000 | naive | 1 | avgt | 5 | 638.340 | ± 62.591 | ms/op |
| BenchmarkOrderByDescQueries.sortedDescPartially | 101000 | 1500000 | naive | 1000 | avgt | 5 | 941.473 | ± 139.675 | ms/op |
| BenchmarkOrderByDescQueries.sortedDescPartially | 101000 | 1500000 | naive | 5000 | avgt | 5 | 1030.555 | ± 12.376 | ms/op |
| BenchmarkOrderByDescQueries.sortedDescPartially | 101000 | 1500000 | null | 1 | avgt | 5 | 118.147 | ± 8.753 | ms/op |
| BenchmarkOrderByDescQueries.sortedDescPartially | 101000 | 1500000 | null | 1000 | avgt | 5 | 113.684 | ± 9.346 | ms/op |
| BenchmarkOrderByDescQueries.sortedDescPartially | 101000 | 1500000 | null | 5000 | avgt | 5 | 121.790 | ± 7.841 | ms/op |
@Jackie-Jiang I'm going to start a two-week vacation today. It would be great to merge this PR soon, so if you have time to invest here, feel free to modify whatever you consider necessary in order to merge it.
I like most of the changes introduced by Jackie, and the ones I don't are mostly cosmetic and subjective, so I'm fine with that. :+1: