[Enhancement] Support global late materialization on Iceberg v3
Why I'm doing:
What I'm doing:
Fixes #60015
In this PR, I implemented global late materialization based on Iceberg v3. The basic idea is described in the issue above.
Next, I'll explain the basic design and implementation, focusing on several key issues.
Key Points of the Global Late Materialization Framework
Given that StarRocks supports querying data from various data sources, global late materialization theoretically needs to work on any data source, so the design needs to be easily extensible.
How to Represent RowPosition?
RowPosition describes how to find a row. Although different data sources may represent RowPosition differently, they all represent a combination of one or more columns. We introduce the RowPositionDescriptor to describe this relationship.
RowPositionDescriptor consists of two parts: one row source slot and some ref slots.
If a table has columns that require lazy reads, the table generates several columns to describe the position during the early read phase. One column describes the compute node where the data originated (the column pointed to by the row source slot). During a lazy read, the request is sent to the corresponding compute node for processing. The columns pointed to by the ref slots are combined to logically uniquely identify a row. For example, suppose we want to design a RowPositionDescriptor for a Primary Key Table with primary keys a and b. The ref slots should include slots corresponding to columns a and b.
This information allows us to determine how to find a particular row of data.
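The descriptor can be pictured as a small struct holding those two kinds of slots. Below is a minimal, language-agnostic sketch in Python; the field names follow the prose above, but the class shape and slot ids are illustrative, not the actual FE code:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class RowPositionDescriptor:
    """Describes how to locate a row for a lazy read.

    row_source_slot: slot of the column holding the originating
                     compute node, used to route the lookup request.
    ref_slots:       slots whose combined values logically
                     identify one row within that node.
    """
    row_source_slot: int
    ref_slots: tuple

# Example: a Primary Key table with primary keys (a, b).
# Slot 0 holds the row source; the slots of a and b are the ref slots.
pk_descriptor = RowPositionDescriptor(row_source_slot=0, ref_slots=(1, 2))
```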
How to Generate a Global Late Materialization Execution Plan?
During the final stage of query optimization, the LateMaterializationRewriter is responsible for this task.
The rewriting process is roughly divided into two steps:
- The `ColumnCollector` collects all columns that require lazy reads, along with their first usage locations, and determines the timing of lazy reads. This information is recorded in the `CollectorContext`.
- The `PlanRewriter`, based on the `CollectorContext` information, performs the actual physical plan rewriting, inserting Fetch and LookUp operators at specific locations to perform lazy reads.
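The two passes can be sketched as a collect-then-rewrite walk over the physical plan. The classes and placement rule below are hypothetical simplifications (the real rewriter handles fragments, multiple children, and LookUp placement), but they show the collector/rewriter split:

```python
class Plan:
    """Minimal stand-in for a physical plan node."""
    def __init__(self, name, used_columns=(), children=()):
        self.name = name
        self.used_columns = tuple(used_columns)
        self.children = list(children)

class Fetch(Plan):
    """Stand-in for the inserted Fetch operator."""
    def __init__(self, child, cols):
        super().__init__("Fetch", cols, [child])

def collect(plan, ctx):
    # Pass 1 (the ColumnCollector's role): walk bottom-up and record,
    # for each column, the lowest operator that consumes it.
    for child in plan.children:
        collect(child, ctx)
    for col in plan.used_columns:
        ctx.setdefault(col, plan)  # first (lowest) consumer wins

def rewrite(plan, ctx, lazy_cols):
    # Pass 2 (the PlanRewriter's role): place a Fetch just below the
    # first consumer of each lazy column, so the column is
    # materialized right before it is actually needed.
    plan.children = [rewrite(c, ctx, lazy_cols) for c in plan.children]
    needed = tuple(c for c in lazy_cols if ctx.get(c) is plan)
    if needed and plan.children:
        plan.children[0] = Fetch(plan.children[0], needed)
    return plan
```

For example, if a TopN only needs column `k` and a Project above it is the first consumer of a wide column `v`, the rewrite leaves the TopN untouched and inserts a Fetch for `v` between the Project and the TopN.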
How are Fetch and LookUp operators implemented?
During the execution phase, the Fetch operator sends LookUpRequests to the LookUp operator, which retrieves the requested rows based on their row positions. To keep the framework extensible, we introduced several new abstractions; each data source provides its own implementation of each:
- `FetchTask`: Responsible for executing fetch operations and sending LookUpRequests to specific LookUp operators.
- `FetchTaskContext`: Records the context required by the `FetchTask`.
- `LookUpTask`: Responsible for executing lookup operations and reading data from a specific data source.
- `LookUpTaskContext`: Records the context required by the `LookUpTask`.
The core logic of the Fetch and LookUp operators is implemented by FetchProcessor and LookupProcessor. To minimize IO cost, each applies its own batching strategy.
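The fetch-side batching can be illustrated with a toy buffer: row positions are accumulated per row source (target node) and flushed as one request per source once a batch threshold is reached, rather than issuing one request per row. The class below is a hypothetical sketch of that idea, not the actual FetchProcessor:

```python
from collections import defaultdict

class BatchingFetcher:
    """Toy model of fetch-side batching: buffer row positions per row
    source and flush them as one request per source once enough rows
    have accumulated, amortizing RPC and IO cost."""

    def __init__(self, send, batch_size=4096):
        self.send = send              # callable(row_source, positions)
        self.batch_size = batch_size
        self.buffers = defaultdict(list)

    def add(self, row_source, position):
        buf = self.buffers[row_source]
        buf.append(position)
        if len(buf) >= self.batch_size:
            self.flush(row_source)

    def flush(self, row_source):
        if self.buffers[row_source]:
            self.send(row_source, self.buffers.pop(row_source))

    def flush_all(self):
        # Called at end of input so partial batches are not lost.
        for src in list(self.buffers):
            self.flush(src)
```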
How to Implement Global Late Materialization on Iceberg?
Based on the above design, the key issues are how to represent row position and how to perform lazy reads.
In Iceberg v3, a row position consists of three columns:
- `_row_source_id`: identifies the compute node that generated the row data.
- `_scan_range_id`: identifies the ScanRange that generated the row data. During the early read phase, the `HiveConnector` generates a unique ID for each scan range.
- `_row_id`: a reserved field introduced in Iceberg v3 that uniquely identifies a row.
With the `_row_source_id` and `_scan_range_id`, we know how to create a `HiveDataSource` for lazy reads. We also create predicates on the `_row_id` column for the rows we want to read and pass them into the `HiveDataSource`.
Predicates for multiple `_row_id` ranges are concatenated using OR. For example, if we need to read `_row_id`s in [1, 10) and [20, 30), the generated predicate would be `(_row_id >= 1 and _row_id < 10) or (_row_id >= 20 and _row_id < 30)`.
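Turning a batch of sorted `_row_id`s into half-open ranges and then into an OR-of-conjunctions predicate looks roughly like this. This is a sketch of the predicate construction described above, not the connector's actual code:

```python
def rowids_to_ranges(row_ids):
    """Collapse sorted row ids into half-open [start, end) ranges."""
    ranges, start, prev = [], None, None
    for rid in row_ids:
        if start is None:
            start = prev = rid
        elif rid == prev + 1:
            prev = rid
        else:
            ranges.append((start, prev + 1))
            start = prev = rid
    if start is not None:
        ranges.append((start, prev + 1))
    return ranges

def ranges_to_predicate(ranges):
    """Join per-range conjunctions with OR, e.g. [1, 10) and [20, 30)
    -> (_row_id >= 1 and _row_id < 10) or (_row_id >= 20 and _row_id < 30)."""
    return " or ".join(
        f"(_row_id >= {lo} and _row_id < {hi})" for lo, hi in ranges)
```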
Performance Improvements
I constructed three typical queries on an SSB 100 GB dataset for testing. They share a common trait: each returns many columns, but only a few columns participate in joins/sorts, and only a small number of rows are returned.
The test environment consisted of one 8-core 16 GB FE node and three 16-core 64 GB BE nodes. `baseline` is the result of executing with default parameters; `opt` is the result with global late materialization enabled (`set enable_global_late_materialization=true;`).
```sql
-- baseline: 12.74s
-- opt:       3.49s
select * from lineorder join part on lo_partkey = p_partkey where (p_size + lo_linenumber) > 60;
```

```sql
-- baseline: 9.02s
-- opt:      1.71s
select * from lineorder order by lo_partkey limit 100000, 5;
```

```sql
-- baseline: 9.88s
-- opt:      2.85s
select * from lineorder, customer where lineorder.lo_custkey = customer.c_custkey order by lineorder.lo_custkey limit 10;
```
What type of PR is this:
- [ ] BugFix
- [ ] Feature
- [x] Enhancement
- [ ] Refactor
- [ ] UT
- [ ] Doc
- [ ] Tool
Does this PR entail a change in behavior?
- [ ] Yes, this PR will result in a change in behavior.
- [x] No, this PR will not result in a change in behavior.
If yes, please specify the type of change:
- [ ] Interface/UI changes: syntax, type conversion, expression evaluation, display information
- [ ] Parameter changes: default values, similar parameters but with different default values
- [ ] Policy changes: use new policy to replace old one, functionality automatically enabled
- [ ] Feature removed
- [ ] Miscellaneous: upgrade & downgrade compatibility, etc.
Checklist:
- [x] I have added test cases for my bug fix or my new feature
- [ ] This pr needs user documentation (for new or modified features or behaviors)
- [ ] I have added documentation for my new feature or new function
- [ ] This is a backport pr
Bugfix cherry-pick branch check:
- [x] I have checked the version labels which the pr will be auto-backported to the target branch
- [ ] 4.0
- [ ] 3.5
- [ ] 3.4
- [ ] 3.3
> [!NOTE]
> Implements global late materialization for Iceberg v3 using new Fetch/LookUp nodes/operators, a runtime lookup framework, and planner rewrite, plus Parquet/Hive integration, RPC, and configs.

- Planner/Frontend:
  - Add `LateMaterializationRewriter` to insert `Fetch` and `LookUp` into plans; introduce `RowPositionDescriptor`.
  - New physical operators `PhysicalFetchOperator` and `PhysicalLookUpOperator`; explain/debug support.
  - New plan nodes `FetchNode` and `LookUpNode`; fragment builder integration and local worker assignment.
  - Enable via session var `enable_global_late_materialization`; FE tests added.
- Runtime/Backend:
  - New pipeline components: `fetch_sink/source` operators, `FetchProcessor/Task`, `LookUpOperator/Request`.
  - Add `LookUpDispatcherMgr` for request routing; BE query context stores global LM contexts (`glm_manager`).
  - Support `NOOP_SINK` and plumbing in `DataSink`.
- Connectors/Formats:
  - Hive connector: propagate scan range IDs and initialize global LM context.
  - Parquet: add `RowSourceReader`, pass `scan_range_id`; adjust Iceberg row-id reader; handle reserved fields (`_row_source_id`, `_scan_range_id`).
- RPC/Service:
  - New `lookup` RPC (proto/thrift), request/response column I/O; server/client wiring.
- Config:
  - Add LM-related BE configs (e.g., fetch buffers, batch sizes) and session option in thrift.
- Misc:
  - Enhance `Chunk::debug_columns()` for empty columns; misc build files and tests for Iceberg v3 TopN/Join.

Written by Cursor Bugbot for commit 36a2f1a02db79ff85755189f058d687410848ab5.