Reuse table scan results when the same table is used in different parts of a query
In tpcds/q95 the web_sales table is scanned multiple times. Additionally, each scan is then distributed across nodes using the same hash column:
Fragment 12 [SOURCE]
CPU: 2.63m, Scheduled: 4.87m, Input: 720000376 rows (12.07GB); per task: avg.: 144000075.20 std.dev.: 8069490.17, Output: 720000376 rows (18.10GB)
Output layout: [ws_warehouse_sk_142, ws_order_number_144, $hashvalue_202]
Output partitioning: HASH [ws_order_number_144][$hashvalue_202]
Stage Execution Strategy: UNGROUPED_EXECUTION
ScanProject[table = hive:tpcds_sf1000_orc_part:web_sales, grouped = false]
Layout: [ws_warehouse_sk_142:bigint, ws_order_number_144:bigint, $hashvalue_202:bigint]
Estimates: {rows: 709935839 (17.84GB), cpu: 11.89G, memory: 0B, network: 0B}/{rows: 709935839 (17.84GB), cpu: 29.73G, memory: 0B, network: 0B}
CPU: 2.63m (3.26%), Scheduled: 6.54m (5.77%), Output: 720000376 rows (18.10GB)
Input avg.: 381963.06 rows, Input std.dev.: 58.18%
$hashvalue_202 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("ws_order_number_144"), 0))
ws_order_number_144 := ws_order_number:bigint:REGULAR
ws_warehouse_sk_142 := ws_warehouse_sk:bigint:REGULAR
ws_sold_date_sk:bigint:PARTITION_KEY
:: [NULL, [2450816, 2452642]]
Input: 720000376 rows (12.07GB), Filtered: 0.00%
or
Fragment 10 [SOURCE]
CPU: 3.50m, Scheduled: 6.87m, Input: 720000376 rows (12.07GB); per task: avg.: 144000075.20 std.dev.: 10764103.21, Output: 720000376 rows (18.10GB)
Output layout: [ws_warehouse_sk_104, ws_order_number_106, $hashvalue_196]
Output partitioning: HASH [ws_order_number_106][$hashvalue_196]
Stage Execution Strategy: UNGROUPED_EXECUTION
ScanFilterProject[table = hive:tpcds_sf1000_orc_part:web_sales, grouped = false, filterPredicate = true, dynamicFilter = {"ws_order_number_106" = #df_2065, "ws_order_n
Layout: [ws_warehouse_sk_104:bigint, ws_order_number_106:bigint, $hashvalue_196:bigint]
Estimates: {rows: 709935839 (17.84GB), cpu: 11.89G, memory: 0B, network: 0B}/{rows: 709935839 (17.84GB), cpu: 23.78G, memory: 0B, network: 0B}/{rows: 709935839 (17
CPU: 3.50m (4.34%), Scheduled: 8.45m (7.46%), Output: 720000376 rows (18.10GB)
Input avg.: 381963.06 rows, Input std.dev.: 58.18%
$hashvalue_196 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("ws_order_number_106"), 0))
ws_order_number_106 := ws_order_number:bigint:REGULAR
ws_warehouse_sk_104 := ws_warehouse_sk:bigint:REGULAR
ws_sold_date_sk:bigint:PARTITION_KEY
:: [NULL, [2450816, 2452642]]
Input: 720000376 rows (12.07GB), Filtered: 0.00%
web_sales is a large table. Instead of reading it multiple times, it should be possible to cache TableScan results in output buffers and read them from multiple downstream stages. Note that some web_sales scans have dynamic filters (DF) applied, so such an optimization should not increase query wall time.
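To make the repeated-scan pattern concrete, here is a minimal SQL sketch of the shape of q95 (a simplification for illustration, not the exact TPC-DS text; only the table and column names already visible in the plans above are reused):

```sql
-- Simplified sketch of the q95 pattern: web_sales is referenced three
-- times, and every reference is later repartitioned on the same column,
-- ws_order_number (see the HASH [ws_order_number] partitioning above).
WITH ws_wh AS (
    SELECT ws1.ws_order_number
    FROM web_sales ws1, web_sales ws2            -- two separate scans of web_sales
    WHERE ws1.ws_order_number = ws2.ws_order_number
      AND ws1.ws_warehouse_sk <> ws2.ws_warehouse_sk
)
SELECT count(DISTINCT ws.ws_order_number)
FROM web_sales ws                                -- a third scan of web_sales
WHERE ws.ws_order_number IN (SELECT ws_order_number FROM ws_wh);
```

Each reference is currently planned as its own source fragment, as in Fragment 10 and Fragment 12 above, so the same table data is read from the connector several times.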
A similar approach could be used to cache CTEs (https://github.com/prestosql/presto/issues/5878) in the future.
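For example (a hypothetical query written only to illustrate the CTE case; the revenue aggregation and the ws_net_paid column are not taken from the issue), a CTE that is referenced twice currently results in its subplan being planned per reference, so caching its result would also remove a duplicate scan and aggregation:

```sql
-- Hypothetical example (not taken from the issue): the CTE below is
-- referenced twice, so its subplan (the web_sales scan plus the
-- aggregation) is currently executed once per reference.
WITH daily AS (
    SELECT ws_sold_date_sk, sum(ws_net_paid) AS revenue
    FROM web_sales
    GROUP BY ws_sold_date_sk
)
SELECT d1.ws_sold_date_sk, d1.revenue, d2.revenue AS prev_revenue
FROM daily d1
JOIN daily d2 ON d1.ws_sold_date_sk = d2.ws_sold_date_sk + 1;
```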
Maybe https://github.com/prestodb/presto/pull/15155 helps.
Might be addressed by https://github.com/trinodb/trino/pull/14271, cc @lukasz-stec
Hi @sopel39, looks like #14271 is not being added; do you know if there is a plan to do that?
Otherwise, scan/fragment caching would be a good feature, WDYT? We can also refer to [prestodb/presto#15155](https://github.com/prestodb/presto/pull/15155), which @tooptoop4 has also shared.
That PR seems to cache fragments/pages rather than the whole TableScan.
@osscm
> Looks like https://github.com/trinodb/trino/pull/14271 is not being added, do you know if there is a plan to do that?
https://github.com/trinodb/trino/pull/14271 was a simplified version of the original Fusing rules PR. IIRC the original fusing rules had good gains but also some non-trivial regressions. More generally, we lean towards implementing something like fusing rules in a more rule-based, iterative approach rather than via plan rewriters.
> Otherwise, scan/fragment caching would be a good feature, WDYT? We can also refer to [prestodb/presto#15155](https://github.com/prestodb/presto/pull/15155), which @tooptoop4 has also shared.
Definitely. In fact, such technology powers the Starburst multilayer cache (https://www.starburst.io/blog/introducing-multilayer-caching/). However, there are many ways this can be achieved.
@sopel39 thanks!
We are also very much interested in ways to improve query performance, to make Trino more attractive for wider use cases, like Tardigrade did for ETL.
The Presto implementation also looks decent. But yes, can you please share any other ideas as well? We are more than happy to collaborate.
Huawei openLooKeng (originated from Trino 350) also has a table scan reuse feature; maybe we can refer to its design in the future:
https://gitee.com/openlookeng/hetu-core/pulls/443
I'm in the process of open-sourcing the subquery cache feature from Starburst. Please be patient, as it's a pretty large feature and consists of multiple parts. Once there is a PR, I will close this issue and https://github.com/trinodb/trino/issues/5878 and create a new epic issue with future improvements.
Here is the PR for subquery cache: https://github.com/trinodb/trino/pull/21888
Superseded by https://github.com/trinodb/trino/issues/22114