iceberg-rust

Example: Materialized View registered in a glue catalog?

Open • ForeverAngry opened this issue 9 months ago • 6 comments

Does this capability exist today? I saw the materialized view feature, but wasn't sure how I could register a materialized view in a Glue catalog, or whether that was possible.

ForeverAngry • Apr 08 '25

It wasn't possible to use it before, but I have now added some changes that should make it work. Have a look at this test and use the `GlueCatalogList`. However, I only just added the `GlueCatalogList` to the repo and it isn't available in a release yet, so you would need to wait a little.

The `GlueCatalogList` only contains one catalog, which is your default Glue catalog. However, for materialized views to work, I need a catalog list. When creating the list, you have to define the name under which your Glue catalog is referenced in SQL. Currently it is also best to fully qualify table references, as in `mycatalog.mynamespace.mytable`. And in general, materialized views are still in the alpha stage.

JanKaul • Apr 10 '25
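To picture what the comment above describes (a catalog list that maps a SQL name such as `iceberg` to the Glue catalog, plus fully qualified `catalog.namespace.table` references), here is a toy model using only the Rust standard library. `CatalogHandle`, `TableRef`, `parse_table_ref`, and `resolve` are hypothetical names for illustration, not the iceberg-rust or DataFusion API, and the real resolution logic may differ (for example, by falling back to a default catalog).

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a catalog; the real crates have their own catalog types.
#[derive(Debug, Clone, PartialEq)]
pub struct CatalogHandle {
    pub description: String,
}

/// A fully qualified SQL reference: catalog.namespace.table.
#[derive(Debug, PartialEq)]
pub struct TableRef<'a> {
    pub catalog: &'a str,
    pub namespace: &'a str,
    pub table: &'a str,
}

/// Split "catalog.namespace.table" into its parts.
/// Returns None unless there are exactly three non-empty segments.
pub fn parse_table_ref(name: &str) -> Option<TableRef<'_>> {
    let parts: Vec<&str> = name.split('.').collect();
    match parts.as_slice() {
        [c, n, t] if !c.is_empty() && !n.is_empty() && !t.is_empty() => Some(TableRef {
            catalog: *c,
            namespace: *n,
            table: *t,
        }),
        _ => None,
    }
}

/// Look up the catalog segment of a reference in a name -> catalog map,
/// i.e. the name chosen when the catalog list was created.
pub fn resolve<'m>(
    catalogs: &'m HashMap<String, CatalogHandle>,
    name: &str,
) -> Option<&'m CatalogHandle> {
    let table_ref = parse_table_ref(name)?;
    catalogs.get(table_ref.catalog)
}
```

In this model, `iceberg.test.orders` resolves because `iceberg` is the name registered in the list, while `test.orders` has no catalog segment to look up; that is one way to see why fully qualified names are the safer choice while the feature is in alpha.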

@JanKaul Yeah, actually I noticed your PR with the Glue changes and pulled it in a few days ago. I keep running into the issue of the object store not being recognized. I was trying to follow the SQLite example and modify it to use the Glue catalog and the S3 builder, so my object store was set to `s3://`, but I kept hitting the same error.

ForeverAngry • Apr 10 '25

You wouldn't have the code in a repo somewhere?

JanKaul • Apr 10 '25

> You wouldn't have the code in a repo somewhere?

I don't; it's in an enterprise workspace, but I'll recreate the gist of the code and send it.

ForeverAngry • Apr 14 '25

@JanKaul Okay, so I replicated the issue using the code from your materialized view example, but with Glue. Here is the error I referenced earlier: thread 'main' panicked at catalogs/iceberg-glue-catalog/src/lib.rs:71:50: called `Result::unwrap()` on an `Err` value: NotSupported("Object store protocol"). The full stack trace is below.

// View on Glue Code

use datafusion::{arrow::array::Int64Array, prelude::SessionContext};
use datafusion_iceberg::catalog::catalog::IcebergCatalog;
use datafusion_iceberg::materialized_view::refresh_materialized_view;
use iceberg_glue_catalog::GlueCatalogList;
use iceberg_rust::catalog::CatalogList;
use iceberg_rust::materialized_view::MaterializedView;
use iceberg_rust::object_store::ObjectStoreBuilder;
use iceberg_rust::spec::partition::PartitionSpec;
use iceberg_rust::spec::view_metadata::{Version, ViewRepresentation};
use iceberg_rust::spec::{
    partition::{PartitionField, Transform},
    schema::Schema,
    types::{PrimitiveType, StructField, Type},
};
use iceberg_rust::table::Table;
use std::sync::Arc;


#[tokio::main]
pub(crate) async fn main() {
    let object_store = ObjectStoreBuilder::s3();
    let config = aws_config::load_from_env().await;
    let catalog_list = Arc::new(
        GlueCatalogList::new("iceberg", &config, object_store),
    );

    let catalog = catalog_list.catalog("iceberg")
        .expect("Failed to find catalog with name 'iceberg'. Ensure the catalog exists and is properly configured.");

    let schema = Schema::builder()
        .with_struct_field(StructField {
            id: 1,
            name: "id".to_string(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Long),
            doc: None,
        })
        .with_struct_field(StructField {
            id: 2,
            name: "customer_id".to_string(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Long),
            doc: None,
        })
        .with_struct_field(StructField {
            id: 3,
            name: "product_id".to_string(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Long),
            doc: None,
        })
        .with_struct_field(StructField {
            id: 4,
            name: "date".to_string(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Date),
            doc: None,
        })
        .with_struct_field(StructField {
            id: 5,
            name: "amount".to_string(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Int),
            doc: None,
        })
        .build()
        .unwrap();

    let partition_spec = PartitionSpec::builder()
        .with_partition_field(PartitionField::new(4, 1000, "day", Transform::Day))
        .build()
        .expect("Failed to create partition spec");

    Table::builder()
        .with_name("orders")
        .with_location("/test/orders")
        .with_schema(schema)
        .with_partition_spec(partition_spec)
        .build(&["test".to_owned()], catalog.clone())
        .await
        .expect("Failed to create table");

    let matview_schema = Schema::builder()
        .with_struct_field(StructField {
            id: 1,
            name: "product_id".to_string(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Long),
            doc: None,
        })
        .with_struct_field(StructField {
            id: 2,
            name: "amount".to_string(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Int),
            doc: None,
        })
        .build()
        .unwrap();

    let mut matview = MaterializedView::builder()
        .with_name("orders_view")
        .with_location("test/orders_view")
        .with_schema(matview_schema)
        .with_view_version(
            Version::builder()
                .with_representation(ViewRepresentation::sql(
                    "select product_id, amount from iceberg.test.orders where product_id < 3;",
                    None,
                ))
                .build()
                .unwrap(),
        )
        .build(&["test".to_owned()], catalog.clone())
        .await
        .expect("Failed to create materialized view");

    let total_matview_schema = Schema::builder()
        .with_struct_field(StructField {
            id: 1,
            name: "product_id".to_string(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Long),
            doc: None,
        })
        .with_struct_field(StructField {
            id: 2,
            name: "total".to_string(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Long),
            doc: None,
        })
        .build()
        .unwrap();

    let mut total_matview = MaterializedView::builder()
        .with_name("total_orders")
        .with_location("test/total_orders")
        .with_schema(total_matview_schema)
        .with_view_version(
            Version::builder()
                .with_representation(ViewRepresentation::sql(
                    "select product_id, sum(amount) as total from iceberg.test.orders_view group by product_id;",
                    None,
                ))
                .build()
                .unwrap(),
        )
        .build(&["test".to_owned()], catalog.clone())
        .await
        .expect("Failed to create materialized view");

    // Datafusion

    let datafusion_catalog = Arc::new(
        IcebergCatalog::new(catalog, None)
            .await
            .expect("Failed to create datafusion catalog"),
    );

    let ctx = SessionContext::new();

    ctx.register_catalog("iceberg", datafusion_catalog);

    ctx.sql(
        "INSERT INTO iceberg.test.orders (id, customer_id, product_id, date, amount) VALUES 
                (1, 1, 1, '2020-01-01', 1),
                (2, 2, 1, '2020-01-01', 1),
                (3, 3, 1, '2020-01-01', 3),
                (4, 1, 2, '2020-02-02', 1),
                (5, 1, 1, '2020-02-02', 2),
                (6, 3, 3, '2020-02-02', 3);",
    )
    .await
    .expect("Failed to create query plan for insert")
    .collect()
    .await
    .expect("Failed to insert values into table");

    refresh_materialized_view(&mut matview, catalog_list.clone(), None)
        .await
        .expect("Failed to refresh materialized view");

    let batches = ctx
        .sql("select product_id, sum(amount) from iceberg.test.orders_view group by product_id;")
        .await
        .expect("Failed to create plan for select")
        .collect()
        .await
        .expect("Failed to execute select query");

    for batch in batches {
        if batch.num_rows() != 0 {
            let (order_ids, amounts) = (
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .unwrap(),
                batch
                    .column(1)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .unwrap(),
            );
            for (order_id, amount) in order_ids.iter().zip(amounts) {
                if order_id.unwrap() == 1 {
                    assert_eq!(amount.unwrap(), 7)
                } else if order_id.unwrap() == 2 {
                    assert_eq!(amount.unwrap(), 1)
                } else {
                    panic!("Unexpected order id")
                }
            }
        }
    }

    ctx.sql(
        "INSERT INTO iceberg.test.orders (id, customer_id, product_id, date, amount) VALUES 
                (7, 1, 3, '2020-01-03', 1),
                (8, 2, 1, '2020-01-03', 2),
                (9, 2, 2, '2020-01-03', 1);",
    )
    .await
    .expect("Failed to create query plan for insert")
    .collect()
    .await
    .expect("Failed to insert values into table");

    refresh_materialized_view(&mut matview, catalog_list.clone(), None)
        .await
        .expect("Failed to refresh materialized view");

    let batches = ctx
        .sql("select product_id, sum(amount) from iceberg.test.orders_view group by product_id;")
        .await
        .expect("Failed to create plan for select")
        .collect()
        .await
        .expect("Failed to execute select query");

    for batch in batches {
        if batch.num_rows() != 0 {
            let (order_ids, amounts) = (
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .unwrap(),
                batch
                    .column(1)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .unwrap(),
            );
            for (order_id, amount) in order_ids.iter().zip(amounts) {
                if order_id.unwrap() == 1 {
                    assert_eq!(amount.unwrap(), 9)
                } else if order_id.unwrap() == 2 {
                    assert_eq!(amount.unwrap(), 2)
                } else {
                    panic!("Unexpected order id")
                }
            }
        }
    }

    refresh_materialized_view(&mut total_matview, catalog_list.clone(), None)
        .await
        .expect("Failed to refresh materialized view");

    let batches = ctx
        .sql("select product_id, total from iceberg.test.total_orders;")
        .await
        .expect("Failed to create plan for select")
        .collect()
        .await
        .expect("Failed to execute select query");

    for batch in batches {
        if batch.num_rows() != 0 {
            let (order_ids, amounts) = (
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .unwrap(),
                batch
                    .column(1)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .unwrap(),
            );
            for (order_id, amount) in order_ids.iter().zip(amounts) {
                if order_id.unwrap() == 1 {
                    assert_eq!(amount.unwrap(), 9)
                } else if order_id.unwrap() == 2 {
                    assert_eq!(amount.unwrap(), 2)
                } else {
                    panic!("Unexpected order id")
                }
            }
        }
    }
}

// Stack Trace

bc@Bradleys-MacBook-Pro iceberg-rust % RUST_BACKTRACE=1 cargo run --bin glue-view
   Compiling glue-view v0.1.0 (/Users/bc/Desktop/LocalHub/iceberg-rust/glue-view)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.41s
     Running `target/debug/glue-view`

thread 'main' panicked at catalogs/iceberg-glue-catalog/src/lib.rs:71:50:
called `Result::unwrap()` on an `Err` value: NotSupported("Object store protocol")
stack backtrace:
   0: rust_begin_unwind
             at /rustc/05f9846f893b09a1be1fc8560e33fc3c815cfecb/library/std/src/panicking.rs:695:5
   1: core::panicking::panic_fmt
             at /rustc/05f9846f893b09a1be1fc8560e33fc3c815cfecb/library/core/src/panicking.rs:75:14
   2: core::result::unwrap_failed
             at /rustc/05f9846f893b09a1be1fc8560e33fc3c815cfecb/library/core/src/result.rs:1704:5
   3: core::result::Result<T,E>::unwrap
             at /Users/bc/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/result.rs:1109:23
   4: iceberg_glue_catalog::GlueCatalog::default_object_store
             at ./catalogs/iceberg-glue-catalog/src/lib.rs:71:18
   5: <iceberg_glue_catalog::GlueCatalog as iceberg_rust::catalog::Catalog>::create_table::{{closure}}
             at ./catalogs/iceberg-glue-catalog/src/lib.rs:307:28
   6: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /Users/bc/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/future/future.rs:124:9
   7: iceberg_rust::catalog::create::CreateTableBuilder::build::{{closure}}
             at ./iceberg-rust/src/catalog/create.rs:112:58
   8: glue_view::main::{{closure}}
             at ./glue-view/src/main.rs:80:10
   9: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /Users/bc/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/future/future.rs:124:9
  10: tokio::runtime::park::CachedParkThread::block_on::{{closure}}
             at /Users/bc/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.44.1/src/runtime/park.rs:284:60
  11: tokio::task::coop::with_budget
             at /Users/bc/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.44.1/src/task/coop/mod.rs:167:5
  12: tokio::task::coop::budget
             at /Users/bc/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.44.1/src/task/coop/mod.rs:133:5
  13: tokio::runtime::park::CachedParkThread::block_on
             at /Users/bc/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.44.1/src/runtime/park.rs:284:31
  14: tokio::runtime::context::blocking::BlockingRegionGuard::block_on
             at /Users/bc/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.44.1/src/runtime/context/blocking.rs:66:9
  15: tokio::runtime::scheduler::multi_thread::MultiThread::block_on::{{closure}}
             at /Users/bc/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.44.1/src/runtime/scheduler/multi_thread/mod.rs:87:13
  16: tokio::runtime::context::runtime::enter_runtime
             at /Users/bc/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.44.1/src/runtime/context/runtime.rs:65:16
  17: tokio::runtime::scheduler::multi_thread::MultiThread::block_on
             at /Users/bc/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.44.1/src/runtime/scheduler/multi_thread/mod.rs:86:9
  18: tokio::runtime::runtime::Runtime::block_on_inner
             at /Users/bc/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.44.1/src/runtime/runtime.rs:370:45
  19: tokio::runtime::runtime::Runtime::block_on
             at /Users/bc/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.44.1/src/runtime/runtime.rs:340:13
  20: glue_view::main
             at ./glue-view/src/main.rs:280:5
  21: core::ops::function::FnOnce::call_once
             at /Users/bc/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib

ForeverAngry • Apr 20 '25
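Frames 4, 5 and 7 of the backtrace show `GlueCatalog::default_object_store` being called from `create_table` inside `CreateTableBuilder::build`, which suggests the panic happens while the plain `orders` table (location `/test/orders`) is being created, before either materialized view is built. The sketch below is a hypothetical model of choosing an object store from the scheme of a location; `StoreKind`, `CatalogError`, and `object_store_for` are illustrative names, not the crate's code, and the real `default_object_store` may select stores differently. It only shows why a scheme-less path would plausibly end in `NotSupported("Object store protocol")`.

```rust
/// Hypothetical object-store kinds; only S3 is modelled here.
#[derive(Debug, PartialEq)]
pub enum StoreKind {
    S3,
}

/// Hypothetical error type shaped like the message in the backtrace.
#[derive(Debug, PartialEq)]
pub enum CatalogError {
    NotSupported(String),
}

/// Choose an object store from the scheme of a table or view location.
/// Anything without a recognised `scheme://` prefix is rejected.
pub fn object_store_for(location: &str) -> Result<StoreKind, CatalogError> {
    match location.split_once("://") {
        Some(("s3", _)) => Ok(StoreKind::S3),
        _ => Err(CatalogError::NotSupported(
            "Object store protocol".to_string(),
        )),
    }
}
```

Calling `.unwrap()` on the `Err` returned for a bare path such as `/test/orders` would panic with a message of the same shape as the one in the trace.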

Hey, sorry for the delay. I think you have to use:

    .with_location("s3://mybucket/test/total_orders")

with the `s3://` prefix in the `MaterializedView` builder.

JanKaul • Apr 28 '25
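Assuming every location in the repro (including the `orders` table's, not only the views') needs the `s3://` prefix, a small helper keeps the paths consistent; the repro currently mixes `/test/orders` (leading slash) with `test/orders_view` and `test/total_orders`. `s3_location` is a hypothetical helper, and `mybucket` is the placeholder bucket from the reply above.

```rust
/// Hypothetical helper: join a bucket and a relative path into an S3 URL,
/// trimming stray leading/trailing slashes on both parts.
pub fn s3_location(bucket: &str, path: &str) -> String {
    format!(
        "s3://{}/{}",
        bucket.trim_matches('/'),
        path.trim_matches('/')
    )
}
```

The returned strings would replace the literals passed to `.with_location(...)` for `orders`, `orders_view`, and `total_orders`.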