iceberg-python

Add anon property to fsspec adls file io config to ease usage of DefaultCredential pipeline

Open NikitaMatskevich opened this issue 2 months ago • 2 comments

Rationale for this change

We use the default credential pipeline (specifically, managed identities) to access Azure. We found that the fsspec library only allows this when anon=False is set and the account name is specified.

This change therefore adds an anon property (adls.anon) to the PyIceberg FileIO config.

Are these changes tested?

We've tested that this works with the following snippet:

from fsspec import AbstractFileSystem
from pyiceberg.io.fsspec import FsspecFileIO
from pyiceberg.catalog.rest import RestCatalog
from typing import Any

ADLS_ANON = "adls.anon"
ADLS_CONNECTION_STRING = "adls.connection-string"
ADLS_ACCOUNT_NAME = "adls.account-name"
ADLS_ACCOUNT_KEY = "adls.account-key"
ADLS_SAS_TOKEN = "adls.sas-token"
ADLS_TENANT_ID = "adls.tenant-id"
ADLS_CLIENT_ID = "adls.client-id"
ADLS_CLIENT_SECRET = "adls.client-secret"
ADLS_ACCOUNT_HOST = "adls.account-host"

Properties = dict[str, Any]

def my_adls(properties: Properties) -> AbstractFileSystem:
    from adlfs import AzureBlobFileSystem

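    # Handle SAS tokens given under suffixed keys (adls.sas-token.<suffix>):
    # when no account name is set, take it from the first segment of the suffix.
    for key, sas_token in {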
        key.replace(f"{ADLS_SAS_TOKEN}.", ""): value for key, value in properties.items() if key.startswith(ADLS_SAS_TOKEN)
    }.items():
        if ADLS_ACCOUNT_NAME not in properties:
            properties[ADLS_ACCOUNT_NAME] = key.split(".")[0]
        if ADLS_SAS_TOKEN not in properties:
            properties[ADLS_SAS_TOKEN] = sas_token

    return AzureBlobFileSystem(
        connection_string=properties.get(ADLS_CONNECTION_STRING),
        anon=properties.get(ADLS_ANON),
        account_name=properties.get(ADLS_ACCOUNT_NAME),
        account_key=properties.get(ADLS_ACCOUNT_KEY),
        sas_token=properties.get(ADLS_SAS_TOKEN),
        tenant_id=properties.get(ADLS_TENANT_ID),
        client_id=properties.get(ADLS_CLIENT_ID),
        client_secret=properties.get(ADLS_CLIENT_SECRET),
        account_host=properties.get(ADLS_ACCOUNT_HOST),
    )

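# Create a FileIO that carries the anon flag and account name, then override
# its filesystem factory so every scheme is resolved through my_adls.
injected_file_io = FsspecFileIO(properties={ADLS_ANON: False, ADLS_ACCOUNT_NAME: "usagestorageprod"})
injected_file_io.get_fs = lambda scheme: my_adls(injected_file_io.properties)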

CATALOG_URI = "https://lakehouse..."

catalog_config = {
    "uri": CATALOG_URI,
    "properties": {
        "io-impl": "pyiceberg.io.fsspec.FsspecFileIO",
    },
    # ...
}

catalog = RestCatalog("lakehouse", **catalog_config)
catalog.file_io = injected_file_io

table = catalog.load_table("some_ns.some_table")
table.io = injected_file_io
table.scan(snapshot_id=xxx).count()
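
One detail the snippet above does not cover: it passes a real Python bool for adls.anon, but a property that reaches the FileIO from a config file or an environment variable may arrive as a string, and the string "false" is truthy in Python. Below is a minimal sketch of how the value could be normalized before it is handed to the filesystem. The helpers parse_optional_bool and adls_kwargs are hypothetical names used only for illustration; they are not part of PyIceberg or adlfs, and only two of the properties are shown.

```python
from typing import Any, Optional

ADLS_ANON = "adls.anon"
ADLS_ACCOUNT_NAME = "adls.account-name"

_TRUE_STRINGS = {"true", "1", "yes"}
_FALSE_STRINGS = {"false", "0", "no"}


def parse_optional_bool(value: Any) -> Optional[bool]:
    """Hypothetical helper: coerce a config value to bool, keeping None as 'not set'."""
    if value is None or isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in _TRUE_STRINGS:
        return True
    if text in _FALSE_STRINGS:
        return False
    raise ValueError(f"Cannot interpret {value!r} as a boolean")


def adls_kwargs(properties: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical helper: build filesystem keyword arguments, omitting unset values."""
    candidates = {
        "anon": parse_optional_bool(properties.get(ADLS_ANON)),
        "account_name": properties.get(ADLS_ACCOUNT_NAME),
    }
    # Drop None so the filesystem's own defaults apply to anything not configured.
    return {name: value for name, value in candidates.items() if value is not None}


# A string-valued flag, as it might appear when read from an environment variable.
kwargs = adls_kwargs({ADLS_ANON: "false", ADLS_ACCOUNT_NAME: "usagestorageprod"})
```

With this normalization, kwargs holds anon=False as a real bool, so it could be unpacked into the AzureBlobFileSystem call in my_adls alongside the other arguments.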

Are there any user-facing changes?

No breaking changes.

NikitaMatskevich, Oct 25 '25 17:10

@NikitaMatskevich thanks for the PR! could you fix ci?

kevinjqliu, Oct 25 '25 22:10

gentle ping @NikitaMatskevich

kevinjqliu, Nov 03 '25 18:11