
Parse KFP read_artifact data

kxiao-fn opened this issue 5 years ago • 21 comments

I am trying to use the read_artifact method in kfp.Client().runs. However, I am not sure how to parse the returned data.

If I open the link provided by the UI, I am able to view the data as: ["[0, 1, 2, 3, 4, 5, 6, 7]", "[8, 9, 10, 11, 12, 13, 14, 15]", "[16, 17, 18, 19, 20, 21, 22, 23]", "[24, 25, 26, 27, 28, 29, 30, 31]", "[32, 33, 34, 35, 36, 37, 38, 39]", "[40, 41, 42, 43, 44, 45, 46, 47]", "[48, 49, 50, 51, 52, 53, 54, 55]", "[56, 57, 58, 59, 60, 61, 62, 63]", "[64, 65, 66, 67, 68, 69, 70, 71]", "[72, 73, 74]"]

However, the response returned by read_artifact is: {'data': 'H4sIAAAAAAAA/+yRscrcQAwG/SjD1V8haSWt/SzHFQfpf/hzef/g2NVfJKQ4QsDTqNhhGe1+e76ey5sxM+vMX9PMvk4rj8XH9EhPL1vMfVou2LvDdn58fz0/F7PPj4/X77w/nX9d7j/hfrubcBFiiBQlWszHTdzuq9iE74oLD+FDeAqvw/AWPoWvwjcRJmK/L0SMw4kUUSJaxBSxitjEMDH8cMYeMMRIMUqMFmOKsYqxHU6aSBcZIvfaFFkiW+RZnKvITZSJclEhaojaVzubq0VNUauoTbSJdtEh+mzuFL2/RYueolfRm5gm5tk8Q8whZj5uj3/9kxcXFxd/x88AAAD//+E0btIACAAA'}, where the return type of artifact.data is a string, although it is supposed to be bytes. How do I get the original data back?

I have also tried using wget with the link returned by the UI, but when I tar -xzvf the file, it gives me this error:

tar: Error opening archive: Unrecognized archive format

Version: kfp 1.0.0, kubeflow 1.0.2

kxiao-fn avatar Aug 05 '20 20:08 kxiao-fn

where the return type of artifact.data is a string, although it is supposed to be bytes. How do I get the original data back?

Have you tried decoding using Base64 and then unzipping/untarring?
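A minimal sketch of that approach (the helper name `decode_artifact` is mine, not part of the KFP SDK; it assumes the artifact is a base64-encoded `.tar.gz` whose members are UTF-8 text):

```python
import tarfile
from base64 import b64decode
from io import BytesIO


def decode_artifact(data: str) -> dict:
    """Decode the base64 .tar.gz string returned by read_artifact
    into a dict of {member_name: text_content}."""
    archive = BytesIO(b64decode(data))
    with tarfile.open(fileobj=archive) as tar:
        return {m: tar.extractfile(m).read().decode("utf-8")
                for m in tar.getnames()}
```

Usage would then look like `decode_artifact(client.runs.read_artifact(run_id, node_id, artifact_name).data)`.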

I have also tried using wget with the link returned by the UI

Have you tried clicking on that link? I think it returns the actual (unarchived) artifact data.

Ark-kun avatar Aug 05 '20 22:08 Ark-kun

How do you decode it when it is returned as a string? In the documentation, the data is described as "The bytes of the artifact content", although the return type is "str".

Also, I have clicked the link, but it just opens the link in the browser and displays the contents. I would like to download the contents instead (especially because I have some output artifacts that are pickled files, so displaying it in the UI would not suffice).

kxiao-fn avatar Aug 05 '20 23:08 kxiao-fn

You can save the link as a local file I think
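For example, with nothing but the standard library (the URL below is a placeholder for whatever link the UI shows; a multi-user deployment may additionally require an auth cookie or header, which this sketch omits):

```python
import urllib.request


def save_link(url: str, path: str) -> None:
    """Download the artifact link shown in the KFP UI to a local file."""
    with urllib.request.urlopen(url) as resp, open(path, "wb") as out:
        out.write(resp.read())


# save_link("http://localhost:8080/artifacts/get?...", "artifact.txt")
```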

Bobgy avatar Aug 06 '20 13:08 Bobgy

I tried that, but I received the error tar: Error opening archive: Unrecognized archive format when trying to unzip it.

kxiao-fn avatar Aug 06 '20 23:08 kxiao-fn

How do you decode it when it is returned as a string?

Base64 encodes bytes to string. Base64 decodes string to bytes.
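In other words (pure standard library, nothing KFP-specific):

```python
from base64 import b64decode, b64encode

raw = b"\x1f\x8b example bytes"
encoded = b64encode(raw).decode("ascii")   # bytes -> str
assert isinstance(encoded, str)
restored = b64decode(encoded)              # str -> bytes
assert restored == raw
```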

Ark-kun avatar Aug 11 '20 21:08 Ark-kun

@kxiao-fn (or anyone else stumbling on this), here's some code that I wrote to do this:

#!/usr/bin/env python3

import json
import tarfile
from base64 import b64decode
from io import BytesIO

import kfp


def get_node_id(*, run_id: str, component_name: str, client: kfp.Client):
    run = client.runs.get_run(run_id)
    workflow = json.loads(run.pipeline_runtime.workflow_manifest)
    nodes = workflow["status"]["nodes"]
    for node_id, node_info in nodes.items():
        if node_info["displayName"] == component_name:
            return node_id
    raise RuntimeError(f"Unable to find node_id for Component '{component_name}'")


def get_artifact(*, run_id: str, node_id: str, artifact_name: str, client: kfp.Client):
    artifact = client.runs.read_artifact(run_id, node_id, artifact_name)
    # Artifacts are returned as base64-encoded .tar.gz strings
    data = b64decode(artifact.data)
    io_buffer = BytesIO(data)
    with tarfile.open(fileobj=io_buffer) as tar:
        member_names = tar.getnames()
        if len(member_names) == 1:
            data = tar.extractfile(member_names[0]).read().decode('utf-8')
        else:
            # Is it possible for KFP artifacts to have multiple members?
            data = {}
            for member_name in member_names:
                data[member_name] = tar.extractfile(member_name).read().decode('utf-8')
    return data


if __name__ == "__main__":
    run_id = "e498b0da-036e-4e81-84e9-6e9c6e64960b"
    component_name = "my-component"
    # For an output variable named "output_data"
    artifact_name = "my-component-output_data"

    client = kfp.Client()
    node_id = get_node_id(run_id=run_id, component_name=component_name, client=client)
    artifact = get_artifact(
        run_id=run_id, node_id=node_id, artifact_name=artifact_name, client=client,
    )
    # Do something with artifact ...

sm-hawkfish avatar Sep 04 '20 16:09 sm-hawkfish

Hi @sm-hawkfish, that looks great! Are you interested in contributing it?

Bobgy avatar Sep 06 '20 01:09 Bobgy

/cc @neuromage

To make sure I understand the context correctly, we're talking about the Argo artifact instead of MLMD artifact here, right?

numerology avatar Sep 06 '20 01:09 numerology

Yes, it's an Argo artifact.

Bobgy avatar Sep 06 '20 03:09 Bobgy

@Bobgy, I am glad you think it's useful! Yes, I would be interested in contributing it -- do you have any thoughts on where it should be added (and whether it should also be added to the CLI), or should I browse the SDK?

sm-hawkfish avatar Sep 06 '20 16:09 sm-hawkfish

@sm-hawkfish Great!

I think kfp.Client is a good place; it's up to you whether you think adding it to the CLI is useful too. You can of course implement it in multiple PRs.

Bobgy avatar Sep 10 '20 04:09 Bobgy

Hi @Bobgy -- I wanted to follow up on this. I just got approval from my employer to contribute to this repo, which is exciting news. I will work on submitting a Pull Request with some variation of the above code snippet sometime this week.

sm-hawkfish avatar Oct 12 '20 01:10 sm-hawkfish

@sm-hawkfish That's awesome!

Bobgy avatar Oct 12 '20 09:10 Bobgy

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot] avatar Jan 10 '21 13:01 stale[bot]

Hi @sm-hawkfish, do you still want to work on this? Don't worry if you no longer have bandwidth, we can make it clear that others are free to pick it up

Bobgy avatar Mar 30 '21 02:03 Bobgy

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot] avatar Jul 08 '21 02:07 stale[bot]

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot] avatar Mar 03 '22 02:03 stale[bot]

I'm trying to use the code written by @sm-hawkfish (thanks, by the way!), but I get ApiException: (500) Reason: Internal Server Error when running kfp.Client().runs.read_artifact().

Although I have taken my artifact name from my compiled yaml pipeline:

templates:
   name: load-dataset
   outputs:
      artifacts:
      - {name: load-dataset-output_data, path: /tmp/outputs/output_data/data}

And kfp.Client().runs.get_run() works fine and contains my component_name ('load-dataset'), so I am correctly getting the node_id to pass to read_artifact().

Version: kfp 1.8.4, kubeflow 1.6

rguso avatar Oct 26 '22 16:10 rguso

Hi all,

thanks a lot @sm-hawkfish, your function works for me. But I'm a bit confused because, from my pipeline yaml (I use tfx==1.12.0), I see that for all my components I have

 "outputs":
  {"artifacts":
    [{"name":"mlpipeline-ui-metadata",
      "path":"/mlpipeline-ui-metadata.json"}]
  }

So, when I get the node ID, I have to provide artifact_name="mlpipeline-ui-metadata":

node_id = get_node_id(run_id=run_id, component_name='evaluator', client=client)
artifact = get_artifact(run_id=run_info.id, node_id=node_id, artifact_name="mlpipeline-ui-metadata", client=client)

The surprising fact is that the artifact contains a JSON file with paths to the artifacts stored somewhere else (whatever the component), not the artifact itself, like:

# Execution properties:
**eval_config**: {
  "metrics_specs": [
    {
      "thresholds": {
        "accuracy": {
          "change_threshold": {
            "absolute": -1e-10,
            "direction": "HIGHER_IS_BETTER"
          },
          "value_threshold": {
            "lower_bound": 0.0
          }
        }
      }
    }
  ],
  "model_specs": [
    {
      "signature_name": "eval"
    }
  ],
  "slicing_specs": [
    {}
  ]
}

**fairness_indicator_thresholds**: null

**example_splits**: null

# Inputs:
## model

**Type**: Model

**Artifact: None**

**Properties**:

**uri**: gs://my_bucket/tfx/0853cfbe-fced-4384-bb7e-53645b492652/Trainer/model/5983

**id**: 7768

**span**: None

....

## blessing

**Type**: ModelBlessing

**Artifact: None**

**Properties**:

**uri**: gs://my_bucket/tfx/0853cfbe-fced-4384-bb7e-53645b492652/Evaluator/blessing/5985

**id**: 0

**span**: None

**type_id**: 0

**type_name**: ModelBlessing

**state**: None

**split_names**: None

**producer_component**: None

Is there something I'm missing? Am I mixing up Argo artifacts with MLMD artifacts? Thanks a lot.

ykacer avatar Mar 03 '23 15:03 ykacer

Hello, I wanted to add a solution based on @sm-hawkfish's code. This additional function pulls all component outputs and performs type casting based on the template spec.

Feel free to give feedback if

  1. Some of the assumptions in the NOTE: comments are faulty.
  2. If there is a better way to do this.

It does feel like I am pulling data from many sources, so maybe there is a simpler way this should be done. Lmk!

Tested with kfp==1.8.18. Example output:

>>> from my_file import get_run_outputs
>>> get_run_outputs(client, '0ce9722a-813b-4bc7-b3a1-6ebef8c0d629')
{
'simple-returns-ljc9h-1842119181': {'a': 5,
  'b': 'Hello There!',
  'c': {'key': 'value'}},
 'simple-returns-ljc9h-2173387764': {'Output': 'Hello World!'},
 'simple-returns-ljc9h-2948094594': {'Output': 2},
 'simple-returns-ljc9h-3744801681': {'Output': 1.0},
 'simple-returns-ljc9h-4036141853': {'Output': {'a': 4, 'b': 'Hello Friend!'}},
 'simple-returns-ljc9h-4142110750': {'Output': 3},
 'simple-returns-ljc9h-694853977': {'Output': True}
}

Code:

import json
import tarfile
from io import BytesIO
from base64 import b64decode

from kfp import Client

KFP_TYPE_MAP = {
    "Integer": int,
    "Float": float,
    "Boolean": bool,
    "String": str,
    "JsonObject": json.loads
}

def get_artifact(
    client: Client,
    run_id: str,
    node_id: str,
    artifact_name: str,
) -> dict:
    """
    Source: https://github.com/kubeflow/pipelines/issues/4327#issuecomment-687255001
    """
    artifact = client.runs.read_artifact(
        run_id,
        node_id,
        artifact_name
    )
    data = b64decode(artifact.data)

    buffer = BytesIO()
    buffer.write(data)
    buffer.seek(0)
    with tarfile.open(fileobj=buffer) as tar:
        member_names = tar.getnames()
        data = {}
        for name in member_names:
            data[name] = tar.extractfile(name).read().decode("utf-8")

    return data

def get_run_outputs(
    client: Client,
    run_id: str,
    normalize_output_names: bool = True,
) -> dict:
    run_data = client.get_run(run_id)
    run_manifest = json.loads(run_data.pipeline_runtime.workflow_manifest)

    all_output_data = {}
    for node in run_manifest["status"]["nodes"].values():
        # NOTE: Using type of node to find actual components (filtering out for-loops, subgraphs, etc)
        if node["type"] != "Pod":
            continue

        # Locate the associated template
        template = None
        for t in run_manifest["spec"]["templates"]:
            if t["name"] == node["templateName"]:
                template = t
                break
        assert template is not None, "Template existence assumption broken"

        # Pull the type information from the spec
        component_spec = json.loads(
            template["metadata"]["annotations"]["pipelines.kubeflow.org/component_spec"]
        )
        # NOTE: We assume that orders match here (not 100% sure if that is valid)
        outputs = template["outputs"]["artifacts"]
        spec_outputs = component_spec["outputs"]

        type_info = {}
        for o, so in zip(outputs, spec_outputs):
            type_info[o["name"]] = so["type"]

        # Loop through artifacts, pull data, then cast
        component_output_data = {}
        for artifact in node["outputs"]["artifacts"]:
            # Skip any artifact which is not an output (e.g. main-logs)
            if artifact["name"] not in type_info:
                continue

            output_data = get_artifact(
                client=client,
                run_id=run_id,
                node_id=node["id"],
                artifact_name=artifact["name"]
            )
            output_data = output_data["data"]

            # Cast types
            t = KFP_TYPE_MAP[type_info[artifact["name"]]]
            component_output_data[artifact["name"]] = t(output_data)

        if normalize_output_names:
            component_output_data = {
                k.replace(f"{node['templateName']}-", ""): v
                for k, v in component_output_data.items()
            }

        all_output_data[node["id"]] = component_output_data

    return all_output_data

CarterFendley avatar Mar 05 '24 01:03 CarterFendley

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] avatar Jun 25 '24 07:06 github-actions[bot]

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.

github-actions[bot] avatar Jul 17 '24 07:07 github-actions[bot]