Support scanning a specific branch of a table
Feature Request / Improvement
As far as I can tell, PyIceberg does not directly support scanning a particular branch of an Iceberg table. For example, there is no way to run the equivalent of a query like
SELECT * FROM <namespace>.<table>.<branch>;
as documented in https://iceberg.apache.org/docs/1.8.0/branching/#schema-selection-with-branches-and-tags
I can see that PR #941, which adds support for writing to branches in PyIceberg, is well on its way to being merged, so this would be a useful complement.
Alternatively, perhaps I'm mistaken and there is already a way to do this, but it wasn't clear from the documentation.
I don't think we have support for reading directly from a branch name yet. On the read side, you can specify a specific snapshot_id: https://github.com/apache/iceberg-python/blob/f186d5876b96d8ba56b0ed59cf40d8bc2ced13e9/pyiceberg/table/__init__.py#L925
I think if you find the specific snapshot_id for the branch, you can use the scan API to read.
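A minimal sketch of that workaround, not an official PyIceberg API: `scan_ref` is a hypothetical helper name, and `table` is assumed to be a PyIceberg `Table` (for example from `catalog.load_table(...)`) whose `refs()` returns a mapping of ref names to objects carrying a `snapshot_id`. The helper itself is duck-typed, so it needs no PyIceberg import.

```python
from typing import Any


def scan_ref(table: Any, ref_name: str, **scan_kwargs: Any) -> Any:
    """Scan the snapshot that a named branch or tag currently points to.

    Hypothetical helper: `table` is assumed to expose `refs()` and
    `scan(snapshot_id=...)`, as a PyIceberg Table does.
    """
    refs = table.refs()  # mapping: ref name -> object with .snapshot_id
    if ref_name not in refs:
        raise ValueError(f"Unknown ref {ref_name!r}; available: {sorted(refs)}")
    # Extra keyword arguments (e.g. row_filter) are passed through to scan().
    return table.scan(snapshot_id=refs[ref_name].snapshot_id, **scan_kwargs)
```

With a loaded table this could be called as scan_ref(table, "branch_abc", row_filter="id in (1,2,3)"), and the returned scan used like any other. Note that it reads whatever snapshot the ref points to at call time.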
Thanks, would you like me to draft a PR to enable a branch parameter to be passed to table.scan?
Hey @malcolmbovey I think that would be a great addition 👍
This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.
I would love to see this feature. (Commenting to prevent closing due to staleness).
In case it helps anyone in the meantime, here is a custom class I am using to scan an Iceberg table by BRANCH or TAG name. In this example my table is stored in the AWS Glue Catalog, and I'm only interested in scanning a named reference or the current snapshot.
from typing import Optional

from pyiceberg.catalog.glue import GlueCatalog
from pyiceberg.table import DataScan, Table
from pyiceberg.table.refs import SnapshotRefType

# AWS_PARAMS is assumed to be a dict defined elsewhere with the keys
# "GLUE_CATALOG", "WAREHOUSE" and "AWS_REGION".


class IcebergTable:
    """
    This class provides methods to load and scan an Iceberg table using refs, filters, and snapshots.
    """

    def __init__(
        self,
        database_name: str,
        table_name: str,
        row_filter: Optional[str] = None,
        ref_name: Optional[str] = None,
        ref_type: Optional[str] = None,
    ):
        """
        Initialize the IcebergTable class
        :param database_name: The name of the AWS Glue Catalog Iceberg database.
        :param table_name: The name of the AWS Glue Catalog Iceberg table.
        :param row_filter (optional): The filter string to apply to the table scan.
        :param ref_name (optional): The reference name of the snapshot to use.
        :param ref_type (optional): The reference type (BRANCH or TAG) to use.
        """
        self.catalog: GlueCatalog = GlueCatalog(
            name=AWS_PARAMS["GLUE_CATALOG"],
            warehouse=AWS_PARAMS["WAREHOUSE"],
            **{"client.region": AWS_PARAMS["AWS_REGION"]},
        )
        self.database_name = database_name
        self.table_name = table_name
        self.table = self.load_table()
        self.row_filter = row_filter
        self.ref_name = ref_name
        self.ref_type = ref_type
        # Resolved once at construction time; a later commit to the branch is not picked up.
        self.snapshot_id = self.get_snapshot_id()

    def get_current_snapshot_id(self) -> int:
        """
        Get the current snapshot ID from the Table object.
        :return: The current snapshot ID.
        """
        current_snapshot = self.table.current_snapshot()
        if current_snapshot:
            return current_snapshot.snapshot_id
        raise ValueError(
            "Current snapshot is None. Cannot get ID from an invalid snapshot."
        )

    def get_snapshot_id(self) -> int:
        """
        Retrieve the snapshot_id based on the ref_name and ref_type, or use get_current_snapshot_id.
        :return: The snapshot_id associated with the given ref_name and ref_type.
        """
        if not self.ref_name:
            return self.get_current_snapshot_id()
        table_refs = self.get_table_refs()
        if self.ref_name in table_refs:
            return table_refs[self.ref_name]["snapshot_id"]
        raise ValueError(
            f"Reference name '{self.ref_name}' does not exist for reference type '{self.ref_type}'."
        )

    def get_table_refs(self) -> dict[str, dict]:
        """
        Get a dictionary of existing snapshot references.
        :return: A dict mapping each reference name to its snapshot ID and reference type.
        """
        table_refs = self.table.refs()
        # Keep every ref when no ref_type is given; otherwise keep only refs of that type.
        return {
            ref_name: {
                "snapshot_id": ref.snapshot_id,
                "snapshot_ref_type": ref.snapshot_ref_type,
            }
            for ref_name, ref in table_refs.items()
            if self.ref_type is None
            or ref.snapshot_ref_type == SnapshotRefType[self.ref_type]
        }

    def load_table(self) -> Table:
        """
        Load an Iceberg table
        :return: The Iceberg Table object.
        """
        return self.catalog.load_table(f"{self.database_name}.{self.table_name}")

    def scan_table(self) -> DataScan:
        """
        Scan the Iceberg table object using the row_filter and snapshot_id.
        :return: Return a Table DataScan object.
        """
        if self.row_filter:
            return self.table.scan(
                row_filter=self.row_filter, snapshot_id=self.snapshot_id
            )
        return self.table.scan(snapshot_id=self.snapshot_id)
Usage
# Create an Iceberg Table object using input properties
# See: https://py.iceberg.apache.org/reference/pyiceberg/table/#pyiceberg.table.Table
iceberg_table = IcebergTable(
    database_name="db_test",
    table_name="tb_test",
    row_filter="id in (1,2,3)",
    ref_name="branch_abc",
    ref_type="BRANCH",
)
# Create an Iceberg DataScan object for downstream use (.to_arrow(), .to_duckdb() etc.)
# See: https://py.iceberg.apache.org/reference/pyiceberg/table/#pyiceberg.table.DataScan
iceberg_scan = iceberg_table.scan_table()
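
Before scanning by name, it can help to see which refs a table actually has. The following is a rough sketch under stated assumptions rather than part of the class above: `refs_by_type` is a hypothetical helper, `table` is assumed to be a PyIceberg `Table` (anything with a compatible `refs()` works), and each ref's `snapshot_ref_type` is assumed to be an enum whose value is "branch" or "tag".

```python
from collections import defaultdict
from typing import Any, Dict


def refs_by_type(table: Any) -> Dict[str, Dict[str, int]]:
    """Group ref names by type, mapping each name to its snapshot ID.

    Hypothetical helper: `table` is assumed to expose `refs()` returning
    objects with `snapshot_id` and `snapshot_ref_type` attributes.
    """
    grouped: Dict[str, Dict[str, int]] = defaultdict(dict)
    for name, ref in table.refs().items():
        # Assumption: snapshot_ref_type is an enum with value "branch" or "tag";
        # plain strings are accepted as a fallback.
        raw = getattr(ref.snapshot_ref_type, "value", ref.snapshot_ref_type)
        grouped[str(raw).lower()][name] = ref.snapshot_id
    return dict(grouped)
```

Calling refs_by_type(iceberg_table.table) would then give one dict of names per ref type, which is a quick way to check a ref_name and ref_type pair before constructing IcebergTable.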