trino icon indicating copy to clipboard operation
trino copied to clipboard

Reuse table scan results when the same table is used in different parts of query

Open sopel39 opened this issue 5 years ago • 9 comments

In tpcds/q95 web_sales table is scanned multiple times. Additionally, that table is then distributed across nodes using same hash column:

 Fragment 12 [SOURCE]
     CPU: 2.63m, Scheduled: 4.87m, Input: 720000376 rows (12.07GB); per task: avg.: 144000075.20 std.dev.: 8069490.17, Output: 720000376 rows (18.10GB)
     Output layout: [ws_warehouse_sk_142, ws_order_number_144, $hashvalue_202]
     Output partitioning: HASH [ws_order_number_144][$hashvalue_202]
     Stage Execution Strategy: UNGROUPED_EXECUTION
     ScanProject[table = hive:tpcds_sf1000_orc_part:web_sales, grouped = false]
         Layout: [ws_warehouse_sk_142:bigint, ws_order_number_144:bigint, $hashvalue_202:bigint]
         Estimates: {rows: 709935839 (17.84GB), cpu: 11.89G, memory: 0B, network: 0B}/{rows: 709935839 (17.84GB), cpu: 29.73G, memory: 0B, network: 0B}
         CPU: 2.63m (3.26%), Scheduled: 6.54m (5.77%), Output: 720000376 rows (18.10GB)
         Input avg.: 381963.06 rows, Input std.dev.: 58.18%
         $hashvalue_202 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("ws_order_number_144"), 0))
         ws_order_number_144 := ws_order_number:bigint:REGULAR
         ws_warehouse_sk_142 := ws_warehouse_sk:bigint:REGULAR
         ws_sold_date_sk:bigint:PARTITION_KEY
             :: [NULL, [2450816, 2452642]]
         Input: 720000376 rows (12.07GB), Filtered: 0.00%

or

 Fragment 10 [SOURCE]
     CPU: 3.50m, Scheduled: 6.87m, Input: 720000376 rows (12.07GB); per task: avg.: 144000075.20 std.dev.: 10764103.21, Output: 720000376 rows (18.10GB)
     Output layout: [ws_warehouse_sk_104, ws_order_number_106, $hashvalue_196]
     Output partitioning: HASH [ws_order_number_106][$hashvalue_196]
     Stage Execution Strategy: UNGROUPED_EXECUTION
     ScanFilterProject[table = hive:tpcds_sf1000_orc_part:web_sales, grouped = false, filterPredicate = true, dynamicFilter = {"ws_order_number_106" = #df_2065, "ws_order_n
         Layout: [ws_warehouse_sk_104:bigint, ws_order_number_106:bigint, $hashvalue_196:bigint]
         Estimates: {rows: 709935839 (17.84GB), cpu: 11.89G, memory: 0B, network: 0B}/{rows: 709935839 (17.84GB), cpu: 23.78G, memory: 0B, network: 0B}/{rows: 709935839 (17
         CPU: 3.50m (4.34%), Scheduled: 8.45m (7.46%), Output: 720000376 rows (18.10GB)
         Input avg.: 381963.06 rows, Input std.dev.: 58.18%
         $hashvalue_196 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("ws_order_number_106"), 0))
         ws_order_number_106 := ws_order_number:bigint:REGULAR
         ws_warehouse_sk_104 := ws_warehouse_sk:bigint:REGULAR
         ws_sold_date_sk:bigint:PARTITION_KEY
             :: [NULL, [2450816, 2452642]]
         Input: 720000376 rows (12.07GB), Filtered: 0.00%

web_sales table is a large table. Instead of reading it multiple times, it should be possible to cache TableScan results in output buffers and read it by multiple downstream stages. Note that some web_sales scans have DF applied, so such optimization should not increase query wall time.

sopel39 avatar Nov 09 '20 12:11 sopel39

Similar approach could be used to cache CTE (https://github.com/prestosql/presto/issues/5878) in the future.

sopel39 avatar Nov 09 '20 12:11 sopel39

maybe https://github.com/prestodb/presto/pull/15155 helps

tooptoop4 avatar Jun 20 '21 09:06 tooptoop4

Might be addressed by https://github.com/trinodb/trino/pull/14271, cc @lukasz-stec

sopel39 avatar Sep 28 '22 10:09 sopel39

Hi @sopel39 Looks like #14271 is not being added, do you know is there a plan to do that?

otherwise scan/fragment caching would be a good feature, WDYT?, we can also refer [prestodb/presto#15155] (https://github.com/prestodb/presto/pull/15155) what @tooptoop4 has also shared.

This seems to be not caching the whole TableScan but caching the fragments/pages.

osscm avatar Feb 15 '24 04:02 osscm

@osscm

Looks like https://github.com/trinodb/trino/pull/14271 is not being added, do you know is there a plan to do that?

https://github.com/trinodb/trino/pull/14271 was simplified version of original Fusing rules PR. IIRC original fusing rules had good gains but also some non-trivial regressions. More generally, we lean towards implementing something like fusing rules in a more rule-based, iterative approach rather than plan rewriters.

otherwise scan/fragment caching would be a good feature, WDYT?, we can also refer [https://github.com/prestodb/presto/pull/15155] (https://github.com/prestodb/presto/pull/15155) what @tooptoop4 has also shared.

Definitely. In fact, such technology powers Starburst multi-level cache (https://www.starburst.io/blog/introducing-multilayer-caching/). However, there are many ways this can be achieved.

sopel39 avatar Feb 15 '24 15:02 sopel39

@sopel39 thanks!

we are also very much interested in ways to improve the query performance, to make it more attractable for wider use-cases. Like tardigrade is for ETL.

The presto implementation also looks descent. But yes, can you please share other any other ideas as well, we are more than happy to collaborate.

osscm avatar Mar 03 '24 23:03 osscm

HUAWEI OpenLookeng (originated from Trino 350) also has a table scan reuse feature, maybe we can refer to the design in the future:
https://gitee.com/openlookeng/hetu-core/pulls/443

hackeryang avatar May 06 '24 09:05 hackeryang

I'm in the process of open sourcing subquery cache feature from Starburst. Please be patient as it's pretty large feature and consist of multiple parts. Once there is a PR, I will close this issue and https://github.com/trinodb/trino/issues/5878 and create a new epic issue with future improvements.

sopel39 avatar May 07 '24 08:05 sopel39

Here is the PR for subquery cache: https://github.com/trinodb/trino/pull/21888

sopel39 avatar May 24 '24 13:05 sopel39

Superseded by https://github.com/trinodb/trino/issues/22114

sopel39 avatar May 27 '24 14:05 sopel39