deephaven-core
feat: support refreshing Iceberg tables
Add two methods of refreshing tables:
- Manual refreshing - the user specifies which snapshot to load; the engine parses the snapshot, adds/removes Iceberg data files as needed, and notifies downstream tables of the changes
- Auto refreshing - at regular (user-configurable) intervals, the engine queries Iceberg for the latest snapshot, then parses and loads it
Example code:
Java: automatic and manual refreshing tables
import io.deephaven.iceberg.util.*;
import org.apache.iceberg.catalog.*;
adapter = IcebergToolsS3.createS3Rest(
"minio-iceberg",
"http://rest:8181",
"s3a://warehouse/wh",
"us-east-1",
"admin",
"password",
"http://minio:9000");
//////////////////////////////////////////////////////////////////////
import io.deephaven.extensions.s3.*;
s3_instructions = S3Instructions.builder()
.regionName("us-east-1")
.credentials(Credentials.basic("admin", "password"))
.endpointOverride("http://minio:9000")
.build()
import io.deephaven.iceberg.util.IcebergUpdateMode;
// Automatic refreshing every 1 second
iceberg_instructions = IcebergInstructions.builder()
.dataInstructions(s3_instructions)
.updateMode(IcebergUpdateMode.autoRefreshing(1_000L))
.build()
// Automatic refreshing (default 60 seconds)
iceberg_instructions = IcebergInstructions.builder()
.dataInstructions(s3_instructions)
.updateMode(IcebergUpdateMode.AUTO_REFRESHING)
.build()
// Load the table and monitor changes
sales_multi = adapter.readTable(
"sales.sales_multi",
iceberg_instructions)
//////////////////////////////////////////////////////////////////////
// Manual refreshing
iceberg_instructions = IcebergInstructions.builder()
.dataInstructions(s3_instructions)
.updateMode(IcebergUpdateMode.MANUAL_REFRESHING)
.build()
// Load a table with a specific snapshot
sales_multi = adapter.readTable(
"sales.sales_multi",
5120804857276751995,
iceberg_instructions)
// Update the table to a specific snapshot
sales_multi.update(848129305390678414)
// Update to the latest snapshot
sales_multi.update()
Python: automatic and manual refreshing tables
from deephaven.experimental import s3, iceberg
local_adapter = iceberg.adapter_s3_rest(
name="minio-iceberg",
catalog_uri="http://rest:8181",
warehouse_location="s3a://warehouse/wh",
region_name="us-east-1",
access_key_id="admin",
secret_access_key="password",
    end_point_override="http://minio:9000")
#################################################
s3_instructions = s3.S3Instructions(
region_name="us-east-1",
access_key_id="admin",
secret_access_key="password",
endpoint_override="http://minio:9000"
)
# Auto-refresh every 1000 ms
iceberg_instructions = iceberg.IcebergInstructions(
data_instructions=s3_instructions,
update_mode=iceberg.IcebergUpdateMode.auto_refreshing(1000))
sales_multi = local_adapter.read_table(table_identifier="sales.sales_multi", instructions=iceberg_instructions)
#################################################
# Manually refresh the table
iceberg_instructions = iceberg.IcebergInstructions(
data_instructions=s3_instructions,
update_mode=iceberg.IcebergUpdateMode.MANUAL_REFRESHING)
sales_multi = local_adapter.read_table(
table_identifier="sales.sales_multi",
snapshot_id=5120804857276751995,
instructions=iceberg_instructions)
sales_multi.update(848129305390678414)
sales_multi.update(3019545135163225470)
sales_multi.update()
RegionedColumnSourceManager should increment the reference count of any TableLocation it adds.
It should decrement the reference count of any location it removes, at the end of the cycle in which the removal is processed (via UpdateCommitter).
It should also decrement the reference count of any not-yet-removed location in destroy() (which it doesn't currently override, but should); be sure to call super.destroy().
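The contract above can be sketched as follows. This is a minimal, self-contained illustration of the intended lifecycle, not Deephaven's implementation: `TrackedLocation` and `LocationManager` are hypothetical stand-ins for `TableLocation` and `RegionedColumnSourceManager`, and the `endOfCycle()` method simulates the deferred release an UpdateCommitter would perform.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for TableLocation's reference counting.
final class TrackedLocation {
    private int refCount = 0;
    void incrementReferenceCount() { refCount++; }
    void decrementReferenceCount() {
        if (--refCount < 0) throw new IllegalStateException("over-released");
    }
    int refCount() { return refCount; }
}

// Hypothetical stand-in for RegionedColumnSourceManager.
final class LocationManager {
    private final Set<TrackedLocation> included = new HashSet<>();
    private final List<TrackedLocation> pendingRemovals = new ArrayList<>();

    // On add: take a reference so the location's data stays valid while in use.
    void addLocation(TrackedLocation loc) {
        loc.incrementReferenceCount();
        included.add(loc);
    }

    // On remove: defer the release until the end of the update cycle.
    void removeLocation(TrackedLocation loc) {
        included.remove(loc);
        pendingRemovals.add(loc);
    }

    // Simulates the UpdateCommitter firing at cycle end: release removed locations.
    void endOfCycle() {
        pendingRemovals.forEach(TrackedLocation::decrementReferenceCount);
        pendingRemovals.clear();
    }

    // destroy() must release every not-yet-removed location.
    // A real override would also need to call super.destroy().
    void destroy() {
        included.forEach(TrackedLocation::decrementReferenceCount);
        included.clear();
    }
}
```

The key point the sketch captures is that a removed location is not released immediately, only once the cycle that processed the removal completes, and that destroy() sweeps up whatever is still referenced.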
Labels indicate documentation is required. Issues for documentation have been opened:
Community: https://github.com/deephaven/deephaven-docs-community/issues/328