Parse KFP read_artifact data
I am trying to use the read_artifact method in kfp.Client().runs. However, I am not sure how to parse the returned data.
If I open the link provided by the UI, I am able to view the data as:
["[0, 1, 2, 3, 4, 5, 6, 7]", "[8, 9, 10, 11, 12, 13, 14, 15]", "[16, 17, 18, 19, 20, 21, 22, 23]", "[24, 25, 26, 27, 28, 29, 30, 31]", "[32, 33, 34, 35, 36, 37, 38, 39]", "[40, 41, 42, 43, 44, 45, 46, 47]", "[48, 49, 50, 51, 52, 53, 54, 55]", "[56, 57, 58, 59, 60, 61, 62, 63]", "[64, 65, 66, 67, 68, 69, 70, 71]", "[72, 73, 74]"]
However, the response returned by read_artifact is:
{'data': 'H4sIAAAAAAAA/+yRscrcQAwG/SjD1V8haSWt/SzHFQfpf/hzef/g2NVfJKQ4QsDTqNhhGe1+e76ey5sxM+vMX9PMvk4rj8XH9EhPL1vMfVou2LvDdn58fz0/F7PPj4/X77w/nX9d7j/hfrubcBFiiBQlWszHTdzuq9iE74oLD+FDeAqvw/AWPoWvwjcRJmK/L0SMw4kUUSJaxBSxitjEMDH8cMYeMMRIMUqMFmOKsYqxHU6aSBcZIvfaFFkiW+RZnKvITZSJclEhaojaVzubq0VNUauoTbSJdtEh+mzuFL2/RYueolfRm5gm5tk8Q8whZj5uj3/9kxcXFxd/x88AAAD//+E0btIACAAA'}, where the return type of artifact.data is a string, although it is supposed to be bytes. How do I get the original data back?
I have also tried using wget with the link returned by the UI, but when I tar -xzvf the file, it gives me this error:
tar: Error opening archive: Unrecognized archive format
Version: kfp 1.0.0, kubeflow 1.0.2
> where the return type of artifact.data is a string, although it is supposed to be bytes. How do I get the original data back?
Have you tried decoding using Base64 and then unzipping/untarring?
> I have also tried using wget with the link returned by the UI
Have you tried clicking on that link? I think it returns the actual (unarchived) artifact data.
How do you decode it when it is returned as a string? In the documentation, the data is defined to be "The bytes of the artifact content", although the return type is "str".
Also, I have clicked the link, but it just opens the link in the browser and displays the contents. I would like to download the contents instead (especially because I have some output artifacts that are pickled files, so displaying it in the UI would not suffice).
You can save the link as a local file, I think.
I tried that, but I received the error tar: Error opening archive: Unrecognized archive format when trying to unzip it.
> How do you decode it when it is returned as a string?
Base64 encodes bytes to string. Base64 decodes string to bytes.
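In code, that round trip looks roughly like this (a minimal sketch with placeholder identifiers, not the full solution below):

import tarfile
from base64 import b64decode
from io import BytesIO

import kfp

client = kfp.Client()
# Placeholder identifiers; substitute your own run, node, and artifact names.
artifact = client.runs.read_artifact("<run-id>", "<node-id>", "<artifact-name>")

raw = b64decode(artifact.data)                   # str -> bytes (a gzipped tarball)
with tarfile.open(fileobj=BytesIO(raw)) as tar:  # tarfile handles the gzip layer
    for member in tar.getnames():
        print(member, tar.extractfile(member).read())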
@kxiao-fn (or anyone else stumbling on this), here's some code that I wrote to do this:
#!/usr/bin/env python3
import json
import tarfile
from base64 import b64decode
from io import BytesIO

import kfp


def get_node_id(*, run_id: str, component_name: str, client: kfp.Client):
    run = client.runs.get_run(run_id)
    workflow = json.loads(run.pipeline_runtime.workflow_manifest)
    nodes = workflow["status"]["nodes"]
    for node_id, node_info in nodes.items():
        if node_info["displayName"] == component_name:
            return node_id
    else:
        raise RuntimeError(f"Unable to find node_id for Component '{component_name}'")


def get_artifact(*, run_id: str, node_id: str, artifact_name: str, client: kfp.Client):
    artifact = client.runs.read_artifact(run_id, node_id, artifact_name)
    # Artifacts are returned as base64-encoded .tar.gz strings
    data = b64decode(artifact.data)
    io_buffer = BytesIO()
    io_buffer.write(data)
    io_buffer.seek(0)
    data = None
    with tarfile.open(fileobj=io_buffer) as tar:
        member_names = tar.getnames()
        if len(member_names) == 1:
            data = tar.extractfile(member_names[0]).read().decode('utf-8')
        else:
            # Is it possible for KFP artifacts to have multiple members?
            data = {}
            for member_name in member_names:
                data[member_name] = tar.extractfile(member_name).read().decode('utf-8')
    return data


if __name__ == "__main__":
    run_id = "e498b0da-036e-4e81-84e9-6e9c6e64960b"
    component_name = "my-component"

    # For an output variable named "output_data"
    artifact_name = "my-component-output_data"

    client = kfp.Client()
    node_id = get_node_id(run_id=run_id, component_name=component_name, client=client)
    artifact = get_artifact(
        run_id=run_id, node_id=node_id, artifact_name=artifact_name, client=client,
    )
    # Do something with artifact ...
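One caveat: get_artifact decodes every tar member as UTF-8 text, which will fail for binary outputs like the pickled files mentioned above. A minimal variant for those (a sketch along the same lines, with a hypothetical artifact name in the usage comment) keeps the raw bytes so the caller can unpickle them:

import tarfile
from base64 import b64decode
from io import BytesIO


def get_artifact_bytes(*, run_id: str, node_id: str, artifact_name: str, client) -> bytes:
    # Same decode/untar steps as above, but without the UTF-8 decode,
    # so binary artifacts (e.g. pickled outputs) survive intact.
    artifact = client.runs.read_artifact(run_id, node_id, artifact_name)
    raw = b64decode(artifact.data)
    with tarfile.open(fileobj=BytesIO(raw)) as tar:
        # Assumes the tarball has a single member, as is typical for KFP outputs.
        member = tar.getnames()[0]
        return tar.extractfile(member).read()


# Hypothetical usage for a pickled output named "my-component-model":
# import pickle
# obj = pickle.loads(get_artifact_bytes(run_id=run_id, node_id=node_id,
#                                       artifact_name="my-component-model", client=client))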
Hi @sm-hawkfish, that looks great! Are you interested in contributing it?
/cc @neuromage
To make sure I understand the context correctly, we're talking about the Argo artifact instead of MLMD artifact here, right?
Yes, it's an Argo artifact.
@Bobgy, I am glad you think it's useful! Yes, I would be interested in contributing it -- do you have any thoughts on where it should be added (and whether it should also be added to the CLI), or should I browse the SDK?
@sm-hawkfish Great!
I think kfp.Client is a good place; it will be up to you whether you think adding it to the CLI is useful too. Of course, you can implement it in multiple PRs.
Hi @Bobgy -- I wanted to follow up on this. I just got approved by my employer to contribute to this repo, which is exciting news. I will work on submitting a pull request with some variation of the above code snippet sometime this week.
@sm-hawkfish That's awesome!
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
Hi @sm-hawkfish, do you still want to work on this? Don't worry if you no longer have bandwidth, we can make it clear that others are free to pick it up
I'm trying to use this piece of code made by @sm-hawkfish (thanks by the way!) but I get:
ApiException: (500) Reason: Internal Server Error
while running
kfp.Client().runs.read_artifact()
even though I have taken my artifact name from my compiled YAML pipeline:
templates:
  - name: load-dataset
    outputs:
      artifacts:
        - {name: load-dataset-output_data, path: /tmp/outputs/output_data/data}
And kfp.Client().runs.get_run() works fine and contains my component_name ('load-dataset'), so I do get the proper node_id to pass to read_artifact().
Version: kfp 1.8.4, kubeflow 1.6
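For reference, here is how I check which artifact names Argo actually recorded on that node (a small sketch against the same workflow manifest used above; the run id is a placeholder):

import json
import kfp

client = kfp.Client()
run = client.runs.get_run("<run-id>")  # placeholder run id
workflow = json.loads(run.pipeline_runtime.workflow_manifest)

for node_id, node in workflow["status"]["nodes"].items():
    if node["displayName"] == "load-dataset":
        # read_artifact expects one of these exact names
        print(node_id, [a["name"] for a in node.get("outputs", {}).get("artifacts", [])])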
Hi all,
thanks a lot @sm-hawkfish, your function works for me. But I'm a little confused because, from my pipeline YAML (I use tfx==1.12.0), I see that for all my components I have:
"outputs":
{"artifacts":
[{"name":"mlpipeline-ui-metadata",
"path":"/mlpipeline-ui-metadata.json"}]
}"
so, when I get the node id, I have to provide artifact_name="mlpipeline-ui-metadata":
node_id = get_node_id(run_id=run_id, component_name='evaluator', client=client)
artifact = get_artifact(run_id=run_info.id, node_id=node_id, artifact_name="mlpipeline-ui-metadata", client=client)
The surprising fact is that artifact contains a JSON file with paths to the artifacts somewhere else (whatever the component), not the artifact itself, like:
# Execution properties:
**eval_config**: {
  "metrics_specs": [
    {
      "thresholds": {
        "accuracy": {
          "change_threshold": {
            "absolute": -1e-10,
            "direction": "HIGHER_IS_BETTER"
          },
          "value_threshold": {
            "lower_bound": 0.0
          }
        }
      }
    }
  ],
  "model_specs": [
    {
      "signature_name": "eval"
    }
  ],
  "slicing_specs": [
    {}
  ]
}
**fairness_indicator_thresholds**: null
**example_splits**: null
# Inputs:
## model
**Type**: Model
**Artifact: None**
**Properties**:
**uri**: gs://my_bucket/tfx/0853cfbe-fced-4384-bb7e-53645b492652/Trainer/model/5983
**id**: 7768
**span**: None
....
## blessing
**Type**: ModelBlessing
**Artifact: None**
**Properties**:
**uri**: gs://my_bucket/tfx/0853cfbe-fced-4384-bb7e-53645b492652/Evaluator/blessing/5985
**id**: 0
**span**: None
**type_id**: 0
**type_name**: ModelBlessing
**state**: None
**split_names**: None
**producer_component**: None
Is there something I'm missing? Am I mixing up the Argo artifact with the MLMD artifact? Thanks a lot.
Hello, I wanted to add a solution based on @sm-hawkfish's code. This additional function pulls all component outputs and performs type casting based on the template spec.
Feel free to give feedback if:
- Some of the assumptions in the NOTE: comments are faulty.
- There is a better way to do this.

It does feel like I am pulling data from many sources, so maybe there is a simpler way this should be done. Let me know!
Tested with kfp==1.8.18
Example output:
>>> from my_file import get_run_outputs
>>> get_run_outputs(client, '0ce9722a-813b-4bc7-b3a1-6ebef8c0d629')
{
    'simple-returns-ljc9h-1842119181': {'a': 5,
                                        'b': 'Hello There!',
                                        'c': {'key': 'value'}},
    'simple-returns-ljc9h-2173387764': {'Output': 'Hello World!'},
    'simple-returns-ljc9h-2948094594': {'Output': 2},
    'simple-returns-ljc9h-3744801681': {'Output': 1.0},
    'simple-returns-ljc9h-4036141853': {'Output': {'a': 4, 'b': 'Hello Friend!'}},
    'simple-returns-ljc9h-4142110750': {'Output': 3},
    'simple-returns-ljc9h-694853977': {'Output': True}
}
Code:
import json
import tarfile
from io import BytesIO
from base64 import b64decode

from kfp import Client

KFP_TYPE_MAP = {
    "Integer": int,
    "Float": float,
    "Boolean": bool,
    "String": str,
    "JsonObject": json.loads,
}


def get_artifact(
    client: Client,
    run_id: str,
    node_id: str,
    artifact_name: str,
) -> dict:
    """
    Source: https://github.com/kubeflow/pipelines/issues/4327#issuecomment-687255001
    """
    artifact = client.runs.read_artifact(
        run_id,
        node_id,
        artifact_name,
    )
    data = b64decode(artifact.data)
    buffer = BytesIO()
    buffer.write(data)
    buffer.seek(0)
    with tarfile.open(fileobj=buffer) as tar:
        member_names = tar.getnames()
        data = {}
        for name in member_names:
            data[name] = tar.extractfile(name).read().decode("utf-8")
    return data


def get_run_outputs(
    client: Client,
    run_id: str,
    normalize_output_names: bool = True,
) -> dict:
    run_data = client.get_run(run_id)
    run_manifest = json.loads(run_data.pipeline_runtime.workflow_manifest)

    all_output_data = {}
    for node in run_manifest["status"]["nodes"].values():
        # NOTE: Using type of node to find actual components (filtering out for-loops, subgraphs, etc.)
        if node["type"] != "Pod":
            continue

        # Locate the associated template
        template = None
        for t in run_manifest["spec"]["templates"]:
            if t["name"] == node["templateName"]:
                template = t
        assert template is not None, "Template existence assumption broken"

        # Pull the type information from the spec
        component_spec = json.loads(
            template["metadata"]["annotations"]["pipelines.kubeflow.org/component_spec"]
        )

        # NOTE: We assume that orders match here (not 100% sure if that is valid)
        outputs = template["outputs"]["artifacts"]
        spec_outputs = component_spec["outputs"]
        type_info = {}
        for o, so in zip(outputs, spec_outputs):
            type_info[o["name"]] = so["type"]

        # Loop through artifacts and pull data then cast
        component_output_data = {}
        for artifact in node["outputs"]["artifacts"]:
            # Skip any artifact which is not an output (main-logs)
            if artifact["name"] not in type_info:
                continue
            output_data = get_artifact(
                client=client,
                run_id=run_id,
                node_id=node["id"],
                artifact_name=artifact["name"],
            )
            output_data = output_data["data"]

            # Cast types
            t = KFP_TYPE_MAP[type_info[artifact["name"]]]
            component_output_data[artifact["name"]] = t(output_data)

        if normalize_output_names:
            component_output_data = {
                k.replace(f"{node['templateName']}-", ""): v
                for k, v in component_output_data.items()
            }

        all_output_data[node["id"]] = component_output_data

    return all_output_data
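A note on the result shape: the dictionary is keyed by Argo node IDs (as in the example output above), not component names. If friendlier keys are wanted, the node's displayName from the same manifest could be used instead; a small variant of the last assignment, assuming display names are unique within the run:

# Key results by display name instead of node ID
# (assumes display names are unique within the run; duplicates would overwrite each other).
all_output_data[node["displayName"]] = component_output_data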
This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.