iceberg-python

Support scanning a specific branch of a table

Open malcolmbovey opened this issue 10 months ago • 5 comments

Feature Request / Improvement

As far as I can tell, PyIceberg does not directly support scanning a particular branch of an Iceberg table. For example, there is no equivalent of queries like

SELECT * FROM <namespace>.<table>.<branch>;

as documented in https://iceberg.apache.org/docs/1.8.0/branching/#schema-selection-with-branches-and-tags

I can see that PR #941, which adds support for writing to branches in PyIceberg, is well on its way to being merged, so this would be a useful complement.

Alternatively, perhaps I'm mistaken and there is already a way to do this, but it wasn't clear from the documentation.

malcolmbovey avatar Feb 28 '25 17:02 malcolmbovey

I don't think we have support for reading directly from a branch name yet. On the read side, you can specify a specific snapshot_id: https://github.com/apache/iceberg-python/blob/f186d5876b96d8ba56b0ed59cf40d8bc2ced13e9/pyiceberg/table/__init__.py#L925

I think if you find the specific snapshot_id for the branch, you can use the scan API to read.
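A minimal sketch of that workaround, not an official PyIceberg API: the helper name `scan_ref` and its parameters are hypothetical. It assumes only that the table object behaves like a PyIceberg `Table`, that is, `refs()` returns a mapping from ref name to an object with `snapshot_id` and `snapshot_ref_type`, and `scan()` accepts a `snapshot_id` keyword.

```python
from typing import Any, Optional


def scan_ref(
    table: Any,
    ref_name: str,
    ref_type: Optional[str] = None,
    **scan_kwargs: Any,
) -> Any:
    """Scan `table` at the snapshot that the branch or tag `ref_name` points to.

    Hypothetical helper: `table` is assumed to expose `refs()` and
    `scan(snapshot_id=...)` the way a PyIceberg Table does.
    """
    refs = table.refs()
    if ref_name not in refs:
        raise ValueError(
            f"Unknown ref {ref_name!r}; available refs: {sorted(refs)}"
        )
    ref = refs[ref_name]

    if ref_type is not None:
        # The ref type may be an enum (compare on its value) or a plain string.
        actual = getattr(ref.snapshot_ref_type, "value", ref.snapshot_ref_type)
        if str(actual).lower() != ref_type.lower():
            raise ValueError(
                f"Ref {ref_name!r} is a {actual}, not a {ref_type.lower()}"
            )

    # Any extra keyword arguments (row_filter, selected_fields, ...) are
    # passed through to the scan unchanged.
    return table.scan(snapshot_id=ref.snapshot_id, **scan_kwargs)
```

With a table loaded from a catalog, this would be called as `scan_ref(table, "branch_abc", row_filter="id in (1,2,3)")`, and the returned scan can then be consumed as usual, for example with `.to_arrow()`.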

kevinjqliu avatar Mar 01 '25 17:03 kevinjqliu

Thanks, would you like me to draft a PR to enable a branch parameter to be passed to table.scan?

malcolmbovey avatar Mar 02 '25 13:03 malcolmbovey

Hey @malcolmbovey I think that would be a great addition 👍

Fokko avatar Mar 02 '25 19:03 Fokko

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] avatar Nov 16 '25 00:11 github-actions[bot]

I would love to see this feature. (Commenting to prevent closing due to staleness).

sdemjanenko avatar Nov 16 '25 02:11 sdemjanenko

In case it helps anyone in the meantime, here is a custom class I am using to scan an Iceberg table by BRANCH or TAG name. In this example my table is registered in the AWS Glue Catalog, and I'm only interested in scanning a named reference or the current snapshot.

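from typing import Optional

from pyiceberg.catalog.glue import GlueCatalog
from pyiceberg.table import DataScan, Table
from pyiceberg.table.refs import SnapshotRefType

# AWS_PARAMS is not defined in this snippet; it is assumed to be a dict defined
# elsewhere with the keys "GLUE_CATALOG", "WAREHOUSE" and "AWS_REGION".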

class IcebergTable:
    """
    This class provides methods to load and scan an Iceberg table using refs, filters, and snapshots.
    """

    def __init__(
        self,
        database_name: str,
        table_name: str,
        row_filter: Optional[str] = None,
        ref_name: Optional[str] = None,
        ref_type: Optional[str] = None,
    ):
        """
        Initialize the IcebergTable class
            :param database_name: The name of the AWS Glue Catalog Iceberg database.
            :param table_name: The name of the AWS Glue Catalog Iceberg table.
            :param row_filter (optional): The filter string to apply to the table scan.
            :param ref_name (optional): The reference name of the snapshot to use.
            :param ref_type (optional): The reference type (BRANCH or TAG) to use.
        """
        self.catalog: GlueCatalog = GlueCatalog(
            name=AWS_PARAMS["GLUE_CATALOG"],
            warehouse=AWS_PARAMS["WAREHOUSE"],
            **{"client.region": AWS_PARAMS["AWS_REGION"]},
        )
        self.database_name = database_name
        self.table_name = table_name
        self.table = self.load_table()
        self.row_filter = row_filter
        self.ref_name = ref_name
        self.ref_type = ref_type
        self.snapshot_id = self.get_snapshot_id()

    def get_current_snapshot_id(self) -> int:
        """
        Get the current snapshot ID from the Table object.
            :return: The current snapshot ID.
        """
        current_snapshot = self.table.current_snapshot()
        if current_snapshot:
            return current_snapshot.snapshot_id
        raise ValueError(
            "Current snapshot is None. Cannot get ID from an invalid snapshot."
        )

    def get_snapshot_id(self) -> int:
        """
        Retrieve the snapshot_id based on the ref_name and ref_type, or use get_current_snapshot_id.
            :return: The snapshot_id associated with the given ref_name and ref_type.
        """
        if not self.ref_name:
            return self.get_current_snapshot_id()
        table_refs = self.get_table_refs()
        if self.ref_name in table_refs:
            return table_refs[self.ref_name]["snapshot_id"]
        raise ValueError(
            f"Reference name '{self.ref_name}' does not exist for reference type '{self.ref_type}'."
        )

    def get_table_refs(self) -> dict[str, dict]:
        """
        Get a dictionary of existing snapshot references.
            :return: A dict mapping each reference name to its snapshot ID and reference type.
        """
        table_refs = self.table.refs()
        return {
            ref_name: {
                "snapshot_id": ref.snapshot_id,
                "snapshot_ref_type": ref.snapshot_ref_type,
            }
            for ref_name, ref in table_refs.items()
            if self.ref_type is None
            or ref.snapshot_ref_type == SnapshotRefType[self.ref_type]
        }

    def load_table(self) -> Table:
        """
        Load an Iceberg table
            :return: The Iceberg Table object.
        """
        return self.catalog.load_table(f"{self.database_name}.{self.table_name}")

    def scan_table(self) -> DataScan:
        """
        Scan the Iceberg table object using the row_filter and snapshot_id.
            :return: Return a Table DataScan object.
        """
        if self.row_filter:
            return self.table.scan(
                row_filter=self.row_filter, snapshot_id=self.snapshot_id
            )
        return self.table.scan(snapshot_id=self.snapshot_id)

Usage

# Create an Iceberg Table object using input properties
# See: https://py.iceberg.apache.org/reference/pyiceberg/table/#pyiceberg.table.Table
iceberg_table = IcebergTable(
    database_name="db_test",
    table_name="tb_test",
    row_filter="id in (1,2,3)",
    ref_name="branch_abc",
    ref_type="BRANCH",
)
# Create an Iceberg DataScan object for downstream use (.to_arrow(), .to_duckdb() etc.)
# See: https://py.iceberg.apache.org/reference/pyiceberg/table/#pyiceberg.table.DataScan
iceberg_scan = iceberg_table.scan_table()

adamaps avatar Nov 17 '25 18:11 adamaps