Views, Spark: Add support for Materialized Views; Integrate with Spark SQL
Spec
This patch adds support for materialized views in Iceberg and integrates the implementation with Spark SQL. It reuses the current spec of Iceberg views and tables by leveraging table properties to capture materialized view metadata. Those properties can be added to the Iceberg spec to formalize materialized view support.
Below is a summary of all metadata properties introduced or utilized by this patch, classified based on whether they are associated with a table or a view, along with their purposes:
Properties on a View:
- iceberg.materialized.view
  - Type: View property
  - Purpose: Marks whether a view is a materialized view. If set to true, the view is treated as a materialized view. This differentiates virtual views from materialized views within the catalog and dictates the specific handling and validation logic for materialized views.
- iceberg.materialized.view.storage.table
  - Type: View property
  - Purpose: Specifies the identifier of the storage table associated with the materialized view. This property links a materialized view to its storage table, so that queries can be served from the stored data when it is fresh.
Properties on a Table:
- iceberg.base.snapshot.[UUID]
  - Type: Table property
  - Purpose: These properties store the snapshot IDs of the base tables at the time the materialized view's data was last updated. Each property name consists of the prefix iceberg.base.snapshot. followed by the UUID of the base table. They are used to track whether the materialized view's data is up to date with the base tables, by comparing the stored snapshot IDs with the current snapshot IDs of the base tables. If every base table's current snapshot ID matches the one stored in its property, the materialized view's data is considered fresh.
- iceberg.materialized.view.version
  - Type: Table property
  - Purpose: Records the version ID of the parent view at the time the storage table was created or last refreshed. The storage table is usable only when this property matches the current version ID of the parent view.
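The two table properties above together determine whether the storage table can be used. A minimal sketch of that check, written in Python over plain dictionaries rather than the Iceberg API: the function name `is_fresh`, its parameters, and the UUID keys are illustrative assumptions, not names taken from the patch. It relies only on the property names listed above and on table property values being strings.

```python
from typing import Mapping

BASE_SNAPSHOT_PREFIX = "iceberg.base.snapshot."
VIEW_VERSION_PROP = "iceberg.materialized.view.version"


def is_fresh(
    storage_props: Mapping[str, str],
    current_view_version_id: int,
    current_base_snapshots: Mapping[str, int],
) -> bool:
    """Return True if the storage table may serve the materialized view.

    storage_props: properties of the storage table (string values).
    current_view_version_id: current version ID of the parent view.
    current_base_snapshots: base table UUID -> current snapshot ID.
    """
    # The storage table is usable only if it was built from the current view version.
    if storage_props.get(VIEW_VERSION_PROP) != str(current_view_version_id):
        return False

    # Snapshot IDs recorded at the last update, keyed by base table UUID.
    recorded = {
        key[len(BASE_SNAPSHOT_PREFIX):]: value
        for key, value in storage_props.items()
        if key.startswith(BASE_SNAPSHOT_PREFIX)
    }

    # Every recorded base table must still be at its recorded snapshot.
    # A base table missing from current_base_snapshots counts as a mismatch.
    return all(
        str(current_base_snapshots.get(uuid)) == snapshot_id
        for uuid, snapshot_id in recorded.items()
    )
```

A single advanced base table, or a new view version, is enough to make the storage table unusable under this model.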
Spark SQL
This patch introduces support for materialized views in the Spark module by adding support for Spark SQL CREATE MATERIALIZED VIEW and adding materialized view handling for the DROP VIEW DDL command. When a CREATE MATERIALIZED VIEW command is executed, the patch interprets the command to create a new materialized view, which involves not only registering the view's metadata (including marking it as a materialized view with the appropriate properties) but also setting up a corresponding storage table to hold the materialized data and setting the base table current snapshot IDs (at creation time). The storage table identifier is passed by a new clause STORED AS '...'. If no STORED AS clause is specified, a default storage table identifier is assigned. When a DROP VIEW command is issued for a materialized view, the patch ensures that both the metadata for the materialized view and its associated storage table are properly removed from the catalog. Support for REFRESH MATERIALIZED VIEW is left as a future enhancement.
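The create and drop behavior described above can be modeled as follows. This is a toy sketch in Python, not the Spark or Iceberg implementation: `ToyCatalog`, `create_materialized_view`, and `drop_view` are hypothetical names, and the default storage table name used when no STORED AS clause is given (`<view>__storage`) is an assumption made for illustration, since the description does not say how the default identifier is chosen.

```python
from dataclasses import dataclass, field
from typing import Dict, Mapping, Optional


@dataclass
class ToyCatalog:
    """In-memory stand-in for a catalog: identifier -> properties."""
    views: Dict[str, Dict[str, str]] = field(default_factory=dict)
    tables: Dict[str, Dict[str, str]] = field(default_factory=dict)


def create_materialized_view(
    catalog: ToyCatalog,
    name: str,
    view_version_id: int,
    base_snapshots: Mapping[str, int],
    stored_as: Optional[str] = None,
) -> str:
    """Register the view and its storage table; return the storage identifier."""
    # Hypothetical default; the real default identifier is not given in the text.
    storage_table = stored_as if stored_as is not None else f"{name}__storage"

    # View side: mark as materialized and link to the storage table.
    catalog.views[name] = {
        "iceberg.materialized.view": "true",
        "iceberg.materialized.view.storage.table": storage_table,
    }

    # Table side: record the view version and base snapshots at creation time.
    table_props = {"iceberg.materialized.view.version": str(view_version_id)}
    for uuid, snapshot_id in base_snapshots.items():
        table_props[f"iceberg.base.snapshot.{uuid}"] = str(snapshot_id)
    catalog.tables[storage_table] = table_props
    return storage_table


def drop_view(catalog: ToyCatalog, name: str) -> None:
    """Drop a view; for a materialized view, also drop its storage table."""
    props = catalog.views.pop(name)
    if props.get("iceberg.materialized.view") == "true":
        catalog.tables.pop(props["iceberg.materialized.view.storage.table"], None)
```

For example, creating `db.mv` with `stored_as="db.mv_data"` registers one view entry and one table entry, and `drop_view(catalog, "db.mv")` removes both.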
Spark Catalog
This patch enhances the SparkCatalog to intelligently decide whether to return the view text metadata for a materialized view or the data from its associated storage table based on the freshness of the materialized view. Within the loadTable method, the patch first checks if the requested table corresponds to a materialized view by loading the view from the Iceberg catalog. If the identified view is marked as a materialized view (using the iceberg.materialized.view property), the patch then assesses its freshness. If it is fresh, the loadTable method proceeds to load and return the storage table associated with the materialized view, allowing users to query the pre-computed data directly. However, if the materialized view is stale, the method simply returns to allow SparkCatalog's loadView to run. In turn, loadView returns the metadata for the virtual view itself, triggering the usual Spark view logic that computes the result set based on the current state of the base tables.
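The dispatch between the storage table and the view definition can be sketched as below. This is a simplified Python model of the control flow only, under stated assumptions: `load_table` and `resolve` are illustrative functions, not SparkCatalog methods; the freshness check is passed in as a callable; and the handling of ordinary (non-view) tables is left out.

```python
from typing import Callable, Mapping, Optional, Tuple


def load_table(
    view_props: Optional[Mapping[str, str]],
    is_fresh: Callable[[], bool],
) -> Optional[str]:
    """Return the storage table identifier to load, or None to fall through.

    view_props is None when the identifier does not name a view.
    """
    if view_props is None:
        return None  # not a view; ordinary table handling is not modeled here
    if view_props.get("iceberg.materialized.view") != "true":
        return None  # a virtual view: let the view path handle it
    if not is_fresh():
        return None  # stale: fall through so the view query is re-evaluated
    return view_props["iceberg.materialized.view.storage.table"]


def resolve(
    view_props: Optional[Mapping[str, str]],
    view_sql: str,
    is_fresh: Callable[[], bool],
) -> Tuple[str, str]:
    """Model the loadTable-then-loadView order described above."""
    storage = load_table(view_props, is_fresh)
    if storage is not None:
        return ("storage_table", storage)  # query the precomputed data
    return ("view", view_sql)  # expand the view over the base tables
```

One consequence of this design is that a stale materialized view still returns correct results, at the cost of recomputing the query against the base tables.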
Notes
- This patch intentionally avoids introducing new Iceberg or engine object APIs. The intention is to start a discussion on whether such APIs are required and which objects are best suited to model them. There are a number of trade-offs with each choice.
- The InMemoryCatalog has been extended to use a test LocalFileIO due to an existing gap in a pure InMemoryCatalog (with InMemoryFileIO) when working with data files (which are required by the storage table). The extension of the InMemoryCatalog to use LocalFileIO ended up promoting a couple of methods to public, but the intention is again to start a discussion about the best way to address the current gap.
However, if the materialized view is stale, the method simply returns to allow SparkCatalog's loadView to run. In turn, loadView returns the metadata for the virtual view itself, triggering the usual Spark view logic that computes the result set based on the current state of the base tables.
1/ I was wondering if auto-refresh of the MV on staleness detection should be an opt-in feature? 2/ Any ideas / plans for incremental refresh?
These are very good questions. To me it looks like, if there is an external process that guarantees freshness, the current implementation still holds: a manual REFRESH will boil down to a no-op, and isFresh will always return true.
For (2): We have not discussed incremental refresh plans in the Iceberg community, but there is some relevant work here. You can review some of the test cases here.
@wmoustafa, I read this today and was wondering if there is something we can utilize from a CDC perspective (considering Iceberg has support for that)? How expensive are refreshes of PB-size tables, and what is the ideal frequency of updates in this model, if you can share some data points? The rewrite to get incremental refresh by computing deltas between the snapshots, joining them with other deltas, and taking the union of those does seem user-friendly, though.
It really depends on the query and the size of the delta and whole table etc. There is an extension of that work that is currently taking place to get an idea about the cost of some basic queries (e.g., a few joins/aggregations + filters & projections), and coming up with a reasonable cost model (including choosing to not perform incremental at all if incremental is deemed more expensive).
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the Iceberg dev mailing list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.