deephaven-core

feat: support refreshing Iceberg tables

Open • lbooker42 opened this issue

Add two methods of refreshing tables:

  • Manual refreshing: the user specifies which snapshot to load; the engine parses that snapshot, adds or removes Iceberg data files as needed, and notifies downstream tables of the changes
  • Auto refreshing: at a regular, user-configurable interval, the engine queries Iceberg for the latest snapshot, then parses and loads it in the same way
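The manual-refresh step described above comes down to comparing the data files referenced by two snapshots. Below is a minimal Python sketch of that idea, assuming a hypothetical `Snapshot` type that simply lists data-file paths; `diff_data_files` and `refresh_to` are illustrative names, not Deephaven or Iceberg APIs. A real implementation would more likely read the added/deleted entries from the snapshot's manifests than diff full file lists.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Snapshot:
    """Hypothetical, simplified snapshot: an ID plus the data files it references."""
    snapshot_id: int
    data_files: frozenset


def diff_data_files(current, target):
    """Return (added, removed) data files when moving from `current` to `target`."""
    added = target.data_files - current.data_files
    removed = current.data_files - target.data_files
    return added, removed


def refresh_to(current, target, listeners):
    """Apply the file-level change and notify each downstream listener once."""
    added, removed = diff_data_files(current, target)
    if added or removed:
        for notify in listeners:
            notify(sorted(added), sorted(removed))
    return target


# Usage: moving from s1 to s2 adds one file and removes another
s1 = Snapshot(1, frozenset({"a.parquet", "b.parquet"}))
s2 = Snapshot(2, frozenset({"b.parquet", "c.parquet"}))
events = []
refresh_to(s1, s2, [lambda added, removed: events.append((added, removed))])
print(events)  # [(['c.parquet'], ['a.parquet'])]
```

Listeners are only notified when something actually changed, so refreshing to the snapshot that is already loaded is a no-op.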

Example code:

Java: automatic and manual refreshing

import io.deephaven.iceberg.util.*;
import org.apache.iceberg.catalog.*;

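// Connect to the Iceberg REST catalog; the data files are stored in MinIO (S3-compatible)
adapter = IcebergToolsS3.createS3Rest(
        "minio-iceberg",            // catalog name
        "http://rest:8181",         // catalog URI
        "s3a://warehouse/wh",       // warehouse location
        "us-east-1",                // region
        "admin",                    // access key ID
        "password",                 // secret access key
        "http://minio:9000")        // S3 endpoint override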

//////////////////////////////////////////////////////////////////////

import io.deephaven.extensions.s3.*;

// S3 region, credentials, and endpoint used to read the table's data files
s3_instructions = S3Instructions.builder()
    .regionName("us-east-1")
    .credentials(Credentials.basic("admin", "password"))
    .endpointOverride("http://minio:9000")
    .build()

import io.deephaven.iceberg.util.IcebergUpdateMode;

// Automatic refreshing: check for a new snapshot every 1 second (1,000 ms)
iceberg_instructions = IcebergInstructions.builder()
    .dataInstructions(s3_instructions)
    .updateMode(IcebergUpdateMode.autoRefreshing(1_000L))
    .build()

// Alternative: automatic refreshing at the default interval (60 seconds).
// This reassignment replaces the 1-second instructions above.
iceberg_instructions = IcebergInstructions.builder()
    .dataInstructions(s3_instructions)
    .updateMode(IcebergUpdateMode.AUTO_REFRESHING)
    .build()

// Load the table; it refreshes automatically as new snapshots are added
sales_multi = adapter.readTable(
        "sales.sales_multi",
        iceberg_instructions)

//////////////////////////////////////////////////////////////////////

// Manual refreshing: the table changes only when update() is called
iceberg_instructions = IcebergInstructions.builder()
    .dataInstructions(s3_instructions)
    .updateMode(IcebergUpdateMode.MANUAL_REFRESHING)
    .build()

// Load the table at a specific snapshot (snapshot IDs are long values)
sales_multi = adapter.readTable(
        "sales.sales_multi",
        5120804857276751995L,
        iceberg_instructions)

// Update the table to a specific snapshot
sales_multi.update(848129305390678414L)

// Update to the latest snapshot
sales_multi.update()
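Auto refreshing, as described in the list and configured above with `IcebergUpdateMode.autoRefreshing(1_000L)` or the 60-second default, can be pictured as a polling loop that reloads only when the latest snapshot ID changes. This is a hedged sketch, not the engine's implementation: `auto_refresh` and `get_latest_snapshot_id` are hypothetical stand-ins for the catalog query, and the `max_polls`/`sleep` parameters exist only so the loop can be exercised without waiting.

```python
import time


def auto_refresh(get_latest_snapshot_id, on_new_snapshot, interval_ms=60_000,
                 initial_snapshot_id=None, max_polls=None, sleep=time.sleep):
    """Poll every `interval_ms` and refresh only when the latest snapshot changes.

    get_latest_snapshot_id: hypothetical callable standing in for a catalog query.
    on_new_snapshot: called with the new snapshot ID (the "parse and load" step).
    max_polls: stop after this many polls; None means poll forever.
    sleep: injectable so the loop can be tested without real delays.
    """
    last_seen = initial_snapshot_id
    polls = 0
    while max_polls is None or polls < max_polls:
        latest = get_latest_snapshot_id()
        if latest != last_seen:
            on_new_snapshot(latest)  # parse the new snapshot and apply the changes
            last_seen = latest
        polls += 1
        sleep(interval_ms / 1000.0)  # time.sleep takes seconds


# Usage with a fake catalog that returns a scripted sequence of snapshot IDs
ids = iter([10, 10, 11, 11, 12])
loaded = []
auto_refresh(lambda: next(ids), loaded.append, interval_ms=1_000,
             initial_snapshot_id=10, max_polls=5, sleep=lambda s: None)
print(loaded)  # [11, 12]
```

Passing the snapshot that was loaded initially as `initial_snapshot_id` avoids reloading it on the first poll; a production version would run on a background scheduler rather than blocking the caller.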

Python: automatic and manual refreshing

from deephaven.experimental import s3, iceberg

local_adapter = iceberg.adapter_s3_rest(
        name="minio-iceberg",
        catalog_uri="http://rest:8181",
        warehouse_location="s3a://warehouse/wh",
        region_name="us-east-1",
        access_key_id="admin",
        secret_access_key="password",
        end_point_override="http://minio:9000")

#################################################

s3_instructions = s3.S3Instructions(
        region_name="us-east-1",
        access_key_id="admin",
        secret_access_key="password",
        endpoint_override="http://minio:9000")

# Auto-refresh every 1000 ms
iceberg_instructions = iceberg.IcebergInstructions(
        data_instructions=s3_instructions,
        update_mode=iceberg.IcebergUpdateMode.auto_refreshing(1000))

sales_multi = local_adapter.read_table(
    table_identifier="sales.sales_multi",
    instructions=iceberg_instructions)

#################################################

# Manual refreshing: the table changes only when update() is called
iceberg_instructions = iceberg.IcebergInstructions(
        data_instructions=s3_instructions,
        update_mode=iceberg.IcebergUpdateMode.MANUAL_REFRESHING)

sales_multi = local_adapter.read_table(
    table_identifier="sales.sales_multi",
    snapshot_id=5120804857276751995,
    instructions=iceberg_instructions)

# Update the table to specific snapshots, in order
sales_multi.update(848129305390678414)
sales_multi.update(3019545135163225470)
# Update to the latest snapshot
sales_multi.update()

lbooker42 • Jul 02 '24 15:07
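The manual-refresh calls in the Python example above (`update(snapshot_id)` to move to a given snapshot, `update()` to move to the latest) can be modeled with a small toy class. All names here are hypothetical: the snapshot history is an in-memory dict in commit order, and each update records which data files were added and removed rather than notifying real downstream tables.

```python
class ManuallyRefreshedTable:
    """Toy model of a manually refreshed table; every name here is hypothetical.

    `snapshots` maps snapshot ID -> set of data-file paths, in commit order.
    """

    def __init__(self, snapshots, snapshot_id=None):
        self._snapshots = dict(snapshots)
        self._order = list(self._snapshots)  # dicts preserve insertion (commit) order
        self.current_id = self._order[-1] if snapshot_id is None else snapshot_id
        if self.current_id not in self._snapshots:
            raise KeyError(f"unknown snapshot {self.current_id}")
        self.changes = []  # (added, removed) per update, recorded for illustration

    def update(self, snapshot_id=None):
        # No argument means "move to the latest snapshot".
        target_id = self._order[-1] if snapshot_id is None else snapshot_id
        if target_id not in self._snapshots:
            raise KeyError(f"unknown snapshot {target_id}")
        old = self._snapshots[self.current_id]
        new = self._snapshots[target_id]
        # A real engine would notify downstream tables here.
        self.changes.append((sorted(new - old), sorted(old - new)))
        self.current_id = target_id


t = ManuallyRefreshedTable({1: {"a"}, 2: {"a", "b"}, 3: {"b", "c"}}, snapshot_id=1)
t.update(2)  # move to an explicit snapshot
t.update()   # move to the latest snapshot
print(t.current_id, t.changes)  # 3 [(['b'], []), (['c'], ['a'])]
```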

RegionedColumnSourceManager should increment the reference count of any TableLocation it adds. It should decrement the reference count of any location it removes at the end of the cycle in which it processed the removal (via an UpdateCommitter). It should also decrement the reference count of any not-yet-removed location in destroy(), which it doesn't currently override but should; be sure to call super.destroy().

rcaudy • Sep 04 '24 23:09
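The reference-counting discipline described in the comment above can be sketched in Python. `TableLocation` and `LocationManager` are hypothetical stand-ins for the Java `TableLocation` and `RegionedColumnSourceManager`, and `end_of_cycle()` plays the role of the UpdateCommitter; the real classes and their signatures differ.

```python
class TableLocation:
    """Hypothetical stand-in for a table location with a reference count."""

    def __init__(self, name):
        self.name = name
        self.ref_count = 0

    def retain(self):
        self.ref_count += 1

    def release(self):
        if self.ref_count <= 0:
            raise RuntimeError(f"over-release of {self.name}")
        self.ref_count -= 1


class LocationManager:
    """Sketch of the add / remove / destroy reference-counting rules."""

    def __init__(self):
        self.live = {}
        self.pending_removal = []

    def add_location(self, loc):
        loc.retain()  # take a reference as soon as the location is added
        self.live[loc.name] = loc

    def remove_location(self, name):
        # Keep the reference for now: the location may still be read this cycle.
        self.pending_removal.append(self.live.pop(name))

    def end_of_cycle(self):
        # UpdateCommitter role: release removed locations once the cycle is done.
        for loc in self.pending_removal:
            loc.release()
        self.pending_removal.clear()

    def destroy(self):
        # Release everything still held. In this sketch pending removals are
        # flushed too, so nothing leaks; the Java override would also call
        # super.destroy().
        self.end_of_cycle()
        for loc in self.live.values():
            loc.release()
        self.live.clear()


a, b = TableLocation("a"), TableLocation("b")
mgr = LocationManager()
mgr.add_location(a)
mgr.add_location(b)
mgr.remove_location("a")
print(a.ref_count)  # 1: still held until the cycle ends
mgr.end_of_cycle()
print(a.ref_count)  # 0
mgr.destroy()
print(b.ref_count)  # 0
```

Deferring the release of removed locations to the end of the cycle keeps them valid for any reads that happen during the same cycle.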

Labels indicate documentation is required. Issues for documentation have been opened:

Community: https://github.com/deephaven/deephaven-docs-community/issues/328

deephaven-internal • Oct 22 '24 22:10