Rewrite partial top-n node to LimitNode or LastNNode
Description
If Trino knows that each file has been sorted, it can rewrite the partial TopN to LimitNode or LastNNode (a new logical plan node in this PR), which will significantly improve query performance. Trino can use the sort_order_id field in the Iceberg manifest entry to determine whether a data file has been sorted.
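As a sketch of that check (using a hypothetical ManifestEntry record rather than the real Iceberg API; in Iceberg, sort order id 0 conventionally denotes the unsorted order):

```java
import java.util.List;

// Hypothetical stand-in for an Iceberg manifest entry; the real check would
// read DataFile metadata from the manifests. Sort order id 0 means "unsorted".
record ManifestEntry(String path, int sortOrderId) {}

final class SortednessCheck
{
    private SortednessCheck() {}

    // The rewrite is only safe when the table has a real (non-unsorted) sort
    // order and every data file was written with that same sort order.
    static boolean allFilesSorted(List<ManifestEntry> entries, int tableSortOrderId)
    {
        return tableSortOrderId != 0
                && entries.stream().allMatch(e -> e.sortOrderId() == tableSortOrderId);
    }
}
```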
Background and Core Idea
For convenience of review, I will describe the LastNOperator (a new operator introduced in this PR) and the working principle of the related code. First, as a recap, let's take a quick look at how a Page is originally built by OrcReader. The Trino maintainers are certainly more familiar with the implementation details, but the recap helps in understanding the LastNOperator implementation later. Trino's OrcReader advances by RowGroups: it obtains the current RowGroup's input stream and constructs Pages from that stream. Therefore, at least during the Page generation phase, a Page is never generated across RowGroups; in other words, a Page does not come from two different RowGroups. Additionally, the batch parameter MAX_BATCH_SIZE is set to 8192 in the current codebase, so with the default RowGroup size of 10,000, at least one Page will be generated for each RowGroup.
In this PR, when OrcReader outputs a Page, it appends an additional column whose value is computed by packing the existing variables stripeSize and currentRowGroup into a single long. The following code snippet shows the logic for appending this last column to the Page. The code is part of a new class called OrcRecordBackwardReader, which iterates row groups in reverse order.
Page page = new Page(currentBatchSize, blocks);
long[] values = new long[page.getPositionCount()];
Arrays.fill(values, (long) stripeSize << 32 | currentRowGroup);
LongArrayBlock rowGroupId = new LongArrayBlock(page.getPositionCount(), Optional.empty(), values);
page = page.appendColumn(rowGroupId);
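For reference, the packed value can be split back into its two components; here is a minimal encode/decode sketch (the helper names are illustrative, not from the PR):

```java
// Illustrative helpers for the packed rowGroupId column value used above:
// stripeSize occupies the high 32 bits, currentRowGroup the low 32 bits.
final class RowGroupIdCodec
{
    private RowGroupIdCodec() {}

    static long encode(int stripeSize, int currentRowGroup)
    {
        // Mask the low half to avoid sign extension for large int values.
        return (long) stripeSize << 32 | (currentRowGroup & 0xFFFFFFFFL);
    }

    static int stripeSize(long packed)
    {
        return (int) (packed >>> 32); // high 32 bits
    }

    static int rowGroup(long packed)
    {
        return (int) packed; // low 32 bits
    }
}
```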
It should be noted that Trino performs some additional optimization on the output Page, for example when the physical plan includes ScanFilterAndProjectOperator or FilterAndProjectOperator instead of TableScanOperator. Trino optimizes the Page by merging or splitting it, and this PR adapts to both scenarios.
If LastNOperator read from the beginning of the file and only retained the data from the last few Pages at the end of the file, there would still be significant IO overhead from processing a lot of irrelevant data. Therefore, LastNOperator expects its input pages to be constructed starting from the last RowGroup of the current split. This depends on the order in which OrcReader outputs Pages, so the PR introduces a class called OrcRecordBackwardReader to differentiate it from the existing OrcRecordReader.
Furthermore, the LastNOperator implementation takes performance into account. Each Page entering the operator carries a unique rowGroupId block, the last block of the Page, indicating which row group the Page comes from. Unlike LimitOperator, which passes any input Page downstream until remainingLimit is no longer greater than 0, LastNOperator accumulates Pages: it does not output to the downstream operator until it has accumulated N elements, unless the total number of rows in the source data is less than N. LastNOperator is therefore a blocking operator, like OrderByOperator, and it can precisely compute the N needed elements, reducing both the memory used by the current worker process and the network pressure caused by sending data to the downstream operator.
Now, let's illustrate the general implementation approach of the LastNOperator
with an example.
Consider the following data written sequentially in a column. For convenience, I use one row to represent it and assume the row group size is 3. These numbers will be divided into three row groups. Since the data is written sequentially, the data between row groups follows this order: the data in row group 3 is smaller than that in row group 2, and the data in row group 2 is smaller than that in row group 1. Within each row group, the data is also ordered.
1,2,3 | 4,5,6 | 7,8,9
row group-3 [1,2,3]
row group-2 [4,5,6]
row group-1 [7,8,9]
The LastNOperator maintains a queue for each row group, and the queue saves the pages generated by that row group. The value of N is updated on encountering each new row group, in order to compute how many elements still need to come from the new row group. For example, if we need the top 4 values, i.e. [9,8,7,6], with N being 4, then when row group-1 has been processed, N is updated to 1; we can say the remaining N is 1. At this point, only 1 more value needs to be obtained from row group-2. Since the data within each row group is ordered, all pages in row group-2 must be read to find the page containing the number 6. Next, when reading row group-3, N is updated again, and once N decreases to 0, no more data is needed. When the LastNOperator finally produces output, it takes regions of the pages to reduce memory and network pressure, ensuring that the data sent downstream contains only [9,8,7,6]. Even if the pages were not trimmed in getOutput, the impact on memory and network would be small, because this part of the data is very small. In addition, the addInput implementation in LastNOperator fully considers the memory impact of each queue on the worker process.
After this version of the code is successfully merged, we also plan to support multi-field sorting and the case where only some of the files are sorted; this PR does not include those functions.
Test
Testing was performed on real log queries retrieving the latest 200 records, which is a very common business requirement. Two query scenarios were tested, one ascending and one descending. It is worth noting that the performance of these two test cases was almost identical. The brief test setup is as follows:
Write
The Iceberg table's sort field is defined as ORDER BY _timestamp DESC NULLS LAST, and the table is written using the Apache Spark engine.
Read
The query's sort order was divided into two cases: one exactly the same as the write order, and one completely opposite.
- ORDER BY _timestamp DESC NULLS LAST: took 1.96 seconds; the partial TopN node was rewritten to LimitNode, corresponding to LimitOperator.
- ORDER BY _timestamp ASC NULLS FIRST: took 2.24 seconds; the partial TopN node was rewritten to LastNNode, corresponding to the new operator added by this MR, LastNOperator.
Compared with the original Trino query, which takes 36.45 seconds, the test results show a performance improvement of 16 to 18 times.
SELECT
"cid",
"errno",
"host_deploy_env" "host.deploy_env",
"host_name" "host.name",
"host_region" "host.region",
"host_zone" "host.zone",
"log_file" "log.file",
"log_level" "log.level",
"log_msg" "log.msg",
"log_observed_time" "log.observed_time",
"log_offset" "log.offset",
"msg",
"pid",
"service_name" "service.name",
"tag",
"version",
"_consumer_hostname",
"_id",
"log_stuid" "log.stuid",
"upstream_code",
"upstream_addr",
"req_type",
"arg_trid",
"_timestamp" "timestamp"
FROM iceberg_log.xxx_yyy_zzz
WHERE ((("msg" LIKE '%client%') AND ("msg" IS NOT NULL)) AND (_timestamp > 1705341600000) AND (_timestamp <= 1705342800000) AND (log_date >= '20240116') AND (log_date <= '20240116') AND (log_hour >= '02') AND (log_hour <= '02'))
ORDER BY _timestamp ASC NULLS FIRST
OFFSET 0 ROWS
LIMIT 200
Partition statistics
- Size: 58.1GB
- Record Counts: 1,083,459,817
- Total File Counts: 236
Additional context and related issues
This is the first part of the work for issue https://github.com/trinodb/trino/issues/18139
This MR still has some details that need to be dealt with, but review can be started. @marton-bod @findepi @alexjo2144 If convenient, would you mind having a look at this MR?
Have you looked at using the same approach as https://github.com/trinodb/trino/pull/6634 or https://github.com/trinodb/trino/pull/8171 to replace partial topN with limit ?
Hi @raunaqmorarka, I've taken an initial look at the existing work on partially rewriting TopN to Limit. It seems my first step would be to build the ConnectorTableProperties object within the getTableProperties method, and during this process I would need to construct a LocalProperty object holding the sorting information.
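That LocalProperty construction might look roughly like this (a sketch assuming the Trino SPI's SortingProperty and SortOrder types; the timestampColumnHandle variable and the surrounding ConnectorTableProperties wiring are illustrative, not actual PR code):

```java
// Sketch only: expose the Iceberg table's sort order as a local property
// from getTableProperties, so the engine can see that splits are sorted.
// timestampColumnHandle is a placeholder for the real column handle.
List<LocalProperty<ColumnHandle>> localProperties = ImmutableList.of(
        new SortingProperty<>(timestampColumnHandle, SortOrder.DESC_NULLS_LAST));
// ... pass localProperties into the ConnectorTableProperties being returned,
// but only when the sort_order_id check confirms every data file is sorted;
// otherwise return no local properties.
```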
However, an issue arises because getTableProperties is also executed for basic queries such as 'select * from table limit x'. To determine whether an Iceberg table is sorted, planFiles is required, which is a heavy operation due to the need to traverse all records in the Avro manifest files, particularly when the table has many data files. This appears to differ from other connectors, such as Phoenix or Hive.
Furthermore, if the order the data was written in (for instance, ASC) contradicts the query direction (for instance, DESC), it seems the current Trino version doesn't provide support. Do you have any suggestions? I would greatly appreciate your insights.
@zhangminglei Thanks for the PR. Could you rebase? It also seems that the Iceberg tests are failing.
@sopel39 Sorry, I haven't read this issue for a long time. I think it is an Iceberg bug: https://github.com/apache/iceberg/issues/8864. Hi @nastra, could you please help check this?
@findepi @sopel39 Just a friendly ping: the test failure was caused by an Iceberg bug, which I fixed (https://github.com/apache/iceberg/pull/8873). But for now, I can only remove the end-to-end test (TestPushPartialTopNToIceberg.java in my PR). After the next version of Iceberg is released, I think this test can be added back. I'd like to know what you two think about this.
If Trino knows that each file has been sorted, it can remove the Partial Sort and Partial TopN from the execution plan, which will significantly improve query performance. [...] Performance gains are significant with both ASC and DESC ordering. Specifically, we see a more than 20x increase in performance with ASC and a 2x increase with DESC when the limit is relatively large (e.g., ORDER BY x DESC LIMIT 10000).
cc @sopel39 @lukasz-stec
If Trino knows that each file has been sorted, it can remove the Partial Sort and Partial TopN from the execution plan, which will significantly improve query performance.
I think it's a good idea. @findepi will you continue with review?
This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua
@zhangminglei please submit the new version as you suggested and also adapt by rebasing. Let us know how we can help after that.
Thanks @mosabua! The latest code has been merged to production within our company. It ensures that data written in one order (e.g. DESC) can be queried in either order (DESC or ASC) with almost identical query performance.
I will submit the latest code as soon as possible
Thank you for your pull request and welcome to our community. We could not parse the GitHub identity of the following contributors: zhangminglei. This is most likely caused by a git client misconfiguration; please make sure to:
- check if your git client is configured with an email to sign commits
git config --list | grep email
- If not, set it up using
git config --global user.email [email protected]
- Make sure that the git commit email is configured in your GitHub account settings, see https://github.com/settings/emails
I have submitted the MR, but I still need to make some changes (including code, description, and ideas). Once the changes are made, I will ping the relevant people for review. Thank you.
Hello @mosabua @findepi @sopel39, I think the PR is ready for review. I still need to add some implementation ideas to speed up the process, but this does not prevent the review from starting now. Thank you.
I have added the Background and Core Idea section to the PR description.
Thanks @findepi for the review. I have modified the PR in the 1st round commit based on your comments. Please continue to review when you have time. Thank you.
Fyi @martint @findepi @osscm
This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua
Closing this pull request, as it has been stale for six weeks. Feel free to re-open at any time.
I think we should still pursue this so I am reopening
This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua