Support Vended Credentials for Azure Data Lake Store
Feature Request / Improvement
Vended credentials for Azure Data Lake Store are supported by the Java implementation. For the getTable / createTable endpoints, the catalog returns a "config" that looks like:
"config": {
"adls.sas-token.<storage-account-name>.dfs.core.windows.net": "sv=2023-11-03&st=2024-09-08T11%3A34%3A08Z&....(rest of SAS Token)"
}
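For illustration, here is a minimal sketch (not PyIceberg code; the account name and token value are placeholders) of how such a vended property can be translated into an adlfs filesystem:

from adlfs import AzureBlobFileSystem

# Example vended config; "myaccount" and the token value are made-up placeholders.
config = {
    "adls.sas-token.myaccount.dfs.core.windows.net": "sv=2023-11-03&st=2024-09-08T11%3A34%3A08Z&...",
}

PREFIX = "adls.sas-token."
for key, sas_token in config.items():
    if key.startswith(PREFIX):
        account_host = key[len(PREFIX):]           # "myaccount.dfs.core.windows.net"
        account_name = account_host.split(".")[0]  # "myaccount"
        fs = AzureBlobFileSystem(
            account_name=account_name,
            account_host=account_host,
            sas_token=sas_token,
        )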
These vended SAS-token properties are currently not respected by PyIceberg. Instead, we get the error:
ValueError: unable to connect to account for Must provide either a connection_string or account_name with credentials!!
Full Traceback:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
File /opt/conda/lib/python3.11/site-packages/adlfs/spec.py:515, in AzureBlobFileSystem.do_connect(self)
514 else:
--> 515 raise ValueError(
516 "Must provide either a connection_string or account_name with credentials!!"
517 )
519 except RuntimeError:
ValueError: Must provide either a connection_string or account_name with credentials!!
During handling of the above exception, another exception occurred:
ValueError Traceback (most recent call last)
File /opt/conda/lib/python3.11/site-packages/pyiceberg/table/__init__.py:509, in Transaction.append(self, df, snapshot_properties)
506 data_files = _dataframe_to_data_files(
507 table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
508 )
--> 509 for data_file in data_files:
510 append_files.append_data_file(data_file)
File /opt/conda/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py:2354, in _dataframe_to_data_files(table_metadata, df, io, write_uuid, counter)
2353 if table_metadata.spec().is_unpartitioned():
-> 2354 yield from write_file(
2355 io=io,
2356 table_metadata=table_metadata,
2357 tasks=iter([
2358 WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=task_schema)
2359 for batches in bin_pack_arrow_table(df, target_file_size)
2360 ]),
2361 )
2362 else:
File /opt/conda/lib/python3.11/concurrent/futures/_base.py:619, in Executor.map.<locals>.result_iterator()
618 if timeout is None:
--> 619 yield _result_or_cancel(fs.pop())
620 else:
File /opt/conda/lib/python3.11/concurrent/futures/_base.py:317, in _result_or_cancel(***failed resolving arguments***)
316 try:
--> 317 return fut.result(timeout)
318 finally:
File /opt/conda/lib/python3.11/concurrent/futures/_base.py:456, in Future.result(self, timeout)
455 elif self._state == FINISHED:
--> 456 return self.__get_result()
457 else:
File /opt/conda/lib/python3.11/concurrent/futures/_base.py:401, in Future.__get_result(self)
400 try:
--> 401 raise self._exception
402 finally:
403 # Break a reference cycle with the exception in self._exception
File /opt/conda/lib/python3.11/concurrent/futures/thread.py:58, in _WorkItem.run(self)
57 try:
---> 58 result = self.fn(*self.args, **self.kwargs)
59 except BaseException as exc:
File /opt/conda/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py:2173, in write_file.<locals>.write_parquet(task)
2172 file_path = f'{table_metadata.location}/data/{task.generate_data_file_path("parquet")}'
-> 2173 fo = io.new_output(file_path)
2174 with fo.create(overwrite=True) as fos:
File /opt/conda/lib/python3.11/site-packages/pyiceberg/io/fsspec.py:331, in FsspecFileIO.new_output(self, location)
330 uri = urlparse(location)
--> 331 fs = self.get_fs(uri.scheme)
332 return FsspecOutputFile(location=location, fs=fs)
File /opt/conda/lib/python3.11/site-packages/pyiceberg/io/fsspec.py:355, in FsspecFileIO._get_fs(self, scheme)
354 raise ValueError(f"No registered filesystem for scheme: {scheme}")
--> 355 return self._scheme_to_fs[scheme](self.properties)
File /opt/conda/lib/python3.11/site-packages/pyiceberg/io/fsspec.py:179, in _adlfs(properties)
177 from adlfs import AzureBlobFileSystem
--> 179 return AzureBlobFileSystem(
180 connection_string=properties.get(ADLFS_CONNECTION_STRING),
181 account_name=properties.get(ADLFS_ACCOUNT_NAME),
182 account_key=properties.get(ADLFS_ACCOUNT_KEY),
183 sas_token=properties.get(ADLFS_SAS_TOKEN),
184 tenant_id=properties.get(ADLFS_TENANT_ID),
185 client_id=properties.get(ADLFS_CLIENT_ID),
186 client_secret=properties.get(ADLFS_ClIENT_SECRET),
187 )
File /opt/conda/lib/python3.11/site-packages/fsspec/spec.py:80, in _Cached.__call__(cls, *args, **kwargs)
79 else:
---> 80 obj = super().__call__(*args, **kwargs)
81 # Setting _fs_token here causes some static linters to complain.
File /opt/conda/lib/python3.11/site-packages/adlfs/spec.py:344, in AzureBlobFileSystem.__init__(self, account_name, account_key, connection_string, credential, sas_token, request_session, socket_timeout, blocksize, client_id, client_secret, tenant_id, anon, location_mode, loop, asynchronous, default_fill_cache, default_cache_type, version_aware, assume_container_exists, max_concurrency, timeout, connection_timeout, read_timeout, account_host, **kwargs)
339 (
340 self.credential,
341 self.sync_credential,
342 ) = self._get_default_azure_credential(**kwargs)
--> 344 self.do_connect()
345 weakref.finalize(self, sync, self.loop, close_service_client, self)
File /opt/conda/lib/python3.11/site-packages/adlfs/spec.py:525, in AzureBlobFileSystem.do_connect(self)
524 except Exception as e:
--> 525 raise ValueError(f"unable to connect to account for {e}")
ValueError: unable to connect to account for Must provide either a connection_string or account_name with credentials!!
During handling of the above exception, another exception occurred:
ValueError Traceback (most recent call last)
File /opt/conda/lib/python3.11/site-packages/adlfs/spec.py:515, in AzureBlobFileSystem.do_connect(self)
514 else:
--> 515 raise ValueError(
516 "Must provide either a connection_string or account_name with credentials!!"
517 )
519 except RuntimeError:
ValueError: Must provide either a connection_string or account_name with credentials!!
During handling of the above exception, another exception occurred:
ValueError Traceback (most recent call last)
Cell In[8], line 1
----> 1 table.append(pa_df)
File /opt/conda/lib/python3.11/site-packages/pyiceberg/table/__init__.py:1578, in Table.append(self, df, snapshot_properties)
1570 """
1571 Shorthand API for appending a PyArrow table to the table.
1572
(...)
1575 snapshot_properties: Custom properties to be added to the snapshot summary
1576 """
1577 with self.transaction() as tx:
-> 1578 tx.append(df=df, snapshot_properties=snapshot_properties)
File /opt/conda/lib/python3.11/site-packages/pyiceberg/table/__init__.py:503, in Transaction.append(self, df, snapshot_properties)
500 update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
501 append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append
--> 503 with append_method() as append_files:
504 # skip writing data files if the dataframe is empty
505 if df.shape[0] > 0:
506 data_files = _dataframe_to_data_files(
507 table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
508 )
File /opt/conda/lib/python3.11/site-packages/pyiceberg/table/__init__.py:2094, in UpdateTableMetadata.__exit__(self, _, value, traceback)
2092 def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
2093 """Close and commit the change."""
-> 2094 self.commit()
File /opt/conda/lib/python3.11/site-packages/pyiceberg/table/__init__.py:2090, in UpdateTableMetadata.commit(self)
2089 def commit(self) -> None:
-> 2090 self._transaction._apply(*self._commit())
File /opt/conda/lib/python3.11/site-packages/pyiceberg/table/__init__.py:3220, in _SnapshotProducer._commit(self)
3210 summary = self._summary(self.snapshot_properties)
3212 manifest_list_file_path = _generate_manifest_list_path(
3213 location=self._transaction.table_metadata.location,
3214 snapshot_id=self._snapshot_id,
3215 attempt=0,
3216 commit_uuid=self.commit_uuid,
3217 )
3218 with write_manifest_list(
3219 format_version=self._transaction.table_metadata.format_version,
-> 3220 output_file=self._io.new_output(manifest_list_file_path),
3221 snapshot_id=self._snapshot_id,
3222 parent_snapshot_id=self._parent_snapshot_id,
3223 sequence_number=next_sequence_number,
3224 ) as writer:
3225 writer.add_manifests(new_manifests)
3227 snapshot = Snapshot(
3228 snapshot_id=self._snapshot_id,
3229 parent_snapshot_id=self._parent_snapshot_id,
(...)
3233 schema_id=self._transaction.table_metadata.current_schema_id,
3234 )
File /opt/conda/lib/python3.11/site-packages/pyiceberg/io/fsspec.py:331, in FsspecFileIO.new_output(self, location)
322 """Get an FsspecOutputFile instance to write bytes to the file at the given location.
323
324 Args:
(...)
328 FsspecOutputFile: An FsspecOutputFile instance for the given location.
329 """
330 uri = urlparse(location)
--> 331 fs = self.get_fs(uri.scheme)
332 return FsspecOutputFile(location=location, fs=fs)
File /opt/conda/lib/python3.11/site-packages/pyiceberg/io/fsspec.py:355, in FsspecFileIO._get_fs(self, scheme)
353 if scheme not in self._scheme_to_fs:
354 raise ValueError(f"No registered filesystem for scheme: {scheme}")
--> 355 return self._scheme_to_fs[scheme](self.properties)
File /opt/conda/lib/python3.11/site-packages/pyiceberg/io/fsspec.py:179, in _adlfs(properties)
176 def _adlfs(properties: Properties) -> AbstractFileSystem:
177 from adlfs import AzureBlobFileSystem
--> 179 return AzureBlobFileSystem(
180 connection_string=properties.get(ADLFS_CONNECTION_STRING),
181 account_name=properties.get(ADLFS_ACCOUNT_NAME),
182 account_key=properties.get(ADLFS_ACCOUNT_KEY),
183 sas_token=properties.get(ADLFS_SAS_TOKEN),
184 tenant_id=properties.get(ADLFS_TENANT_ID),
185 client_id=properties.get(ADLFS_CLIENT_ID),
186 client_secret=properties.get(ADLFS_ClIENT_SECRET),
187 )
File /opt/conda/lib/python3.11/site-packages/fsspec/spec.py:80, in _Cached.__call__(cls, *args, **kwargs)
78 return cls._cache[token]
79 else:
---> 80 obj = super().__call__(*args, **kwargs)
81 # Setting _fs_token here causes some static linters to complain.
82 obj._fs_token_ = token
File /opt/conda/lib/python3.11/site-packages/adlfs/spec.py:344, in AzureBlobFileSystem.__init__(self, account_name, account_key, connection_string, credential, sas_token, request_session, socket_timeout, blocksize, client_id, client_secret, tenant_id, anon, location_mode, loop, asynchronous, default_fill_cache, default_cache_type, version_aware, assume_container_exists, max_concurrency, timeout, connection_timeout, read_timeout, account_host, **kwargs)
333 if (
334 self.credential is None
335 and self.anon is False
336 and self.sas_token is None
337 and self.account_key is None
338 ):
339 (
340 self.credential,
341 self.sync_credential,
342 ) = self._get_default_azure_credential(**kwargs)
--> 344 self.do_connect()
345 weakref.finalize(self, sync, self.loop, close_service_client, self)
347 if self.credential is not None:
File /opt/conda/lib/python3.11/site-packages/adlfs/spec.py:525, in AzureBlobFileSystem.do_connect(self)
522 self.do_connect()
524 except Exception as e:
--> 525 raise ValueError(f"unable to connect to account for {e}")
ValueError: unable to connect to account for Must provide either a connection_string or account_name with credentials!!
Hi @c-thiel, thank you for raising this issue. I'm not an expert in Azure Data Lake Store, but I can help look into this together.
> adls.sas-token.<storage-account-name>.dfs.core.windows.net

It looks like the prefix of the secret name you posted is "adls", whereas the secret name we expect is "adlfs":
https://github.com/apache/iceberg-python/blob/d587e6724685744918ecf192724437182ad01abf/pyiceberg/io/__init__.py#L72
Could you confirm whether this is the case? This could be a typo in your comment above, or on the REST Catalog server side, in which case it should be fixed there.
@sungwy in my comment as well as in the catalog I am using "adls.sas-token", which is exactly what Java and Spark expect: https://github.com/apache/iceberg/blob/4873b4b7534de0bcda2e1e0366ffcf83943dc906/azure/src/main/java/org/apache/iceberg/azure/AzureProperties.java#L33. We can easily send both from the catalog side, but it would be great if we didn't have to.
Is there a reason for PyIceberg not using the same prefix as Java?
The adlfs prefix is wrong, we already have a PR to fix the prefix in #961
> The adlfs prefix is wrong, we already have a PR to fix the prefix in #961
Thanks for pointing that out @ndrluis . Let me do a pass through it and get it merged in to resolve this issue.
> @sungwy in my comment as well as in the catalog I am using "adls.sas-token", which is exactly what Java and Spark expect: https://github.com/apache/iceberg/blob/4873b4b7534de0bcda2e1e0366ffcf83943dc906/azure/src/main/java/org/apache/iceberg/azure/AzureProperties.java#L33. We can easily send both from the catalog side, but it would be great if we didn't have to.
> Is there a reason for PyIceberg not using the same prefix as Java?
Thanks for pointing that out @c-thiel ! Like I mentioned, I'm not too familiar with the Azure Data Lake integration, but it looks like @ndrluis has the right solution ready for this issue 🙂
The PR has been merged into main. Would you be able to help confirm whether the fix in main resolves this issue, @c-thiel?
@sungwy sorry for the late reply, I overlooked this.
Looking good now :) - Thanks!
@sungwy and @ndrluis, #961 isn't a fix for this issue. There are two aspects: the first is the prefix, adls.sas-token vs. adlfs.sas-token, which is addressed in #961. But it is more than that, because the key being passed in isn't adls.sas-token; it is adls.sas-token.<account_name>.dfs.core.windows.net, which includes the new prefix, the account name, and the account URL (adls.sas-token.<account_url>).
I haven't contributed to this project, but here is a working example that merges in the key handling from the Java implementation and uses account_host to support the dfs endpoint:
# Proposed replacement for _adls in pyiceberg/io/fsspec.py; the ADLS_*/ADLFS_*
# constants, get_first_property_value, and deprecation_message are already
# imported at module level there.
def _adls(properties: Properties) -> AbstractFileSystem:
    from adlfs import AzureBlobFileSystem

    for property_name in properties:
        if property_name.startswith(ADLFS_PREFIX):
            deprecation_message(
                deprecated_in="0.8.0",
                removed_in="0.9.0",
                help_message=f"The property {property_name} is deprecated. Please use properties that start with adls.",
            )

    # TODO: should be a function on pyiceberg.utils.properties
    def properties_with_prefix(properties: dict, prefix: str) -> dict:
        return {k[len(prefix):]: v for k, v in properties.items() if k.startswith(prefix)}

    account_name = get_first_property_value(
        properties,
        ADLS_ACCOUNT_NAME,
        ADLFS_ACCOUNT_NAME,
    )
    sas_token = get_first_property_value(
        properties,
        ADLS_SAS_TOKEN,
        ADLFS_SAS_TOKEN,
    )

    # Vended credentials arrive as "adls.sas-token.<account-host>": "<token>",
    # so derive the account name, account host, and SAS token from that key.
    adls_sas_tokens = properties_with_prefix(properties, f"{ADLS_SAS_TOKEN}.")
    if len(adls_sas_tokens) == 1:
        account_host, vended_sas_token = next(iter(adls_sas_tokens.items()))  # the single key-value pair
        # TODO: add constant?
        if account_host.endswith('.windows.net'):
            if account_name is None:
                properties[ADLS_ACCOUNT_NAME] = account_host.split('.')[0]
            # TODO: add constant
            properties['adls.account-host'] = account_host
            if sas_token is None:
                properties[ADLS_SAS_TOKEN] = vended_sas_token

    return AzureBlobFileSystem(
        # TODO: add constant
        account_host=properties.get('adls.account-host'),
        connection_string=get_first_property_value(
            properties,
            ADLS_CONNECTION_STRING,
            ADLFS_CONNECTION_STRING,
        ),
        account_name=get_first_property_value(
            properties,
            ADLS_ACCOUNT_NAME,
            ADLFS_ACCOUNT_NAME,
        ),
        account_key=get_first_property_value(
            properties,
            ADLS_ACCOUNT_KEY,
            ADLFS_ACCOUNT_KEY,
        ),
        sas_token=get_first_property_value(
            properties,
            ADLS_SAS_TOKEN,
            ADLFS_SAS_TOKEN,
        ),
        tenant_id=get_first_property_value(
            properties,
            ADLS_TENANT_ID,
            ADLFS_TENANT_ID,
        ),
        client_id=get_first_property_value(
            properties,
            ADLS_CLIENT_ID,
            ADLFS_CLIENT_ID,
        ),
        client_secret=get_first_property_value(
            properties,
            ADLS_ClIENT_SECRET,
            ADLFS_ClIENT_SECRET,
        ),
    )
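To make the intended behavior concrete: given the vended property from the issue description (placeholder account name and token below), the function above would effectively construct the filesystem like this:

properties = {"adls.sas-token.myaccount.dfs.core.windows.net": "sv=2023-11-03&st=...&sig=..."}
# After the vended-token handling, the call reduces to roughly:
# AzureBlobFileSystem(
#     account_host="myaccount.dfs.core.windows.net",
#     account_name="myaccount",
#     sas_token="sv=2023-11-03&st=...&sig=...",
# )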
Spent too much time on this only to find out it is a bug in PyIceberg :(
I created a custom FileIO fix as a temporary workaround and I've submitted Polaris #418
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    **{
        "type": "rest",
        "header.X-Iceberg-Access-Delegation": "vended-credentials",
        "uri": f"https://{account}.snowflakecomputing.com/polaris/api/catalog",
        "credential": f"{principal_client_id}:{principal_secret}",
        "warehouse": catalog_name,
        "scope": role,
        "token-refresh-enabled": "true",
        "py-io-impl": "custom_fsspec.CustomFsspecFileIO",
    }
)
# custom_fsspec.py
from urllib.parse import urlparse

from fsspec import AbstractFileSystem

from pyiceberg.io import ADLS_ACCOUNT_NAME, ADLS_SAS_TOKEN, ADLFS_ACCOUNT_NAME, ADLFS_SAS_TOKEN
from pyiceberg.io.fsspec import FsspecFileIO
from pyiceberg.typedef import Properties
from pyiceberg.utils.properties import get_first_property_value


class CustomFsspecFileIO(FsspecFileIO):
    def __init__(self, properties):
        # Short-term fix for https://github.com/apache/iceberg-python/issues/961
        # and https://github.com/apache/iceberg-python/issues/1146
        base_location = properties.get('default-base-location')
        if base_location and base_location.startswith('abfs'):
            account_name = get_first_property_value(properties, ADLS_ACCOUNT_NAME, ADLFS_ACCOUNT_NAME)
            sas_token = get_first_property_value(properties, ADLS_SAS_TOKEN, ADLFS_SAS_TOKEN)
            if sas_token is None:
                # Look for a vended "adls.sas-token.<account-host>" property.
                # Iterate over a copy since we add keys while scanning.
                for key, value in list(properties.items()):
                    key = key.replace('adlfs.', 'adls.')
                    if key.startswith(ADLS_SAS_TOKEN):
                        properties[ADLS_SAS_TOKEN] = value
                        if key.endswith('.windows.net'):
                            account_host = key.removeprefix(f"{ADLS_SAS_TOKEN}.")
                            if account_name is None:
                                account_name = account_host.split('.')[0]
                                properties[ADLS_ACCOUNT_NAME] = account_name
                            properties['adls.account-host'] = account_host
                            break  # Exit loop after finding the first match
        super().__init__(properties)

    def _get_fs(self, scheme: str):
        if scheme in ["abfs", "abfss", "wasb", "wasbs"]:
            if scheme == "wasb":
                scheme = "abfs"
            if scheme == "wasbs":
                scheme = "abfss"
            adls_fs = _adls(self.properties)
            return adls_fs
        # If not ADLS, proceed with the original behavior
        return super()._get_fs(scheme)

    def new_input(self, location: str):
        # Replace wasb(s):// with abfs(s):// in the location
        uri = urlparse(location)
        if uri.scheme == "wasb":
            location = location.replace(f"{uri.scheme}://", "abfs://")
        if uri.scheme == "wasbs":
            location = location.replace(f"{uri.scheme}://", "abfss://")
        return super().new_input(location)

    def new_output(self, location: str):
        # Replace wasb(s):// with abfs(s):// in the location
        uri = urlparse(location)
        if uri.scheme == "wasb":
            location = location.replace(f"{uri.scheme}://", "abfs://")
        if uri.scheme == "wasbs":
            location = location.replace(f"{uri.scheme}://", "abfss://")
        return super().new_output(location)


def _adls(properties: Properties) -> AbstractFileSystem:
    from adlfs import AzureBlobFileSystem

    return AzureBlobFileSystem(
        account_host=properties.get('adls.account-host'),
        account_name=properties[ADLS_ACCOUNT_NAME],
        sas_token=properties[ADLS_SAS_TOKEN],
    )
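For completeness, a hypothetical way to exercise the workaround once the catalog above is loaded (the namespace, table, and column names are made up):

import pyarrow as pa

# Any write now builds the Azure filesystem through CustomFsspecFileIO, which
# maps the vended adls.sas-token.<account-host> property onto adlfs.
table = catalog.load_table("my_namespace.my_table")
pa_df = pa.table({"id": pa.array([1, 2, 3], type=pa.int64())})
table.append(pa_df)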
Hey @sfc-gh-tbenroeck, thanks for uncovering this bug. Are you interested in creating a PR to fix this? It looks like you're almost there.