[BUG]: Evaluator step in kubeflow_pipelines_orchestration example fails when using a Google Cloud Storage bucket as the artifact store.
System Information
ZenML version: 0.9.0 Install path: /home/victor/.local/lib/python3.8/site-packages/zenml Python version: 3.8.10 Platform information: {'os': 'linux', 'linux_distro': 'ubuntu', 'linux_distro_like': 'debian', 'linux_distro_version': '20.04'} Environment: native Integrations: ['gcp', 'kubeflow', 'mlflow', 'scipy', 'seldon', 'sklearn', 'tensorflow']
What happened?
The evaluator step of the kubeflow_pipelines_orchestration example fails when a Google Cloud artifact store is used:

I think this has something to do with the following code in the Keras materializer (keras_materializer.py):
# Copy from artifact store to temporary directory
io_utils.copy_dir(self.artifact.uri, temp_dir.name)
Apparently, the artifact paths aren't resolved correctly by the materializer. See:
FileNotFoundError:
b/ai-gilde-kubeflowpipelines-default/o/evaluator%2Foutput%2F12
The artifacts are stored correctly in the storage bucket:

Reproduction steps
Follow the kubeflow_pipelines_orchestration example, in particular the section "Run the same pipeline on Kubeflow Pipelines deployed to GCP".
Relevant log output
Step evaluator has started.
╭───────────────────── Traceback (most recent call last) ──────────────────────╮
│ /usr/local/lib/python3.8/dist-packages/tfx/orchestration/portable/launcher.p │
│ y:424 in _run_executor │
│ │
│ 421 │ │
│ 422 │ outputs_utils.make_output_dirs(execution_info.output_dict) │
│ 423 │ try: │
│ ❱ 424 │ executor_output = self._executor_operator.run_executor(execution │
│ 425 │ code = executor_output.execution_result.code │
│ 426 │ if code != 0: │
│ 427 │ │ result_message = executor_output.execution_result.result_messa │
│ │
│ /usr/local/lib/python3.8/dist-packages/tfx/orchestration/portable/python_exe │
│ cutor_operator.py:135 in run_executor │
│ │
│ 132 │ │ pipeline_info=execution_info.pipeline_info, │
│ 133 │ │ pipeline_run_id=execution_info.pipeline_run_id) │
│ 134 │ executor = self._executor_cls(context=context) │
│ ❱ 135 │ return run_with_executor(execution_info, executor) │
│ 136 │
│ │
│ /usr/local/lib/python3.8/dist-packages/tfx/orchestration/portable/python_exe │
│ cutor_operator.py:58 in run_with_executor │
│ │
│ 55 │ │ artifact.read() │
│ 56 │
│ 57 output_dict = copy.deepcopy(execution_info.output_dict) │
│ ❱ 58 result = executor.Do(execution_info.input_dict, output_dict, │
│ 59 │ │ │ │ │ execution_info.exec_properties) │
│ 60 if not result: │
│ 61 │ # If result is not returned from the Do function, then try to │
│ │
│ /usr/local/lib/python3.8/dist-packages/zenml/steps/utils.py:514 in Do │
│ │
│ 511 │ │ │ │ function_params[arg] = context │
│ 512 │ │ │ else: │
│ 513 │ │ │ │ # At this point, it has to be an artifact, so we resol │
│ ❱ 514 │ │ │ │ function_params[arg] = self.resolve_input_artifact( │
│ 515 │ │ │ │ │ input_dict[arg][0], arg_type │
│ 516 │ │ │ │ ) │
│ 517 │
│ │
│ /usr/local/lib/python3.8/dist-packages/zenml/steps/utils.py:399 in │
│ resolve_input_artifact │
│ │
│ 396 │ │ │ artifact.materializer │
│ 397 │ │ )(artifact) │
│ 398 │ │ # The materializer now returns a resolved input │
│ ❱ 399 │ │ return materializer.handle_input(data_type=data_type) │
│ 400 │ │
│ 401 │ def resolve_output_artifact( │
│ 402 │ │ self, param_name: str, artifact: BaseArtifact, data: Any │
│ │
│ /usr/local/lib/python3.8/dist-packages/zenml/integrations/tensorflow/materia │
│ lizers/keras_materializer.py:48 in handle_input │
│ │
│ 45 │ │ temp_dir = tempfile.TemporaryDirectory() │
│ 46 │ │ │
│ 47 │ │ # Copy from artifact store to temporary directory │
│ ❱ 48 │ │ io_utils.copy_dir(self.artifact.uri, temp_dir.name) │
│ 49 │ │ │
│ 50 │ │ # Load the model from the temporary directory │
│ 51 │ │ model = keras.models.load_model(temp_dir.name) │
│ │
│ /usr/local/lib/python3.8/dist-packages/zenml/utils/io_utils.py:169 in │
│ copy_dir │
│ │
│ 166 │ │ overwrite: Boolean. If false, function throws an error before │
│ 167 │ """ │
│ 168 │ for source_file in listdir(source_dir): │
│ ❱ 169 │ │ source_path = os.path.join(source_dir, convert_to_str(source_f │
│ 170 │ │ destination_path = os.path.join( │
│ 171 │ │ │ destination_dir, convert_to_str(source_file) │
│ 172 │ │ ) │
│ │
│ /usr/local/lib/python3.8/dist-packages/zenml/utils/io_utils.py:222 in │
│ convert_to_str │
│ │
│ 219 │ if isinstance(path, str): │
│ 220 │ │ return path │
│ 221 │ else: │
│ ❱ 222 │ │ return path.decode("utf-8") │
│ 223 │
│ 224 │
│ 225 def is_root(path: str) -> bool: │
╰──────────────────────────────────────────────────────────────────────────────╯
AttributeError: 'dict' object has no attribute 'decode'
During handling of the above exception, another exception occurred:
╭───────────────────── Traceback (most recent call last) ──────────────────────╮
│ /usr/local/lib/python3.8/dist-packages/zenml/artifact_stores/base_artifact_s │
│ tore.py:85 in inner_function │
│ │
│ 82 │ │ │ NotFoundError: If the function throws a FileNotFoundError. │
│ 83 │ │ """ │
│ 84 │ │ try: │
│ ❱ 85 │ │ │ return _func(*args, **kwargs) │
│ 86 │ │ except FileNotFoundError as e: │
│ 87 │ │ │ raise NotFoundError() from e │
│ 88 │
│ │
│ /usr/local/lib/python3.8/dist-packages/zenml/integrations/gcp/artifact_store │
│ s/gcp_artifact_store.py:178 in remove │
│ │
│ 175 │ │ Args: │
│ 176 │ │ │ path: The path of the file to remove. │
│ 177 │ │ """ │
│ ❱ 178 │ │ self.filesystem.rm_file(path=path) │
│ 179 │ │
│ 180 │ def rename( │
│ 181 │ │ self, src: PathType, dst: PathType, overwrite: bool = False │
│ │
│ /usr/local/lib/python3.8/dist-packages/fsspec/asyn.py:86 in wrapper │
│ │
│ 83 │ @functools.wraps(func) │
│ 84 │ def wrapper(*args, **kwargs): │
│ 85 │ │ self = obj or args[0] │
│ ❱ 86 │ │ return sync(self.loop, func, *args, **kwargs) │
│ 87 │ │
│ 88 │ return wrapper │
│ 89 │
│ │
│ /usr/local/lib/python3.8/dist-packages/fsspec/asyn.py:66 in sync │
│ │
│ 63 │ │ # suppress asyncio.TimeoutError, raise FSTimeoutError │
│ 64 │ │ raise FSTimeoutError from return_result │
│ 65 │ elif isinstance(return_result, BaseException): │
│ ❱ 66 │ │ raise return_result │
│ 67 │ else: │
│ 68 │ │ return return_result │
│ 69 │
│ │
│ /usr/local/lib/python3.8/dist-packages/fsspec/asyn.py:26 in _runner │
│ │
│ 23 │ if timeout is not None: │
│ 24 │ │ coro = asyncio.wait_for(coro, timeout=timeout) │
│ 25 │ try: │
│ ❱ 26 │ │ result[0] = await coro │
│ 27 │ except Exception as ex: │
│ 28 │ │ result[0] = ex │
│ 29 │ finally: │
│ │
│ /usr/local/lib/python3.8/dist-packages/gcsfs/core.py:903 in _rm_file │
│ │
│ 900 │ async def _rm_file(self, path, **kwargs): │
│ 901 │ │ bucket, key = self.split_path(path) │
│ 902 │ │ if key: │
│ ❱ 903 │ │ │ await self._call("DELETE", "b/{}/o/{}", bucket, key) │
│ 904 │ │ │ self.invalidate_cache(posixpath.dirname(self._strip_proto │
│ 905 │ │ else: │
│ 906 │ │ │ await self._rmdir(path) │
│ │
│ /usr/local/lib/python3.8/dist-packages/gcsfs/core.py:392 in _call │
│ │
│ 389 │ ): │
│ 390 │ │ logger.debug(f"{method.upper()}: {path}, {args}, {kwargs.get( │
│ 391 │ │ │
│ ❱ 392 │ │ status, headers, info, contents = await self._request( │
│ 393 │ │ │ method, path, *args, **kwargs │
│ 394 │ │ ) │
│ 395 │ │ if json_out: │
│ │
│ /usr/local/lib/python3.8/dist-packages/decorator.py:221 in fun │
│ │
│ 218 │ │ async def fun(*args, **kw): │
│ 219 │ │ │ if not kwsyntax: │
│ 220 │ │ │ │ args, kw = fix(args, kw, sig) │
│ ❱ 221 │ │ │ return await caller(func, *(extras + args), **kw) │
│ 222 │ elif isgeneratorfunction(caller): │
│ 223 │ │ def fun(*args, **kw): │
│ 224 │ │ │ if not kwsyntax: │
│ │
│ /usr/local/lib/python3.8/dist-packages/gcsfs/retry.py:115 in retry_request │
│ │
│ 112 │ │ try: │
│ 113 │ │ │ if retry > 0: │
│ 114 │ │ │ │ await asyncio.sleep(min(random.random() + 2 ** (retry │
│ ❱ 115 │ │ │ return await func(*args, **kwargs) │
│ 116 │ │ except ( │
│ 117 │ │ │ HttpError, │
│ 118 │ │ │ requests.exceptions.RequestException, │
│ │
│ /usr/local/lib/python3.8/dist-packages/gcsfs/core.py:384 in _request │
│ │
│ 381 │ │ │ info = r.request_info # for debug only │
│ 382 │ │ │ contents = await r.read() │
│ 383 │ │ │ │
│ ❱ 384 │ │ │ validate_response(status, contents, path, args) │
│ 385 │ │ │ return status, headers, info, contents │
│ 386 │ │
│ 387 │ async def _call( │
│ │
│ /usr/local/lib/python3.8/dist-packages/gcsfs/retry.py:84 in │
│ validate_response │
│ │
│ 81 │ │ │ │
│ 82 │ │ │ path = path.format(*[quote_plus(p) for p in args]) │
│ 83 │ │ if status == 404: │
│ ❱ 84 │ │ │ raise FileNotFoundError(path) │
│ 85 │ │ │
│ 86 │ │ error = None │
│ 87 │ │ if hasattr(content, "decode"): │
╰──────────────────────────────────────────────────────────────────────────────╯
FileNotFoundError:
b/ai-gilde-kubeflowpipelines-default/o/evaluator%2Foutput%2F12
The above exception was the direct cause of the following exception:
╭───────────────────── Traceback (most recent call last) ──────────────────────╮
│ /usr/lib/python3.8/runpy.py:194 in _run_module_as_main │
│ │
│ 191 │ main_globals = sys.modules["__main__"].__dict__ │
│ 192 │ if alter_argv: │
│ 193 │ │ sys.argv[0] = mod_spec.origin │
│ ❱ 194 │ return _run_code(code, main_globals, None, │
│ 195 │ │ │ │ │ "__main__", mod_spec) │
│ 196 │
│ 197 def run_module(mod_name, init_globals=None, │
│ │
│ /usr/lib/python3.8/runpy.py:87 in _run_code │
│ │
│ 84 │ │ │ │ │ __loader__ = loader, │
│ 85 │ │ │ │ │ __package__ = pkg_name, │
│ 86 │ │ │ │ │ __spec__ = mod_spec) │
│ ❱ 87 │ exec(code, run_globals) │
│ 88 │ return run_globals │
│ 89 │
│ 90 def _run_module_code(code, init_globals=None, │
│ │
│ /usr/local/lib/python3.8/dist-packages/zenml/entrypoints/step_entrypoint.py: │
│ 62 in <module> │
│ │
│ 59 │
│ 60 │
│ 61 if __name__ == "__main__": │
│ ❱ 62 │ main() │
│ 63 │
│ │
│ /usr/local/lib/python3.8/dist-packages/zenml/entrypoints/step_entrypoint.py: │
│ 58 in main │
│ │
│ 55 │ entrypoint_config = entrypoint_config_class(arguments=remaining_arg │
│ 56 │ │
│ 57 │ # Run the entrypoint configuration │
│ ❱ 58 │ entrypoint_config.run() │
│ 59 │
│ 60 │
│ 61 if __name__ == "__main__": │
│ │
│ /usr/local/lib/python3.8/dist-packages/zenml/entrypoints/step_entrypoint_con │
│ figuration.py:627 in run │
│ │
│ 624 │ │ # Execute the actual step code. │
│ 625 │ │ run_name = self.get_run_name(pipeline_name=pipeline_name) │
│ 626 │ │ orchestrator = Repository().active_stack.orchestrator │
│ ❱ 627 │ │ execution_info = orchestrator.run_step( │
│ 628 │ │ │ step=step, run_name=run_name, pb2_pipeline=pb2_pipeline │
│ 629 │ │ ) │
│ 630 │
│ │
│ /usr/local/lib/python3.8/dist-packages/zenml/orchestrators/base_orchestrator │
│ .py:320 in run_step │
│ │
│ 317 │ │ # This is where the step actually gets executed using the │
│ 318 │ │ # component_launcher │
│ 319 │ │ repo.active_stack.prepare_step_run() │
│ ❱ 320 │ │ execution_info = self._execute_step(component_launcher) │
│ 321 │ │ repo.active_stack.cleanup_step_run() │
│ 322 │ │ │
│ 323 │ │ return execution_info │
│ │
│ /usr/local/lib/python3.8/dist-packages/zenml/orchestrators/base_orchestrator │
│ .py:347 in _execute_step │
│ │
│ 344 │ │ start_time = time.time() │
│ 345 │ │ logger.info(f"Step `{pipeline_step_name}` has started.") │
│ 346 │ │ try: │
│ ❱ 347 │ │ │ execution_info = tfx_launcher.launch() │
│ 348 │ │ │ │
│ 349 │ │ │ if execution_info and get_cache_status(execution_info): │
│ 350 │ │ │ │ if execution_info.exec_properties: │
│ │
│ /usr/local/lib/python3.8/dist-packages/tfx/orchestration/portable/launcher.p │
│ y:549 in launch │
│ │
│ 546 │ │ self._executor_operator.with_execution_watcher( │
│ 547 │ │ │ executor_watcher.address) │
│ 548 │ │ executor_watcher.start() │
│ ❱ 549 │ │ executor_output = self._run_executor(execution_info) │
│ 550 │ except Exception as e: # pylint: disable=broad-except │
│ 551 │ │ execution_output = ( │
│ 552 │ │ │ e.executor_output if isinstance(e, _ExecutionFailedError) │
│ │
│ /usr/local/lib/python3.8/dist-packages/tfx/orchestration/portable/launcher.p │
│ y:435 in _run_executor │
│ │
│ 432 │ │ raise _ExecutionFailedError(err, executor_output) │
│ 433 │ return executor_output │
│ 434 │ except Exception: # pylint: disable=broad-except │
│ ❱ 435 │ outputs_utils.remove_output_dirs(execution_info.output_dict) │
│ 436 │ raise │
│ 437 │
│ 438 def _publish_successful_execution( │
│ │
│ /usr/local/lib/python3.8/dist-packages/tfx/orchestration/portable/outputs_ut │
│ ils.py:67 in remove_output_dirs │
│ │
│ 64 │ if fileio.isdir(artifact.uri): │
│ 65 │ │ fileio.rmtree(artifact.uri) │
│ 66 │ else: │
│ ❱ 67 │ │ fileio.remove(artifact.uri) │
│ 68 │
│ 69 │
│ 70 def clear_output_dirs(output_dict: Dict[str, List[types.Artifact]]) -> │
│ │
│ /usr/local/lib/python3.8/dist-packages/tfx/dsl/io/fileio.py:90 in remove │
│ │
│ 87 │
│ 88 def remove(path: PathType) -> None: │
│ 89 """Remove the file at the given path.""" │
│ ❱ 90 _get_filesystem(path).remove(path) │
│ 91 │
│ 92 │
│ 93 def rename(src: PathType, dst: PathType, overwrite: bool = False) -> N │
│ │
│ /usr/local/lib/python3.8/dist-packages/zenml/artifact_stores/base_artifact_s │
│ tore.py:87 in inner_function │
│ │
│ 84 │ │ try: │
│ 85 │ │ │ return _func(*args, **kwargs) │
│ 86 │ │ except FileNotFoundError as e: │
│ ❱ 87 │ │ │ raise NotFoundError() from e │
│ 88 │ │
│ 89 │ return inner_function │
│ 90 │
╰──────────────────────────────────────────────────────────────────────────────╯
NotFoundError
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
@VictorW96 thanks for the report, we'll check ASAP.
Hi @VictorW96, I've managed to get some fixes for the GCP artifact store into our 0.10.0 release (out now!), which address the first exception raised in your stack trace:
AttributeError: 'dict' object has no attribute 'decode'
During handling of the above exception, another exception occurred:
The later errors about a file not being found only occur as a consequence of the first one, and they are much more difficult to resolve because they stem from a bug outside of the ZenML codebase. However, they shouldn't appear once the root cause of the step failure (the error above, in your case) is fixed. It would be great if you could let me know whether the example works for you with the latest ZenML version!
This does not seem to be fixed. The new error is:
│ /usr/local/lib/python3.8/dist-packages/zenml/utils/io_utils.py:178 in │
│ copy_dir │
│ │
│ 175 │ │ │ │ # if the destination is a subdirectory of the source, │
│ 176 │ │ │ │ # copying it to avoid an infinite loop. │
│ 177 │ │ │ │ return │
│ ❱ 178 │ │ │ copy_dir(source_path, destination_path, overwrite) │
│ 179 │ │ else: │
│ 180 │ │ │ create_dir_recursive_if_not_exists( │
│ 181 │ │ │ │ str(Path(destination_path).parent) │
│ │
│ /usr/local/lib/python3.8/dist-packages/zenml/utils/io_utils.py:178 in │
│ copy_dir │
│ │
│ 175 │ │ │ │ # if the destination is a subdirectory of the source, │
│ 176 │ │ │ │ # copying it to avoid an infinite loop. │
│ 177 │ │ │ │ return │
│ ❱ 178 │ │ │ copy_dir(source_path, destination_path, overwrite) │
│ 179 │ │ else: │
│ 180 │ │ │ create_dir_recursive_if_not_exists( │
│ 181 │ │ │ │ str(Path(destination_path).parent) │
│ │
│ /usr/local/lib/python3.8/dist-packages/zenml/utils/io_utils.py:168 in │
│ copy_dir │
│ │
│ 165 │ │ destination_dir: Path to copy to. │
│ 166 │ │ overwrite: Boolean. If false, function throws an error before │
│ 167 │ """ │
│ ❱ 168 │ for source_file in listdir(source_dir): │
│ 169 │ │ source_path = os.path.join(source_dir, convert_to_str(source_f │
│ 170 │ │ destination_path = os.path.join( │
│ 171 │ │ │ destination_dir, convert_to_str(source_file) │
│ │
│ /usr/local/lib/python3.8/dist-packages/tfx/dsl/io/fileio.py:75 in listdir │
│ │
│ 72 │
│ 73 def listdir(path: PathType) -> List[PathType]: │
│ 74 """Return the list of files in a directory.""" │
│ ❱ 75 return _get_filesystem(path).listdir(path) │
│ 76 │
│ 77 │
│ 78 def makedirs(path: PathType) -> None: │
│ │
│ /usr/local/lib/python3.8/dist-packages/zenml/artifact_stores/base_artifact_s │
│ tore.py:85 in inner_function │
│ │
│ 82 │ │ │ NotFoundError: If the function throws a FileNotFoundError. │
│ 83 │ │ """ │
│ 84 │ │ try: │
│ ❱ 85 │ │ │ return _func(*args, **kwargs) │
│ 86 │ │ except FileNotFoundError as e: │
│ 87 │ │ │ raise NotFoundError() from e │
│ 88 │
│ │
│ /usr/local/lib/python3.8/dist-packages/zenml/integrations/gcp/artifact_store │
│ s/gcp_artifact_store.py:178 in listdir │
│ │
│ 175 │ │ │
│ 176 │ │ return [ │
│ 177 │ │ │ _extract_basename(dict_) │
│ ❱ 178 │ │ │ for dict_ in self.filesystem.listdir(path=path) │
│ 179 │ │ ] │
│ 180 │ │
│ 181 │ def makedirs(self, path: PathType) -> None: │
│ │
│ /usr/local/lib/python3.8/dist-packages/fsspec/spec.py:1236 in listdir │
│ │
│ 1233 │ │
│ 1234 │ def listdir(self, path, detail=True, **kwargs): │
│ 1235 │ │ """Alias of `AbstractFileSystem.ls`.""" │
│ ❱ 1236 │ │ return self.ls(path, detail=detail, **kwargs) │
│ 1237 │ │
│ 1238 │ def cp(self, path1, path2, **kwargs): │
│ 1239 │ │ """Alias of `AbstractFileSystem.copy`.""" │
│ │
│ /usr/local/lib/python3.8/dist-packages/fsspec/asyn.py:86 in wrapper │
│ │
│ 83 │ @functools.wraps(func) │
│ 84 │ def wrapper(*args, **kwargs): │
│ 85 │ │ self = obj or args[0] │
│ ❱ 86 │ │ return sync(self.loop, func, *args, **kwargs) │
│ 87 │ │
│ 88 │ return wrapper │
│ 89 │
│ │
│ /usr/local/lib/python3.8/dist-packages/fsspec/asyn.py:51 in sync │
│ │
│ 48 │ coro = func(*args, **kwargs) │
│ 49 │ result = [None] │
│ 50 │ event = threading.Event() │
│ ❱ 51 │ asyncio.run_coroutine_threadsafe(_runner(event, coro, result, time │
│ 52 │ while True: │
│ 53 │ │ # this loops allows thread to get interrupted │
│ 54 │ │ if event.wait(1): │
│ │
│ /usr/lib/python3.8/asyncio/tasks.py:918 in run_coroutine_threadsafe │
│ │
│ 915 │ """ │
│ 916 │ if not coroutines.iscoroutine(coro): │
│ 917 │ │ raise TypeError('A coroutine object is required') │
│ ❱ 918 │ future = concurrent.futures.Future() │
│ 919 │ │
│ 920 │ def callback(): │
│ 921 │ │ try: │
│ │
│ /usr/lib/python3.8/concurrent/futures/_base.py:318 in __init__ │
│ │
│ 315 │ │
│ 316 │ def __init__(self): │
│ 317 │ │ """Initializes the future. Should not be called by clients.""" │
│ ❱ 318 │ │ self._condition = threading.Condition() │
│ 319 │ │ self._state = PENDING │
│ 320 │ │ self._result = None │
│ 321 │ │ self._exception = None │
│ │
│ /usr/lib/python3.8/threading.py:224 in __init__ │
│ │
│ 221 │ │
│ 222 │ def __init__(self, lock=None): │
│ 223 │ │ if lock is None: │
│ ❱ 224 │ │ │ lock = RLock() │
│ 225 │ │ self._lock = lock │
│ 226 │ │ # Export the lock's acquire() and release() methods │
│ 227 │ │ self.acquire = lock.acquire │
│ │
│ /usr/lib/python3.8/threading.py:92 in RLock │
│ │
│ 89 │ """ │
│ 90 │ if _CRLock is None: │
│ 91 │ │ return _PyRLock(*args, **kwargs) │
│ ❱ 92 │ return _CRLock(*args, **kwargs) │
│ 93 │
│ 94 class _RLock: │
│ 95 │ """This class implements reentrant lock objects. │
╰──────────────────────────────────────────────────────────────────────────────╯
RecursionError: maximum recursion depth exceeded while calling a Python object
During handling of the above exception, another exception occurred:
I'm looking into this now @VictorW96, can you maybe paste the entire stack trace for a little more context?
@VictorW96, I would appreciate it if you could let us know whether this is still failing.
Yes, this is still failing. I have attached the complete evaluator Kubeflow step log and the ZenML console log: error-kubeflow-log.txt, zenml-console-log.txt
Fixed by #998