ScanQuery supports multi column orderBy queries
Fixes #12958.
Description
- Add normal column sorting
- Add Inline DataSource Sorting
- Fix #13152 inline empty data sorting
- Sorter interface extraction for future improvement
- By default, the data in the segment is sorted by
QueueBasedMultiColumnSorter,Other sorters can be used through context parameters - Merge results using
QueueBasedMultiColumnSorter - Reference Sort style of
__time, addingpriority queue strategyandn-way merge strategy
Key changed/added classes in this PR
-
OrderByQueryRunner -
OrderBySequence -
QueueBasedMultiColumnSorter -
TreeMultisetBasedOrderByQueryRunner -
TreeMultisetBasedSorterSequence -
TreeMultisetBasedMulticolumnSorter -
ScanQueryRunnerFactory -
MultiColumnSorter -
QueueBasedMultiColumnSorter -
ScanQueryQueryToolChest
This PR has:
- [x] been self-reviewed.
- [x] using the concurrency checklist (Remove this item if the PR doesn't have any relation to concurrency.)
- [ ] added documentation for new or modified features or behaviors.
- [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
- [ ] added or updated version, license, or notice information in licenses.yaml
- [x] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
- [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
- [ ] added integration tests.
- [x] been tested in a test Druid cluster.
This pull request introduces 1 alert and fixes 1 when merging 5efc2d1fa8ef362db00f45f7dfaa6f9601a8d859 into ebfe1c0c90d86e4d188617fe840dafb2c9b7e5b0 - view on LGTM.com
new alerts:
- 1 for Dereferenced variable may be null
fixed alerts:
- 1 for Dereferenced variable may be null
This pull request fixes 1 alert when merging 858a0b6190d27799306e859a394ec958f8bbef67 into ebfe1c0c90d86e4d188617fe840dafb2c9b7e5b0 - view on LGTM.com
fixed alerts:
- 1 for Dereferenced variable may be null
@599166320, thanks for the update! I'll take another look soon.
I noticed that this is a new PR that replaces the existing, closed one. As it turns out, you can just reuse the same PR by pushing (or force-pushing) new commits. Reusing the same PR allows all comments to be in one place and preserves tags and other status. If the PR is closed, you can reopen it and be back in business. Let's use this PR from now on for this project. I've copied tags from your prior PR to this one, and included a link to the prior PR so we can see our earlier comments.
Please reach out if you need help: doing this PR stuff can be tricky the first time you do it.
Note to reviewers: please see prior history in PR #13031.
@599166320, thanks for your explanations. Is it fair to state these as the requirements you want to satisfy?
- Allow sorting of columns other than the pre-defined sort order.
- Sorting requires materializing the sort keys. Materialize only the sort keys: once the data is sorted, use the row IDs (rids) within the sort keys to fetch the data. This approach minimizes the memory footprint of the sort as it does not buffer the non-key columns.
- Optimize a limit operation: sort the entire set of rows that match the filter criteria, but fetch on-key columns only for those rows within the limit.
These are all great goals! The goal of minimizing the memory footprint is a worth one.
The above approach is similar to how an RDBMS uses a B-Tree index to read off index keys, then uses the rid within the key to fetch matching rows. (RDBMS systems usually add another layer of optimization. If the query requests only the sort keys, then skip the step to read the underlying rows. This is called an "index-only query" by some of us; other names also exist.)
I previously commented on the use of a priority queue to do sorting. I was concerned about the cost of sorting n rows using a priority queue. A quicksort runs in O(n log n) time.
According to the docs, the insert time for MinMaxPriorityQueue is O(log n). Since we insert n elements total, the total cost is O(n log n), the same as for quicksort. Given this, the difference in performance will be in the constant overhead of the two methods, plus any memory overhead differences.
These questions can only be worked out by doing performance testing. In the interests of moving ahead, the priority queue implementation is fine for now. According to the docs, when used with a limit, the cost of removing the last element (to allow a new, lower-value element) is constant time, so that should be fine.
@599166320, thanks for the update! I'll take another look soon.
I noticed that this is a new PR that replaces the existing, closed one. As it turns out, you can just reuse the same PR by pushing (or force-pushing) new commits. Reusing the same PR allows all comments to be in one place and preserves tags and other status. If the PR is closed, you can reopen it and be back in business. Let's use this PR from now on for this project. I've copied tags from your prior PR to this one, and included a link to the prior PR so we can see our earlier comments.
Please reach out if you need help: doing this PR stuff can be tricky the first time you do it.
@paul-rogers Thanks for your tips and help
This pull request fixes 1 alert when merging 5b1a757032da3897a3435fd01a8f985ed722a206 into 4bfae1deee75be688ee5b428a13b32e9f24ead4f - view on LGTM.com
fixed alerts:
- 1 for Dereferenced variable may be null
@paul-rogers Recently, I often see an integration-tests error in Travis. It seems to me that it has nothing to do with this PR. It may be caused by instability of other external resources.
@599166320, one more thought on your PR. As I work on mine, I sometimes realize that more work needs to be done. Rather than close the PR (and reopen it, or open a new one), I just mark the PR as a draft. This tells reviewers to pause until you mark the PR as ready for review again.
I apologize for those IT failures. The team has been trying to fix these flaky tests as they waste many hours of time. There are two things you can do. First, if you see a flaky test, please create or update an issue. There are a bunch of issues already open, so look for those first. (Search for "Flaky".) Else, create a new one. Take a look at a few of the existing issues to see what info to provide.
Second, you can ask a committer to rerun those particular tests. Sometimes we get lucky and the second run passes. This is a crazy way to run a railroad, but it is the best we have until we fix the particular test.
@599166320, I took an in-depth look at the code, including downloading your branch and stepping through the logic. The good news is that your unit test worked the first time, allowing me to see your code in action. Thanks!
Let's start by clarifying the approach that this PR takes. We discussed various approaches. I think there was some confusion (probably on my part) about the goal.
On the earlier PR, I thought the code was doing this:
- Use the cursor to materialize all rows into a priority queue, which sorts them using multiple columns.
- Deliver n rows of output, where n is the limit applied to the query.
Your comments suggested you had a fancier (more efficient) approach:
- Use the cursor to materialize just the sort keys into a priority queue.
- Use the priority queue to sort the keys. For each key set, preserve the row ID (rid) of the underlying data row.
- Make a second pass to read the full row (including non-key fields) in the sort order, using the rid to locate the underlying cursor row.
In doing a detailed review, I found:
- The cursor actually has no way to do the "fancy" approach: cursors are sequential; they don't provide random access.
- The implementation actually materializes rows, sorts them, and limits them, just as I originally assumed.
Is this an accurate assessment? If so, I think we can simplify the PR down to a few of your core improvements.
@599166320, I took an in-depth look at the code, including downloading your branch and stepping through the logic. The good news is that your unit test worked the first time, allowing me to see your code in action. Thanks!
Let's start by clarifying the approach that this PR takes. We discussed various approaches. I think there was some confusion (probably on my part) about the goal.
On the earlier PR, I thought the code was doing this:
- Use the cursor to materialize all rows into a priority queue, which sorts them using multiple columns.
- Deliver n rows of output, where n is the limit applied to the query.
Your comments suggested you had a fancier (more efficient) approach:
- Use the cursor to materialize just the sort keys into a priority queue.
- Use the priority queue to sort the keys. For each key set, preserve the row ID (rid) of the underlying data row.
- Make a second pass to read the full row (including non-key fields) in the sort order, using the rid to locate the underlying cursor row.
In doing a detailed review, I found:
- The cursor actually has no way to do the "fancy" approach: cursors are sequential; they don't provide random access.
- The implementation actually materializes rows, sorts them, and limits them, just as I originally assumed.
Is this an accurate assessment? If so, I think we can simplify the PR down to a few of your core improvements.
In the past, I remember that the Cursor interface of Druid has an advance(int offset), which allows skip to the row of the specified offset. Is this a random access? However, the new version of Druid removes the advance(int offset).
The reasons why I used the factory approach are as follows:
I think druid implements column storage, and each column is independent. In the process of query, if you can skip unnecessary columns, skip unnecessary data reading, and deserialize at each stage, you can reduce the consumption of memory, cpu, and other resources, thus speeding up the query.
Back to the specific use case of order by limit:
If there is a datasource with a, b, c, d, e, f, g or more columns, I will execute a query as follows:
select * from datasource order by a limit 100
When executing this sql, you can first load column a into memory, use cursor to traverse the corresponding BaseObjectColumnValueSelector of column a, sorting, get the top100 and the corresponding offset, and then construct a Map<Int, List<Comparable>>offsetOrderByColumnValuesMap
Then construct the corresponding BaseObjectColumnValueSelector according to a, b, c, d, e, f, g, and use cursor to traverse the corresponding BaseObjectColumnValueSelector of a, b, c, d, e, f, g columns.
In the process of traversing the BaseObjectColumnValueSelector, determine Whether the current offset already exists in the offsetOrderByColumnValuesMap. If so, read the data of row corresponding to the offset. Otherwise, skip reading.
The above is the sorting inside the segment. Then, there is a merge stage in Historical. The intermediate results of multiple top100 will be merged using priority queues (the bottom layer of n-way merge strategy is also merged based on priority queues) to generate the top100 of historical level.
Finally, the broker arrives. The broker uses the priority queue to calculate the final top100 and returns it to the client.
This is my past idea. According to your review, you think this idea is not feasible at present?Next,I should remove the implementation related to sorting in the segment (delete TopKOffsetSequence) and replace it with the implementation of ListBasedSorterSequence, TreeMultisetBasedSorterSequence, and TreeSetBasedSorterSequence.
@599166320, to follow-up a bit: see the summary above: the one that tries to summarize the approach. If that is correct, then the simplest solution is:
- In the
ScanQueryRunnerFactory, detect the sort case which the cursor cannot handle. (Is it only multi-column? Or, is it also that some columns don't provide a built-in sort order?) - If a sort is needed:
- Insert your sorting sequence (see below)
- Remove the sort from the scan query (or at least from the arguments passed down to the cursor)
- If a limit is needed:
- Provide a limit to the sort sequence
- Remove the limit from the scan query (or at least rom the limit passed down to the cursor)
- Build the cursor in the usual way (with sort and limit removed)
This gives three possible outcomes:
sort <-- cursor (sort without limit)
sort-with-limit <-- cursor (sort with limit)
cursor-with-limit (default case)
Finally, it turns out that there is an existing class that can help with the sorting : StableLimitingSorter. You provide the comparator and the limit, it does the rest. The key trick is that scan has two row types: you'll need a different comparator for each type.
@599166320, to follow-up a bit: see the summary above: the one that tries to summarize the approach. If that is correct, then the simplest solution is:
In the
ScanQueryRunnerFactory, detect the sort case which the cursor cannot handle. (Is it only multi-column? Or, is it also that some columns don't provide a built-in sort order?)If a sort is needed:
- Insert your sorting sequence (see below)
- Remove the sort from the scan query (or at least from the arguments passed down to the cursor)
If a limit is needed:
- Provide a limit to the sort sequence
- Remove the limit from the scan query (or at least rom the limit passed down to the cursor)
Build the cursor in the usual way (with sort and limit removed)
This gives three possible outcomes:
sort <-- cursor (sort without limit) sort-with-limit <-- cursor (sort with limit) cursor-with-limit (default case)Finally, it turns out that there is an existing class that can help with the sorting :
StableLimitingSorter. You provide the comparator and the limit, it does the rest. The key trick is that scan has two row types: you'll need a different comparator for each type.
Let me briefly summarize your simplest solution. When traversing the internal data of the segment, if it is an ordinary order by query, modify the order by parameter and limit parameter in the scanQuery object to prevent them from being transmitted to cursor. Let him just traverse the required data and sort it in the merge phase?
Let me briefly summarize your simplest solution. When traversing the internal data of the segment, if it is an ordinary order by query, modify the order by parameter and limit parameter in the scanQuery object to prevent them from being transmitted to cursor. Let him just traverse the required data and sort it in the merge phase?
That's about right. However, as you point out, there are two (actually three) "phases":
- The scan-then-sort phase. This requires the full result set, even with a limit, so we know the "top n" items after sorting.
- The Historical-side merges of the segment "scans" on so the Broker receives a single ordered list per historical.
- The Broker-side merge of the results from data nodes.
In my comments, I addressed only the scan-then-sort phase (to keep comments simple.)
The sort phase needs a sort algorithm: it makes more sense to buffer the rows in an array and sort the array, then it does to build a huge priority queue as we read rows. We can do performance tests, but my gut says a sort will be faster.
The two merge phases have a limited queue size: the number of incoming sequences. (The number of segments to merge in a historical, the number of historicals to merge on the Broker.) A priority queue is a good solution for the merge cases.
Fortunately, Druid has generic sorters and mergers: you just have to provide the comparator. (Double-check that the existing implementations do, in fact, fit your needs.)
The same comparator (structure) can be used in all three phases. The various comments in that area suggest that, once the code works, we should focus on optimizing the comparator since it is an O(n log n) inner loop. The optimization is not a design issue, just a fine-tuning issue once stuff basically works.
In the past, I remember that the Cursor interface of Druid has an
advance(int offset), which allows skip to the row of the specified offset. Is this a random access? However, the new version of Druid removes theadvance(int offset).
Your idea here is sound. If the cursor had a seek() operation, then you could further optimize the sort step as follows:
- Run the cursor once, reading just the sort keys. Create that combined
(key1, key2, ... rid)tuple. - Sort the key tuples.
- Run the cursor a second time.
seek(rid)to the position of each row. Then, read the full row into the final tuple to be sent downstream. (Extra credit for copying key values from the key tuple to the final tuple to avoid a segment read.) We only need fetch the first n items for aLIMIT nquery.
The above would be the "gold standard" solution. Alas, as you note, the Cursor class does not provide a seek() method. (Even a advance() method won't work, that is just a sequential scan skipping records. The result of the sort will be a semi-random list or rids.) So, the next-best alternative is that scan-and-sort process we discussed above.
So, perhaps as a "phase 1", we get the crude-but-effective mechanism to work (along with the merges). Just to be clear, the "crude-but-effective" method still reads a subset of columns; just those needed for the sort plus those projected by the query itself. (SQL doesn't usually allow sorting on columns not in the result set, so we shouldn't have to worry about discarding unwanted keys after the sort.)
Then, as a "phase 2", we go back and ask, is it possible to seek within segments? If so, can we modify the Cursor interface (or define a SeekableCursor) to allow random access?
It may be that segments can allow random access, but no other data sources. For example, one cannot seek into a Parquet file: the data encoding is such that a row group column has to be read sequentially. CSV and JSON are seekable in theory, but based on the byte offset, not the rid. And so on.
@paul-rogers I have done the following work in this commit:
-
For the sorting of ordinary columns, when traversing the segment, I prevent the scanquery object from passing the orderByLimit parameter to the cursor. (that is, the simplest solution mentioned above)
-
Improve and add more unit tests
-
Performance optimization mentioned above
The following points should be noted:
I didn't put the sorting of __time and the sorting of ordinary columns together, because __time is special. In Druid, __time is actually a special index. Unlike ordinary sorting, it must traverse all sorted data.
When you review again, see if there is anything else to improve?
This pull request fixes 1 alert when merging 6f1edf3fc68eaefb729ee1a9f00547633010e933 into 0edceead80e0770ed93230fe7e097fe72d731bba - view on LGTM.com
fixed alerts:
- 1 for Dereferenced variable may be null
This pull request fixes 1 alert when merging 151c36030da7ba3b6cc8fea71faf8e6dd3c5455b into 0edceead80e0770ed93230fe7e097fe72d731bba - view on LGTM.com
fixed alerts:
- 1 for Dereferenced variable may be null
This pull request fixes 1 alert when merging 559f80a0452ebcbf352b50b52d1e85927f696ffe into f89496ccacedc01449fb8ed4e45cf2345cb3ed34 - view on LGTM.com
fixed alerts:
- 1 for Dereferenced variable may be null
This pull request fixes 1 alert when merging 040d4b0443535c78e319923ef2ef60aef6b1b67c into f89496ccacedc01449fb8ed4e45cf2345cb3ed34 - view on LGTM.com
fixed alerts:
- 1 for Dereferenced variable may be null
@599166320, nice improvements! We can perhaps simplify the code even further.
Scan query is a bit confusing. The ScanQueryEngine produces batches of rows (the ScanResultValue), but to sort want individual rows. To mimic the current code, we have to put those rows back into batches for the rest of the query stack. Let's take these one-by-one.
First, let's remind ourselves that, if we sort (even with a limit) we need all the rows. So, we do want to call the thing that runs the engine. The question is, what is the format of the data and what do we do with it?
For better or worse, ScanQueryEngine takes the rows from the cursor and wraps them in ScanResultValue. Worse, for our purposes, ScanQueryEngine maps the rows to the two different scan query result formats. What we really want is single rows in a single, known format. Second best is a batch of rows in a single, known format. That format should be the "compact list" since we'll be storing them in memory.
So, our first task is to convince the ScanQueryEngine to give us what we want. We could modify the engine to return rows. But, that code is a bit complex, so we might want to use what we have, at least for now.
You are already rewriting the query to strip off the sort and limit when we do your custom sort. We can also force the format to RESULT_FORMAT_COMPACTED_LIST. This will give us, returned from ScanQueryEngine.process a sequence of ScanQueryValues.
But, we want rows. So, the next step is to unpack the ScanQueryValue objects into individual rows, and store those in an array, ready for sorting. Your code does this (though see comments.)
Then, we an apply the sort you've developed. Here, we can know that we have rows in compact list form, so each sort key can be translated to a position so we're comparing row1[posn], row2[posn]. That's about as fast as it can get.
Now we've got a sorted array. But, we want it potentially limited. So, our effective row count is min(rowCount, limit).
We now have a (perhaps limited) sorted list. The final step is to recreate the ScanResultValue objects, potentially converting to the inefficient RESULT_FORMAT_LIST if configured. The size of each of the ScanResultValue objects is, I believe, controlled by the batch value in the query.
Perhaps all of this can be handled by a single class based on SimpleSequence().
-
make()reads all the rows, creates the array, sorts it, and returns our "batching iterator". - The "batching iterator"
next()provides the nextbatchSizerows converted to aScanResultValue, stopping when we hit the effective row count computed above.
With that, ScanQueryRunnerFactory.ScanQueryRunner:
- Checks if a custom sort is needed using the function you've provided.
- If not, just returns the sequence from
ScanQueryEngine.processas today. - Else
- Modifies the query as we discussed.
- Calls
ScanQueryEngine.process - Hands the returned sequence to the sorter sequence described above
- Returns the sorter sequence
All this said, there is a bit of code that is pretty close already: `ScanQueryRunnerFactory.stableLimitingSort(). That methods returns a sequence which unpacks events, and sorts them. It is not quite what we want because it uses a priority queue (awkward for large numbers of items, great for merging) and returns rows one-by-one (we want a batch.) Still, it provides some good examples. (You've probably already looked at it since your implementation is similar.)
This pull request fixes 1 alert when merging 29a40420cc4381ecfc85f7f331a0d78b22d82a96 into 573e12c75fc0465d38ef37cae092125882f27eeb - view on LGTM.com
fixed alerts:
- 1 for Dereferenced variable may be null
@599166320, you've taken on a helpful improvement in a complex area. Would be great if we could be in the same room and use a whiteboard to achieve a common understanding of how Druid works in this area, and the design of your improvement. The next best thing is for me to state the assumptions which I have, so we can see if we have that shared understanding.
We need sorting to fit into the existing scan query structure. Here's my understanding of the gist of a scan query "logical plan", from the root (client) down to the leaf (scan):
1. Project scan rows into SQL rows.
2. Unbatch `ScanResultValue` batches into rows.
3. Broker: merge results from h historicals into a single sequence
4. Historical: merge results from s segments into a single sequence
5. Segment: merge results from c cursors into a single sequence
6. Cursor: read r rows into b batches each of size < ~20K rows, so that b = ceil(r / 20K).
If our query has no sort order, the results are unordered. In SQL, this means that the order is non-deterministic. So, the easy way to do the above merges is just a first-in-first out concatenation.
In Druid, segments are always sorted by time. So, if our query is ordered by `time, then the logical plan would be:
3. Broker: ordered merge of results from h historicals: merging the rows from `ScanResultValue` batches to create new batches
4. Historical: as above, for the s segments
5. Segment: concatenate the results from the c cursor since, I believe, the cursors are already time-ordered
6. Cursor: read r rows which are time ordered
One more thing to note here is that Druid allows sorting of segments by more than just __time. We can order (I believe) a segment by, say, (__time, a, b, c). In this case, the cursor should be able to sort by any prefix of the segment sort order. That is, it should accept an ordering of (__time), (__time, a), (__time, a, b) or (__time, a, b, c). (I have to check the code to verify). If this is true, then the function in this PR that checks if we need an "extra" sort has to take this information into consideration, which means the decision has to occur segment-by-segment since each segment may have a different sort order.
To get even more into the weeds, the cursor should also be able to order by the descending version of any prefix, such as (__time DESC, a DESC, b DESC). The cursor cannot, however, order by a mixture of ASC/DESC that does not match the segment sort order. So, our example segment cannot return rows in an order of, say, (__time, a DESC). We would need the extra sort in this case. (Again, I need to verify: perhaps Druid has a trick, but most systems don't.)
Now we can check our understanding of how your feature fits into the stack. There are two parts: sorts and merges. Let's tackle the sort first. There are multiple ways to handle the custom sort. One (undesireable) way is to let the Broker do the work:
3. Broker: unbatch results from h historicals into a single array(or list), sort it, and return the sorted results as a sequence
4. Historical: concat results from s segments into a single sequence
5. Segment: concat results from c cursors into a single sequence
6. Cursor: unordered read r rows into b batches
This works, but it puts unwanted amounts of load (in memory and compute) on the Broker. Better to distribute the sort. My earlier note suggested doing the sort at the segment level -- in part because that is easiest to explain:
3. Broker: ordered merge of results from h historicals: merging the rows from `ScanResultValue` batches to create new batches
4. Historical: as above, for the s segments
5. Segment: unbatch results from c cursors into a single array(or list), sort it, and return the sorted results as a sequence
6. Cursor: unordered read r rows into b batches
The advantage of the above is that if steps 5 and 6 run in a single thread, we do one big sort rather than a bunch of small sorts and a big merge. My hunch is that the single big sort would be faster. Of course, if we did step 6 in separate threads for each cursor, then we'd want to get maximum parallelism and so we'd want to use the approach which I think you prefer:
4. Historical: Ordered merge of s segments into a single sequence
5. Segment: Ordered merge of c cursors into a single sequence
5.5. Ordered merge of b batches into a single sequence
6. Cursor: unordered read r rows into b batches, sort each batch
There is one more variation possible:
5. Segment: Ordered merge of c cursors into a single sequence
5.5. Combine all r rows into a single list & sort
6. Cursor: unordered read r rows into b batches
All of these variations work. There is just a trade-off of the cost of the sort and the cost of the merge. The memory footprint is the same: all rows need to be in memory somewhere in order to be sorted. They are all in one big list (the Broker approach) or in multiple smaller lists (the other approaches.)
Editorial aside: this kind of detailed cost-tradeoff and query planning is best done by software, such as the Calcite planner. Us humans find the above mind-numbingly complex. As a result, the output of a human-generated query plan is often some combination of messy, buggy and sub-optimal performance. This is one reason we're discussing moving to a more traditional operator-and-planner based approach: let the computer do the boring stuff.
The other thing to discuss is the ordered merge steps. As it turns out, the need to do the ordered merge is independent of how we do the sort. It doesn't matter if the cursor did the sort for us, or if we added a custom sort. In both cases, we have to generate a merge comparator that has the sort keys, with ASC/DESC sort sense, in the proper order.
Merging requires two implementations: one for the "list" (i.e. list of map) format, the other for the "compact list" (i.e. list of array) format. We can assume that the compact-list rows for any one segment have the same set of columns, and that map-based rows have the same keys. That is, the set of column names will be whatever the query requests. The order of columns in the compact-list form should (we should check) be driven by the order that columns appear in the query.
But, we must anticipate that the types of columns from different segments may differ. Druid allows column c to be a long in one segment, a string in another. I believe Druid has some rules for reconciling these type conflicts. This means that the comparator has to know how to compare 10 and "foo". There may already be implementations for this in the existing comparators: we should check.
One more thing to note here is that Druid allows sorting of segments by more than just
__time. We can order (I believe) a segment by, say,(__time, a, b, c). In this case, the cursor should be able to sort by any prefix of the segment sort order. That is, it should accept an ordering of(__time),(__time, a),(__time, a, b)or(__time, a, b, c). (I have to check the code to verify). If this is true, then the function in this PR that checks if we need an "extra" sort has to take this information into consideration, which means the decision has to occur segment-by-segment since each segment may have a different sort order.To get even more into the weeds, the cursor should also be able to order by the descending version of any prefix, such as
(__time DESC, a DESC, b DESC). The cursor cannot, however, order by a mixture of ASC/DESC that does not match the segment sort order. So, our example segment cannot return rows in an order of, say,(__time, a DESC). We would need the extra sort in this case. (Again, I need to verify: perhaps Druid has a trick, but most systems don't.)
The query with __time as the prefix and the sorting direction is the same as __time, as shown below:
order by __time,a,b,c
order by __time desc,a desc,b desc
In the above two cases, I think we can consider taking a special path. The special path here is the Segment by segment decision you mentioned above
At present, I think of two ways to improve:
-
It is up to the user to decide whether to run a special path by passing parameters in the query context
-
According to the ingestion specs
dimensionsSpec, letdruidautomatically decide to take a special path.
For example:
dimensionsSpec{dimensions:[a,b,c,d,e,f,....]}
If the user's ingestion specs is configured as above, does druid sort the ingested data? If sorting already exists, the following queries can be run on special paths:
order by __time,a,b,c
order by __time desc,a desc,b desc
The other thing to discuss is the ordered merge steps. As it turns out, the need to do the ordered merge is independent of how we do the sort. It doesn't matter if the cursor did the sort for us, or if we added a custom sort. In both cases, we have to generate a merge comparator that has the sort keys, with ASC/DESC sort sense, in the proper order.
Merging requires two implementations: one for the "list" (i.e. list of map) format, the other for the "compact list" (i.e. list of array) format. We can assume that the compact-list rows for any one segment have the same set of columns, and that map-based rows have the same keys. That is, the set of column names will be whatever the query requests. The order of columns in the compact-list form should (we should check) be driven by the order that columns appear in the query.
But, we must anticipate that the types of columns from different segments may differ. Druid allows column
cto be alongin one segment, astringin another. I believe Druid has some rules for reconciling these type conflicts. This means that the comparator has to know how to compare10and"foo". There may already be implementations for this in the existing comparators: we should check.
Different segments do have different data types in the same column. Let me see how to solve this problem.
@paul-rogers
Let me briefly summarize the problems you mentioned above. Based on this PR, I have two things to do next:
-
Code implementation of special path(
Segment by segment decision) -
Solve the problem of different data types in the same column of different segments
Another thing: use Calcite (operator and planner based) to improve the query engine, which is what you plan to do later.