
[Enhancement] Support global late materialization on Iceberg v3

Open · silverbullet233 opened this issue 2 months ago • 21 comments

Why I'm doing:

What I'm doing:

Fixes #60015

In this PR, I implemented global late materialization based on Iceberg v3. For the basic idea, see the description in the issue above.

Next, I'll explain the basic design and implementation, focusing on several key issues.

Key Points of the Global Late Materialization Framework

Because StarRocks can query data from many kinds of data sources, global late materialization should in principle work with any of them, so the design must be easy to extend.

How to Represent RowPosition?

RowPosition describes how to locate a row. Different data sources may represent RowPosition differently, but every representation is a combination of one or more columns. We introduce RowPositionDescriptor to describe this relationship.

A RowPositionDescriptor consists of two parts: one row source slot and a set of ref slots.

If a table has columns that require lazy reads, it outputs several extra columns during the early read phase to describe each row's position. One of them, the column referenced by the row source slot, records the compute node the data came from; during a lazy read, the request is sent to that compute node for processing. The columns referenced by the ref slots together logically identify a unique row. For example, in a RowPositionDescriptor for a Primary Key table with primary key columns a and b, the ref slots are the slots corresponding to columns a and b.

This information allows us to determine how to find a particular row of data.
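The two-part structure can be sketched in C++ as follows. This is a minimal, hypothetical model: `SlotId`, `all_slots()`, `make_descriptor()`, and `make_pk_table_descriptor()` are illustrative names rather than the actual StarRocks classes, and the slot numbering in the Primary Key example is assumed.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical sketch: SlotId and the member names are illustrative only.
using SlotId = int32_t;

struct RowPositionDescriptor {
    // Slot of the column recording which compute node produced the row;
    // lazy-read requests are routed to that node.
    SlotId row_source_slot;
    // Slots of the columns that, taken together, logically identify one row.
    std::vector<SlotId> ref_slots;

    // Every slot the early-read phase must output so the row can be found later.
    std::vector<SlotId> all_slots() const {
        std::vector<SlotId> slots{row_source_slot};
        slots.insert(slots.end(), ref_slots.begin(), ref_slots.end());
        return slots;
    }
};

inline RowPositionDescriptor make_descriptor(SlotId row_source, std::vector<SlotId> refs) {
    // At least one ref column is needed to identify a row.
    assert(!refs.empty());
    return RowPositionDescriptor{row_source, std::move(refs)};
}

// Primary Key table with key (a, b): assume slot 0 is the generated
// row-source column and slots 1 and 2 hold a and b.
inline RowPositionDescriptor make_pk_table_descriptor() {
    return make_descriptor(0, {1, 2});
}
```

Keeping the row source separate from the ref slots reflects the split described above: the former decides where a lazy read is sent, the latter decides which row is read once it gets there.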

How to generate a global late materialization execution plan?

During the final stage of query optimization, the LateMaterializationRewriter is responsible for this task.

The rewriting process is roughly divided into two steps:

  1. The ColumnCollector collects all columns that require lazy reads, along with their first usage locations, and determines the timing of lazy reads. This information is recorded in the CollectorContext.

  2. The PlanRewriter, based on the CollectorContext information, performs the actual physical plan rewriting, inserting Fetch and LookUp operators at specific locations to perform lazy reads.
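A heavily simplified sketch of the two steps, assuming the plan is a single chain of operators above one scan (the real rewriter works on a full physical plan tree, including joins). `PlanOp`, `CollectorResult`, `collect()`, and `rewrite()` are hypothetical names. The collector records each scan column's first use, and the rewriter places a Fetch directly below the first operator that needs a lazily read column. The LookUp side is not modeled.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical model: operators stacked on one scan, listed bottom-up.
struct PlanOp {
    std::string name;
    std::set<std::string> used_columns;
};

struct CollectorResult {
    std::set<std::string> early_columns;                           // read by the scan
    std::map<std::size_t, std::set<std::string>> fetch_before;     // op index -> lazy columns
};

// Step 1 (collector): find each scan column's first use. Columns first used
// by the lowest operator are read early; every other used column is read
// lazily by a Fetch placed directly below the operator that first needs it.
inline CollectorResult collect(const std::vector<std::string>& scan_columns,
                               const std::vector<PlanOp>& ops) {
    assert(!ops.empty());
    CollectorResult result;
    for (const auto& col : scan_columns) {
        for (std::size_t i = 0; i < ops.size(); ++i) {
            if (ops[i].used_columns.count(col) == 0) continue;
            if (i == 0) {
                result.early_columns.insert(col);
            } else {
                result.fetch_before[i].insert(col);
            }
            break;  // only the first use matters
        }
    }
    return result;
}

// Step 2 (rewriter): splice Fetch operators into the chain.
inline std::vector<std::string> rewrite(const std::vector<PlanOp>& ops,
                                        const CollectorResult& ctx) {
    std::vector<std::string> plan{"Scan"};
    for (std::size_t i = 0; i < ops.size(); ++i) {
        if (ctx.fetch_before.count(i) != 0) plan.push_back("Fetch");
        plan.push_back(ops[i].name);
    }
    return plan;
}
```

For a query shaped like `select * from lineorder order by lo_partkey limit 100000, 5`, only `lo_partkey` is needed by the TopN, so in this model the remaining columns are fetched after the TopN has already cut the input down to 5 rows, giving `Scan -> TopN -> Fetch -> Output`.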

How are Fetch and LookUp operators implemented?

During execution, the Fetch operator sends LookUpRequests to the LookUp operator to retrieve specific data by row position, and the LookUp operator processes those requests. To keep the framework extensible, we introduced several new abstractions, and each data source must provide its own implementation of each of them:

  • FetchTask: Responsible for executing fetch operations and sending LookUpRequests to the appropriate LookUp operators.

  • FetchTaskContext: Used to record the context required by the FetchTask.

  • LookUpTask: Responsible for executing lookup operations and reading data from a specific data source.

  • LookUpTaskContext: Used to record the context required by the LookUpTask.
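To show how the four abstractions might divide responsibilities, here is a hypothetical C++ sketch. The class names follow the list above, but the member functions, the in-memory data source, and the in-process call that stands in for the lookup RPC are all assumptions made for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical context types: what each task needs to do its job.
struct FetchTaskContext {
    int32_t target_node;  // row_source_id of the node serving the lookup
};

struct LookUpTaskContext {
    std::vector<std::string> rows;  // stand-in for a real data source
};

// LookUp side: reads the requested rows from its data source.
class LookUpTask {
public:
    virtual ~LookUpTask() = default;
    virtual std::vector<std::string> lookup(const std::vector<int64_t>& row_ids) = 0;
};

// Fetch side: sends a lookup request and returns the materialized rows.
class FetchTask {
public:
    virtual ~FetchTask() = default;
    virtual std::vector<std::string> fetch(const std::vector<int64_t>& row_ids) = 0;
};

// Toy data source whose rows are addressed by their index.
class InMemoryLookUpTask : public LookUpTask {
public:
    explicit InMemoryLookUpTask(LookUpTaskContext ctx) : _ctx(std::move(ctx)) {}

    std::vector<std::string> lookup(const std::vector<int64_t>& row_ids) override {
        std::vector<std::string> out;
        out.reserve(row_ids.size());
        for (int64_t id : row_ids) {
            assert(id >= 0 && static_cast<std::size_t>(id) < _ctx.rows.size());
            out.push_back(_ctx.rows[static_cast<std::size_t>(id)]);
        }
        return out;
    }

private:
    LookUpTaskContext _ctx;
};

// Toy fetch task: calls the LookUp side in-process instead of over RPC.
class LocalFetchTask : public FetchTask {
public:
    LocalFetchTask(FetchTaskContext ctx, LookUpTask* target) : _ctx(ctx), _target(target) {}

    std::vector<std::string> fetch(const std::vector<int64_t>& row_ids) override {
        return _target->lookup(row_ids);
    }

    int32_t target_node() const { return _ctx.target_node; }

private:
    FetchTaskContext _ctx;
    LookUpTask* _target;
};
```

A new data source would then only supply its own task and context types, while the generic Fetch and LookUp operators stay unchanged.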

The core logic of the Fetch and LookUp operators is implemented in FetchProcessor and LookupProcessor, respectively. To minimize IO costs, each applies its own batching strategy.
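The batching strategies are not spelled out here, so the following is only one plausible illustration, not FetchProcessor's actual logic: buffered row positions are grouped by `row_source_id` so that each compute node receives a few large requests instead of many small ones, and each request is capped at a size limit. `RowPosition`, `Batch`, `make_batches()`, and `max_batch_rows` are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical buffered position: which node produced the row, and its id.
struct RowPosition {
    int32_t row_source_id;
    int64_t row_id;
};

// One request destined for a single compute node.
struct Batch {
    int32_t row_source_id;
    std::vector<int64_t> row_ids;
};

// Group positions by source node, then split each group into requests of at
// most max_batch_rows rows (an assumed tuning knob).
inline std::vector<Batch> make_batches(const std::vector<RowPosition>& positions,
                                       std::size_t max_batch_rows) {
    assert(max_batch_rows > 0);
    std::map<int32_t, std::vector<int64_t>> by_node;
    for (const auto& pos : positions) by_node[pos.row_source_id].push_back(pos.row_id);

    std::vector<Batch> batches;
    for (const auto& [node, ids] : by_node) {
        for (std::size_t start = 0; start < ids.size(); start += max_batch_rows) {
            std::size_t end = std::min(ids.size(), start + max_batch_rows);
            auto first = ids.begin() + static_cast<std::ptrdiff_t>(start);
            auto last = ids.begin() + static_cast<std::ptrdiff_t>(end);
            batches.push_back(Batch{node, std::vector<int64_t>(first, last)});
        }
    }
    return batches;
}
```

Larger per-node batches amortize the per-request overhead, while the cap bounds the memory each request can consume on the LookUp side.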

How is global late materialization implemented on Iceberg?

Based on the above design, the key issues are how to represent row position and how to perform lazy reads.

In Iceberg v3, row position consists of three columns:

  • row_source_id: The compute node that generated the row data.

  • scan_range_id: The identifier of the ScanRange that generated the row data. During the early read phase, the HiveConnector generates a unique ID for each scan range.

  • _row_id: A reserved field introduced in Iceberg v3 that uniquely identifies a row.

With row_source_id and scan_range_id, we know how to create a HiveDataSource for the lazy read. We also build predicates on the _row_id column for the rows we want to read and pass them into the HiveDataSource. Predicates for multiple _row_id ranges are combined with OR. For example, to read _row_ids in [1, 10) and [20, 30), the generated predicate is (_row_id >= 1 and _row_id < 10) or (_row_id >= 20 and _row_id < 30).
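The predicate construction can be sketched as: sort and deduplicate the requested `_row_id` values, collapse consecutive values into half-open ranges, and OR the range predicates together. The helper names `to_ranges()` and `to_predicate()` are hypothetical, and the sketch renders the predicate as text for readability, whereas the real implementation presumably builds expression objects for the HiveDataSource.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

using RowIdRange = std::pair<int64_t, int64_t>;  // half-open [begin, end)

// Collapse arbitrary row ids into sorted, non-overlapping half-open ranges.
inline std::vector<RowIdRange> to_ranges(std::vector<int64_t> row_ids) {
    std::sort(row_ids.begin(), row_ids.end());
    row_ids.erase(std::unique(row_ids.begin(), row_ids.end()), row_ids.end());
    std::vector<RowIdRange> ranges;
    for (int64_t id : row_ids) {
        if (!ranges.empty() && ranges.back().second == id) {
            ranges.back().second = id + 1;  // extend the current run
        } else {
            ranges.emplace_back(id, id + 1);  // start a new run
        }
    }
    return ranges;
}

// Render each range as a conjunction and join the ranges with OR.
inline std::string to_predicate(const std::vector<RowIdRange>& ranges) {
    std::string out;
    for (const auto& [begin, end] : ranges) {
        assert(begin < end);
        if (!out.empty()) out += " or ";
        out += "(_row_id >= " + std::to_string(begin) + " and _row_id < " +
               std::to_string(end) + ")";
    }
    return out;
}
```

Feeding it the IDs 1 to 9 and 20 to 29 reproduces the predicate from the example above. Merging adjacent IDs keeps the predicate short even when thousands of consecutive rows are requested.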

Performance Improvements

I constructed three typical queries on the SSB 100 GB dataset for testing. They share a common feature: they return many columns, but only a few columns participate in joins or sorts, and the result contains only a small number of rows.

The test environment consisted of one FE node (8 cores, 16 GB) and three BE nodes (16 cores, 64 GB each). "baseline" is the result with default parameters; "opt" is the result with global late materialization enabled (set enable_global_late_materialization=true).

--baseline: 12.74s
--opt: 3.49s
select * from lineorder join part on lo_partkey = p_partkey where (p_size + lo_linenumber) > 60;

--baseline: 9.02s
--opt: 1.71s
select * from lineorder order by lo_partkey limit 100000, 5;

--baseline: 9.88s
--opt: 2.85s
select * from lineorder, customer where lineorder.lo_custkey = customer.c_custkey order by lineorder.lo_custkey limit 10;
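Dividing each baseline time by the corresponding opt time gives the per-query speedup, in the order the queries are listed:

```latex
\text{speedup} = \frac{t_{\text{baseline}}}{t_{\text{opt}}}, \qquad
\frac{12.74}{3.49} \approx 3.65\times, \quad
\frac{9.02}{1.71} \approx 5.27\times, \quad
\frac{9.88}{2.85} \approx 3.47\times
```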

What type of PR is this:

  • [ ] BugFix
  • [ ] Feature
  • [x] Enhancement
  • [ ] Refactor
  • [ ] UT
  • [ ] Doc
  • [ ] Tool

Does this PR entail a change in behavior?

  • [ ] Yes, this PR will result in a change in behavior.
  • [x] No, this PR will not result in a change in behavior.

If yes, please specify the type of change:

  • [ ] Interface/UI changes: syntax, type conversion, expression evaluation, display information
  • [ ] Parameter changes: default values, similar parameters but with different default values
  • [ ] Policy changes: use new policy to replace old one, functionality automatically enabled
  • [ ] Feature removed
  • [ ] Miscellaneous: upgrade & downgrade compatibility, etc.

Checklist:

  • [x] I have added test cases for my bug fix or my new feature
  • [ ] This pr needs user documentation (for new or modified features or behaviors)
    • [ ] I have added documentation for my new feature or new function
  • [ ] This is a backport pr

Bugfix cherry-pick branch check:

  • [x] I have checked the version labels which the pr will be auto-backported to the target branch
    • [ ] 4.0
    • [ ] 3.5
    • [ ] 3.4
    • [ ] 3.3

[!NOTE] Implements global late materialization for Iceberg v3 using new Fetch/LookUp nodes/operators, a runtime lookup framework, and planner rewrite, plus Parquet/Hive integration, RPC, and configs.

  • Planner/Frontend:
    • Add LateMaterializationRewriter to insert Fetch and LookUp into plans; introduce RowPositionDescriptor.
    • New physical operators PhysicalFetchOperator and PhysicalLookUpOperator; explain/debug support.
    • New plan nodes FetchNode and LookUpNode; fragment builder integration and local worker assignment.
    • Enable via session var enable_global_late_materialization; FE tests added.
  • Runtime/Backend:
    • New pipeline components: fetch_sink/source operators, FetchProcessor/Task, LookUpOperator/Request.
    • Add LookUpDispatcherMgr for request routing; BE query context stores global LM contexts (glm_manager).
    • Support NOOP_SINK and plumbing in DataSink.
  • Connectors/Formats:
    • Hive connector: propagate scan range IDs and initialize global LM context.
    • Parquet: add RowSourceReader, pass scan_range_id; adjust Iceberg row-id reader; handle reserved fields (_row_source_id, _scan_range_id).
  • RPC/Service:
    • New lookup RPC (proto/thrift), request/response column I/O; server/client wiring.
  • Config:
    • Add LM-related BE configs (e.g., fetch buffers, batch sizes) and a session option in Thrift.
  • Misc:
    • Enhance Chunk::debug_columns() for empty columns; misc build files and tests for Iceberg v3 TopN/Join.

Written by Cursor Bugbot for commit 36a2f1a02db79ff85755189f058d687410848ab5.

silverbullet233 · Oct 16 '25 05:10

@cursor review

alvin-celerdata · Oct 24 '25 18:10

🧪 CI Insights

Here's what we observed from your CI run for b343b948.

🟢 All jobs passed!

But CI Insights is watching 👀

mergify[bot] · Oct 29 '25 08:10

@cursor review

alvin-celerdata · Nov 05 '25 17:11

@cursor review

alvin-celerdata · Nov 06 '25 21:11

@mergify rebase

silverbullet233 · Nov 10 '25 05:11

rebase

✅ Branch has been successfully rebased

mergify[bot] · Nov 10 '25 05:11

@cursor review

alvin-celerdata · Nov 10 '25 18:11

@cursor review

alvin-celerdata · Nov 13 '25 15:11

@cursor review

alvin-celerdata · Nov 14 '25 17:11

@mergify rebase

silverbullet233 · Nov 17 '25 02:11

rebase

✅ Branch has been successfully rebased

mergify[bot] · Nov 17 '25 02:11

@cursor review

alvin-celerdata · Nov 17 '25 04:11

@cursor review

alvin-celerdata · Nov 17 '25 17:11

@cursor review

alvin-celerdata · Dec 08 '25 03:12

@cursor review

alvin-celerdata · Dec 09 '25 03:12

@cursor review

alvin-celerdata · Dec 11 '25 15:12

@cursor review

alvin-celerdata · Dec 12 '25 04:12

@cursor review

alvin-celerdata · Dec 12 '25 17:12

Quality Gate failed

Failed conditions
Reliability Rating on New Code: C (required ≥ A)

See analysis details on SonarQube Cloud


sonarqubecloud[bot] · Dec 13 '25 15:12

@cursor review

alvin-celerdata · Dec 13 '25 19:12

[Java-Extensions Incremental Coverage Report]

✅ pass: 0 / 0 (0%)

github-actions[bot] · Dec 14 '25 14:12

[FE Incremental Coverage Report]

❌ fail: 609 / 795 (76.60%)

file detail

path covered_line new_line coverage not_covered_line_detail
com/starrocks/sql/common/DebugOperatorTracer.java 0 24 00.00% [575, 576, 578, 579, 580, 581, 582, 583, 584, 585, 586, 587, 592, 593, 594, 595, 596, 597, 598, 599, 600, 601, 602, 603]
com/starrocks/sql/optimizer/operator/physical/PhysicalFetchOperator.java 16 27 59.26% [54, 58, 88, 89, 91, 92, 94, 95, 96, 97, 98]
com/starrocks/sql/optimizer/operator/physical/PhysicalLookUpOperator.java 19 29 65.52% [101, 102, 104, 105, 107, 108, 109, 110, 111, 112]
com/starrocks/sql/optimizer/LateMaterializationRewriter.java 376 503 74.75% [162, 241, 244, 247, 281, 282, 283, 284, 285, 286, 287, 288, 289, 290, 291, 292, 293, 294, 295, 297, 298, 299, 300, 301, 302, 303, 304, 305, 306, 307, 371, 390, 391, 401, 424, 425, 426, 427, 428, 430, 440, 441, 544, 545, 564, 565, 566, 568, 570, 571, 572, 573, 574, 576, 578, 583, 584, 585, 586, 591, 592, 593, 594, 596, 597, 598, 600, 601, 606, 607, 608, 610, 612, 614, 615, 616, 617, 618, 620, 621, 622, 627, 628, 629, 630, 631, 632, 633, 635, 636, 687, 688, 689, 690, 691, 692, 694, 696, 699, 700, 702, 704, 706, 707, 708, 709, 711, 713, 715, 716, 717, 718, 719, 720, 721, 723, 814, 815, 816, 817, 818, 819, 821, 822, 867, 868, 882]
com/starrocks/planner/FetchNode.java 40 51 78.43% [72, 88, 89, 90, 91, 92, 93, 106, 107, 116, 121]
com/starrocks/sql/optimizer/operator/physical/PhysicalJoinOperator.java 5 6 83.33% [81]
com/starrocks/planner/NoopSink.java 5 6 83.33% [45]
com/starrocks/sql/plan/PlanFragmentBuilder.java 79 80 98.75% [4444]
com/starrocks/qe/SessionVariable.java 5 5 100.00% []
com/starrocks/sql/optimizer/QueryOptimizer.java 3 3 100.00% []
com/starrocks/catalog/IcebergTable.java 2 2 100.00% []
com/starrocks/sql/optimizer/OptExpression.java 6 6 100.00% []
com/starrocks/sql/optimizer/operator/OperatorVisitor.java 2 2 100.00% []
com/starrocks/sql/optimizer/LogicalPlanPrinter.java 15 15 100.00% []
com/starrocks/sql/optimizer/operator/OperatorType.java 2 2 100.00% []
com/starrocks/planner/LookUpNode.java 16 16 100.00% []
com/starrocks/sql/optimizer/operator/ScanOperatorPredicates.java 7 7 100.00% []
com/starrocks/sql/optimizer/operator/physical/PhysicalTopNOperator.java 1 1 100.00% []
com/starrocks/qe/scheduler/assignment/FragmentAssignmentStrategyFactory.java 1 1 100.00% []
com/starrocks/sql/optimizer/OptExpressionVisitor.java 2 2 100.00% []
com/starrocks/qe/scheduler/assignment/LocalFragmentAssignmentStrategy.java 7 7 100.00% []

github-actions[bot] · Dec 14 '25 14:12

[BE Incremental Coverage Report]

✅ pass: 1386 / 1584 (87.50%)

file detail

path covered_line new_line coverage not_covered_line_detail
be/src/exec/lookup_node.h 0 3 00.00% [33, 35, 37]
be/src/serde/protobuf_serde.cpp 0 1 00.00% [308]
be/src/runtime/noop_sink.h 2 11 18.18% [26, 27, 28, 29, 30, 31, 32, 34, 36]
be/src/formats/parquet/row_source_reader.h 3 12 25.00% [28, 29, 31, 32, 34, 36, 39, 42, 45]
be/src/service/internal_service.cpp 16 31 51.61% [1504, 1505, 1515, 1516, 1517, 1518, 1519, 1520, 1523, 1524, 1525, 1526, 1527, 1530, 1531]
be/src/runtime/descriptors.cpp 15 27 55.56% [888, 889, 890, 891, 892, 893, 895, 896, 897, 899, 900, 901]
be/src/exec/pipeline/fetch_sink_operator.h 4 7 57.14% [36, 43, 44]
be/src/exec/pipeline/fetch_task.cpp 80 125 64.00% [33, 34, 37, 38, 43, 49, 50, 51, 52, 53, 54, 55, 57, 58, 59, 60, 82, 83, 84, 85, 86, 96, 97, 98, 99, 100, 101, 105, 106, 107, 108, 109, 116, 117, 118, 119, 120, 129, 130, 131, 133, 134, 135, 136, 137]
be/src/exec/pipeline/lookup_operator.h 13 20 65.00% [54, 55, 67, 68, 69, 70, 72]
be/src/exec/pipeline/fetch_task.h 7 10 70.00% [56, 73, 79]
be/src/exec/pipeline/fetch_source_operator.h 7 10 70.00% [42, 43, 63]
be/src/exec/pipeline/fetch_processor.h 15 20 75.00% [111, 112, 113, 114, 116]
be/src/exec/pipeline/scan/glm_manager.cpp 9 12 75.00% [21, 22, 23]
be/src/exec/pipeline/lookup_request.h 11 14 78.57% [46, 47, 49]
be/src/formats/parquet/group_reader.cpp 5 6 83.33% [383]
be/src/exec/pipeline/lookup_operator.cpp 147 163 90.18% [91, 121, 122, 137, 138, 182, 218, 261, 263, 264, 270, 271, 277, 294, 295, 297]
be/src/exec/pipeline/lookup_request.cpp 405 450 90.00% [37, 39, 40, 41, 42, 43, 44, 45, 46, 48, 51, 52, 53, 54, 55, 56, 58, 59, 60, 63, 64, 65, 81, 82, 83, 84, 364, 365, 366, 367, 369, 370, 371, 372, 373, 374, 375, 377, 378, 379, 380, 382, 383, 384, 594]
be/src/exec/lookup_node.cpp 31 34 91.18% [56, 57, 58]
be/src/column/chunk.cpp 28 30 93.33% [422, 455]
be/src/runtime/lookup_stream_mgr.cpp 69 73 94.52% [108, 109, 110, 111]
be/src/connector/hive_connector.cpp 25 26 96.15% [59]
be/src/exec/pipeline/fetch_source_operator.cpp 26 27 96.30% [51]
be/src/exec/pipeline/fetch_processor.cpp 320 324 98.77% [50, 213, 214, 392]
be/src/runtime/lookup_stream_mgr.h 12 12 100.00% []
be/src/exec/fetch_node.cpp 39 39 100.00% []
be/src/runtime/exec_env.h 1 1 100.00% []
be/src/runtime/exec_env.cpp 4 4 100.00% []
be/src/formats/parquet/row_source_reader.cpp 6 6 100.00% []
be/src/formats/parquet/file_reader.cpp 1 1 100.00% []
be/src/exec/pipeline/query_context.h 2 2 100.00% []
be/src/storage/range.h 1 1 100.00% []
be/src/exec/exec_node.cpp 6 6 100.00% []
be/src/util/internal_service_recoverable_stub.cpp 4 4 100.00% []
be/src/exec/pipeline/scan/glm_manager.h 11 11 100.00% []
be/src/exec/hdfs_scanner/hdfs_scanner.cpp 3 3 100.00% []
be/src/runtime/runtime_state.h 3 3 100.00% []
be/src/formats/parquet/scalar_column_reader.cpp 5 5 100.00% []
be/src/exec/data_sink.cpp 8 8 100.00% []
be/src/runtime/descriptors.h 13 13 100.00% []
be/src/exec/pipeline/fetch_sink_operator.cpp 27 27 100.00% []
be/src/exec/pipeline/query_context.cpp 2 2 100.00% []

github-actions[bot] · Dec 14 '25 14:12