kedro-plugins
Error saving `TensorFlowModelDataset` to Azure Blob Storage
Description
I get the following error when trying to save a `TensorFlowModelDataset`
to Azure Blob Storage. The issue occurs only with Azure Blob Storage, not with the local filesystem:
```
DatasetError: Cannot save versioned dataset 'model_prestamos.keras' to
'azureml/modelling/reprecios/data/06_models/run_2024-09-09' because a file with the same name already exists in the
directory. This is likely because versioning was enabled on a dataset already saved previously. Either remove
'model_prestamos.keras' from the directory or manually convert it into a versioned dataset by placing it in a
versioned directory (e.g. with default versioning format
'azureml/modelling/reprecios/data/06_models/run_2024-09-09/model_prestamos.keras/YYYY-MM-DDThh.mm.ss.sssZ/model_prestamos.keras').
```
The error indicates that versioning is enabled, despite the `versioned: False`
setting in the dataset catalog. The file is never created, and it does not exist at any point in Azure Blob Storage.
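For context on why a `FileNotFoundError` would surface as a versioning error: the `kedro/io/core.py` frames in the traceback below show the save wrapper converting any `FileNotFoundError`/`NotADirectoryError` into this `DatasetError`, regardless of the `versioned` setting. A paraphrased sketch (illustrative names, not the exact Kedro source):

```python
class DatasetError(Exception):
    """Stand-in for kedro.io.core.DatasetError."""

def save_wrapper(save_func, dataset, data):
    # Paraphrased from the kedro/io/core.py frames in the traceback below.
    try:
        save_func(dataset, data)
    except (FileNotFoundError, NotADirectoryError) as err:
        # Any missing-file error during save is re-raised as a versioning
        # conflict -- even when, as here, the real cause is an unrelated
        # FileNotFoundError raised deeper inside the dataset's _save.
        raise DatasetError(
            "Cannot save versioned dataset ... because a file with the "
            "same name already exists in the directory."
        ) from err

def broken_save(dataset, data):
    # Simulates the adlfs failure deep inside TensorFlowModelDataset._save.
    raise FileNotFoundError("tmp/.../tmp_tensorflow_model.keras")

try:
    save_wrapper(broken_save, None, None)
except DatasetError as e:
    print(type(e.__cause__))  # the chained cause is the FileNotFoundError
```

So the misleading "versioned dataset" message can mask a completely different failure mode.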
Context
Dataset Catalog Definition
```yaml
_azure_base_path: 'abfs://azureml/modelling/reprecios/data'

"{namespace}.model":
  type: ${_datasets.tensormodel}
  credentials: azure_credentials
  versioned: False
  filepath: ${_azure_base_path}/06_models/run_${runtime_params:run_date}/model_{namespace}.keras
  save_args:
    overwrite: True
```
Steps to Reproduce
Edit: Included code below to reproduce. (credit to @merelcht, @astrojuanlu)
```python
from kedro_datasets.tensorflow import TensorFlowModelDataset
import tensorflow as tf
import numpy as np

credentials = {
    'account_name': '',
    'account_key': ''
}

data_set_local = TensorFlowModelDataset(filepath="data/tensorflow_model.keras")
data_set = TensorFlowModelDataset(
    filepath="abfs://azureml/modelling/data/tensorflow_model.keras",
    credentials=credentials,
)

inputs = tf.keras.Input(shape=(3,))
x = tf.keras.layers.Dense(4, activation=tf.nn.relu)(inputs)
outputs = tf.keras.layers.Dense(5, activation=tf.nn.softmax)(x)
model = tf.keras.Model(inputs=inputs, outputs=outputs)

input_data = np.array([[0.5, 0.3, 0.2]])
predictions = model.predict(input_data)

# This works
data_set_local.save(model)

# This fails
try:
    data_set.save(model)
except Exception as e:
    print("🥲", e)
```
- Define a `TensorFlowModelDataset` in the Kedro catalog with the configuration above 👆
- Attempt to save a model using the `.save()` method targeting Azure Blob Storage.
I suspect this issue might be related to kedro-plugins/issues/359, as the behavior appears similar to what is described there.
Environment
- Kedro=0.19.8
- kedro-datasets=4.1.0
- TensorFlow=2.17.0
- fsspec=2024.9.0
- adlfs=2024.7.0
- OS=Ubuntu 20.04
Full Error Traceback
Shortened for readability
INFO Saving data to prestamos.model (TensorFlowModelDataset)... [data_catalog.py]
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/kedro/io/core.py:727 in save │
│ │
│ 724 │ │ │ │ self.resolve_save_version() │
│ 725 │ │ │ ) # Make sure last save version is set │
│ 726 │ │ │ try: │
│ ❱ 727 │ │ │ │ super()._save_wrapper(save_func)(self, data) │
│ 728 │ │ │ except (FileNotFoundError, NotADirectoryError) as err: │
│ 729 │ │ │ │ # FileNotFoundError raised in Win, NotADirectoryError raised in Unix │
│ 730 │ │ │ │ _default_version = "YYYY-MM-DDThh.mm.ss.sssZ" │
│ │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/kedro/io/core.py:249 in save │
│ │
│ 246 │ │ │ │
│ 247 │ │ │ try: │
│ 248 │ │ │ │ self._logger.debug("Saving %s", str(self)) │
│ ❱ 249 │ │ │ │ save_func(self, data) │
│ 250 │ │ │ except (DatasetError, FileNotFoundError, NotADirectoryError): │
│ 251 │ │ │ │ raise │
│ 252 │ │ │ except Exception as exc: │
│ │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/kedro_datasets/tensorflow/tensorflow_model │
│ _dataset.py:176 in _save │
│ │
│ 173 │ │ │ │
│ 174 │ │ │ # Use fsspec to take from local tempfile directory/file and │
│ 175 │ │ │ # put in ArbitraryFileSystem │
│ ❱ 176 │ │ │ self._fs.copy(path, save_path) │
│ 177 │ │
│ 178 │ def _exists(self) -> bool: │
│ 179 │ │ try: │
│ │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/fsspec/asyn.py:118 in wrapper │
│ │
│ 115 │ @functools.wraps(func) │
│ 116 │ def wrapper(*args, **kwargs): │
│ 117 │ │ self = obj or args[0] │
│ ❱ 118 │ │ return sync(self.loop, func, *args, **kwargs) │
│ 119 │ │
│ 120 │ return wrapper │
│ 121 │
│ │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/fsspec/asyn.py:103 in sync │
│ │
│ 100 │ │ # suppress asyncio.TimeoutError, raise FSTimeoutError │
│ 101 │ │ raise FSTimeoutError from return_result │
│ 102 │ elif isinstance(return_result, BaseException): │
│ ❱ 103 │ │ raise return_result │
│ 104 │ else: │
│ 105 │ │ return return_result │
│ 106 │
│ │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/fsspec/asyn.py:56 in _runner │
│ │
│ 53 │ if timeout is not None: │
│ 54 │ │ coro = asyncio.wait_for(coro, timeout=timeout) │
│ 55 │ try: │
│ ❱ 56 │ │ result[0] = await coro │
│ 57 │ except Exception as ex: │
│ 58 │ │ result[0] = ex │
│ 59 │ finally: │
│ │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/fsspec/asyn.py:369 in _copy │
│ │
│ 366 │ │ │ paths2 = path2 │
│ 367 │ │ else: │
│ 368 │ │ │ source_is_str = isinstance(path1, str) │
│ ❱ 369 │ │ │ paths1 = await self._expand_path( │
│ 370 │ │ │ │ path1, maxdepth=maxdepth, recursive=recursive │
│ 371 │ │ │ ) │
│ 372 │ │ │ if source_is_str and (not recursive or maxdepth is not None): │
│ │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/adlfs/spec.py:1595 in _expand_path │
│ │
│ 1592 │ │ │ if not path.endswith("*"): │
│ 1593 │ │ │ │ path = f"{path.strip('/')}" │
│ 1594 │ │ if isinstance(path, str): │
│ ❱ 1595 │ │ │ out = await self._expand_path( │
│ 1596 │ │ │ │ [path], │
│ 1597 │ │ │ │ recursive, │
│ 1598 │ │ │ │ maxdepth, │
│ │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/adlfs/spec.py:1640 in _expand_path │
│ │
│ 1637 │ │ │ │ │ out.add(fullpath) │
│ 1638 │ │ │
│ 1639 │ │ if not out: │
│ ❱ 1640 │ │ │ raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path) │
│ 1641 │ │ return list(sorted(out)) │
│ 1642 │ │
│ 1643 │ async def _put_file( │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
FileNotFoundError: [Errno 2] No such file or directory:
['tmp/kedro_tensorflow_tmpqic1m6vw/tmp_tensorflow_model.keras']
The above exception was the direct cause of the following exception:
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/kedro/runner/runner.py:530 in │
│ _run_node_sequential │
│ │
│ 527 │ │
│ 528 │ for name, data in items: │
│ 529 │ │ hook_manager.hook.before_dataset_saved(dataset_name=name, data=data, node=node) │
│ ❱ 530 │ │ catalog.save(name, data) │
│ 531 │ │ hook_manager.hook.after_dataset_saved(dataset_name=name, data=data, node=node) │
│ 532 │ return node │
│ 533 │
│ │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/kedro/io/data_catalog.py:588 in save │
│ │
│ 585 │ │ │ extra={"markup": True}, │
│ 586 │ │ ) │
│ 587 │ │ │
│ ❱ 588 │ │ dataset.save(data) │
│ 589 │ │
│ 590 │ def exists(self, name: str) -> bool: │
│ 591 │ │ """Checks whether registered data set exists by calling its `exists()` │
│ │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/kedro/io/core.py:731 in save │
│ │
│ 728 │ │ │ except (FileNotFoundError, NotADirectoryError) as err: │
│ 729 │ │ │ │ # FileNotFoundError raised in Win, NotADirectoryError raised in Unix │
│ 730 │ │ │ │ _default_version = "YYYY-MM-DDThh.mm.ss.sssZ" │
│ ❱ 731 │ │ │ │ raise DatasetError( │
│ 732 │ │ │ │ │ f"Cannot save versioned dataset '{self._filepath.name}' to " │
│ 733 │ │ │ │ │ f"'{self._filepath.parent.as_posix()}' because a file with the same │
│ 734 │ │ │ │ │ f"name already exists in the directory. This is likely because " │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
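The innermost `FileNotFoundError` is raised while `TensorFlowModelDataset._save` calls `self._fs.copy(path, save_path)`, where `path` is a local tempfile. With fsspec, `copy` resolves *both* paths on the same filesystem, so on an `abfs://` filesystem the local temp path is looked up on Azure and not found (note that the path in the error has also lost its leading `/`). A minimal sketch of the `copy` vs `put` distinction, using fsspec's in-memory filesystem as a stand-in for adlfs (the paths and filesystem here are illustrative, not the actual dataset code):

```python
import tempfile

import fsspec

# Stand-in for the remote (abfs) filesystem.
remote_fs = fsspec.filesystem("memory")

# A local file, like the tempfile TensorFlowModelDataset writes the model to.
with tempfile.NamedTemporaryFile(suffix=".keras", delete=False) as tmp:
    tmp.write(b"model bytes")
    local_path = tmp.name

# `copy` interprets BOTH paths on `remote_fs`, so the local tempfile path is
# expanded on the remote side and raises FileNotFoundError, mirroring the
# adlfs `_expand_path` frame in the traceback above.
try:
    remote_fs.copy(local_path, "/models/tensorflow_model.keras")
except FileNotFoundError as err:
    print("copy failed:", err)

# `put` uploads a LOCAL file to the remote filesystem, which is what a
# tempfile-based save needs.
remote_fs.put(local_path, "/models/tensorflow_model.keras")
print(remote_fs.cat("/models/tensorflow_model.keras"))
```

If that is indeed what happens against Azure, it would explain why the local-filesystem save works (there, source and destination really do live on the same filesystem) while the `abfs://` save fails.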