JSON parsing for inline scripts needs custom processing in certain situations (fan-out/in)
Pre-bug-report checklist
1. This bug can be reproduced using pure Argo YAML
- [ ] Yes 👉 Please report a bug to the Argo Workflows GitHub 👈
- [x] No
If yes, it is more likely to be an Argo bug unrelated to Hera. Please double-check before submitting an issue to Hera.
2. This bug occurs in Hera when...
- [ ] exporting to YAML
- [ ] submitting to Argo
- [ ] running on Argo with the Hera runner
- [x] other:
Handling JSON-serialised objects generated by Argo Workflows with the json.loads call that Hera inserts at the beginning of every Script
Bug report
Describe the bug
A clear and concise description of what the bug is:
When nested fan-out/fan-in patterns are used with Script tasks, the JSON parsing code that Hera adds to handle Argo parameters can fail to fully deserialise the input. The generated code is:
```python
try: message = json.loads(r'''{{inputs.parameters.in_param}}''')
except: message = r'''{{inputs.parameters.in_param}}'''
```
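Outside Argo, the effect of this preamble can be reproduced with an ordinary function. In this sketch `load_param` is a hypothetical name, the Argo template substitution is replaced by a plain string argument, and the bare `except:` is narrowed to `ValueError`. It decodes exactly one level, so a JSON string nested inside a JSON array comes back still encoded.

```python
import json
from typing import Any


def load_param(raw: str) -> Any:
    """Mimic the generated preamble: try one json.loads, else keep the raw string."""
    try:
        return json.loads(raw)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return raw


print(load_param('{"a": 1}'))       # → {'a': 1}
print(load_param("hello world"))    # → hello world (not JSON, returned unchanged)
print(load_param('["[\\"x\\"]"]'))  # → ['["x"]'] (only the outer array is decoded)
```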
Unfortunately, this breaks when a nested fan-out and fan-in pattern is used. In these cases the generated Argo YAML is not aware of the multiple levels of serialisation, so one ends up with a top-level JSON object that contains JSON that has been stringified.
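The double serialisation can be modelled in plain Python. This sketch assumes, as the report describes, that each fan-in collects the per-iteration output strings into a JSON array of strings; `aggregate` is a hypothetical stand-in for that Argo behaviour, not an Argo or Hera API.

```python
import json


def aggregate(values):
    """Hypothetical model of a fan-in: collect each iteration's output string into one JSON array."""
    return json.dumps(values)


# Inner fan-out: 2 items x 2 sub-items, each iteration emitting a plain string.
inner_level = [[f"item_{i}_subitem_{j}" for j in range(2)] for i in range(2)]

# Inner fan-in: each outer iteration's output parameter is already a JSON string.
outer_outputs = [aggregate(subitems) for subitems in inner_level]

# Outer fan-in: those JSON strings are serialised a second time.
final_param = aggregate(outer_outputs)

# A single json.loads (as in the preamble) only peels off the outer layer.
once = json.loads(final_param)
print(once)  # each element is still a JSON-encoded list, not a Python list
```

Decoding each element a second time recovers the intended nested list; the general fix has to repeat this for however many layers there are.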
For example, consider a nested fan-out/fan-in with 2 items and 2 sub-items, from which one would like to receive the data structure below back during the final fan-in:
```
[["item_0_subitem_0", "item_0_subitem_1"], ["item_1_subitem_0", "item_1_subitem_1"]]
```
In practice, one receives the two-item list below, where each item is a stringified version of the nested list:
```
["["\"item_0_subitem_0\"","\"item_0_subitem_1\""]","["\"item_1_subitem_0\"","\"item_1_subitem_1\""]"]
```
The defect is that the content has been serialised twice but only deserialised once. To fix this, one needs a way to deserialise, then iterate over all the items in the data structure and deserialise those as well.
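A minimal sketch of that "deserialise, then iterate and deserialise again" idea, written as a standalone function (`deep_decode` is a hypothetical name, not part of Hera). Unlike the `decode` helper in the reproduction, it also recurses into lists and dicts that are already decoded, so `[["a", '["b"]']]` becomes `[["a", ["b"]]]`. Like that helper, it converts any string that happens to be valid JSON, so `"42"` or `"true"` become `42` and `True`.

```python
import json
from typing import Any


def deep_decode(value: Any) -> Any:
    """Recursively decode JSON that was stringified one or more times.

    Strings that parse as JSON are replaced by their decoded value, which is
    then decoded again; lists and dicts are walked element by element.
    Strings that are not valid JSON are returned unchanged.
    """
    if isinstance(value, str):
        try:
            decoded = json.loads(value)
        except ValueError:  # not JSON: keep the plain string
            return value
        return deep_decode(decoded)
    if isinstance(value, list):
        return [deep_decode(v) for v in value]
    if isinstance(value, dict):
        return {k: deep_decode(v) for k, v in value.items()}
    return value  # numbers, booleans, None


partially_decoded = ['["item_0_subitem_0", "item_0_subitem_1"]', '["item_1_subitem_0", "item_1_subitem_1"]']
print(deep_decode(partially_decoded))
```

Recursion always terminates because a string only decodes to another string when it was a quoted JSON string literal, which is strictly longer than its content.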
Error log if applicable:
There is no immediate error apart from the inconvenience of having to handle a partially deserialised JSON object.
To Reproduce
Full Hera code to reproduce the bug:
```python
from hera.workflows import Parameter, Steps, Workflow, script
from hera.workflows.models import ValueFrom

PROCESS_INNER = "process-inner"


@script(outputs=Parameter(name="items", value_from=ValueFrom(path="/tmp/items.json")))
def generate_outer_items():
    import json

    with open("/tmp/items.json", "w") as f:
        json.dump([f"item_{i}" for i in range(2)], f)


@script(outputs=Parameter(name="processed_item", value_from=ValueFrom(path="/tmp/processed_item.json")))
def generate_inner_items(item: str):
    import json

    with open("/tmp/processed_item.json", "w") as f:
        json.dump([(f"{item}_subitem_{i}", "outer") for i in range(2)], f)


@script(
    name=PROCESS_INNER,
    outputs=Parameter(name="processed_subitem", value_from=ValueFrom(path="/tmp/processed_subitem.json")),
)
def inner(subitem: str):
    import json

    with open("/tmp/processed_subitem.json", "w") as f:
        json.dump((subitem, "test"), f)


@script()
def fan_in(all_subitem_list):
    import json

    # When passing parameters via Argo, these have to be JSON serialised. Hera handles deserialisation
    # automatically; unfortunately, in this example and other cases where multiple fan-out layers collapse
    # onto a single fan-in, the JSON deserialisation is not aware of the workflow's topology and can
    # result in partially deserialised objects. The decode function handles these partially
    # deserialised objects.
    def decode(jo):
        def decode_inner(o):
            try:
                d = json.loads(o)
            except (json.JSONDecodeError, TypeError):
                return o
            if isinstance(d, str):
                return decode_inner(d)
            elif isinstance(d, list):
                return [decode_inner(i) for i in d]
            elif isinstance(d, dict):
                return {k: decode_inner(v) for k, v in d.items()}
            else:
                return d

        return decode_inner(json.dumps(jo))

    print("raw:")
    print(all_subitem_list)
    print("decoded:")
    print(decode(all_subitem_list))


with Workflow(
    generate_name="fan-out-fan-in-nested-",
    entrypoint="main",
) as wf:
    with Steps(
        name="outer",
        inputs=[Parameter(name="input_item"), Parameter(name="same_value")],
        outputs=[
            Parameter(
                name="processed_subitems",
                value_from=ValueFrom(
                    parameter="{{steps.%s.outputs.parameters.processed_subitem}}" % PROCESS_INNER
                ),
            )
        ],
    ) as outer:
        inner_items = generate_inner_items(
            arguments={"item": outer.get_parameter("input_item"), "same_value": "same_for_all"},
        )
        processed_inner_item = inner(
            arguments={"subitem": "{{item}}"},
            with_param=inner_items.get_parameter("processed_item"),
        )

    with Steps(name="main"):
        outer_items = generate_outer_items()
        processed_outer_items = outer(
            arguments={"input_item": "{{item}}", "same_value": "same_for_all"},
            with_param=outer_items.get_parameter("items"),
        )
        # Note: This is a single fan-in for both fan-out levels (outer, inner); see the `decode`
        # function for how to handle the JSON deserialisation.
        fan_in(
            arguments={"all_subitem_list": processed_outer_items.get_parameter("processed_subitems")}
        )
```
Expected behavior
A clear and concise description of what you expected to happen:
The input object to fan_in should be a valid Python object without any remnant JSON strings within it.
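The expected behaviour can be phrased as a checkable property. The sketch below uses a hypothetical helper, `has_nested_json`, that reports whether any string inside a value still encodes a JSON list or dict (possibly behind extra layers of string quoting). Scalar-looking strings such as `"42"` are deliberately not flagged, since those are often legitimate values.

```python
import json
from typing import Any


def has_nested_json(obj: Any) -> bool:
    """Return True if any string inside obj still encodes a JSON list or dict."""
    if isinstance(obj, str):
        try:
            decoded = json.loads(obj)
        except ValueError:  # plain string, not JSON
            return False
        if isinstance(decoded, str):  # quoted JSON string: look one layer deeper
            return has_nested_json(decoded)
        return isinstance(decoded, (list, dict))
    if isinstance(obj, list):
        return any(has_nested_json(v) for v in obj)
    if isinstance(obj, dict):
        return any(has_nested_json(v) for v in obj.values())
    return False


print(has_nested_json(['["a", "b"]']))          # partially decoded input
print(has_nested_json([["a", "b"], ["c", "d"]]))  # fully decoded input
```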
Environment
- Hera Version: 5.X.X
- Python Version: 3.11.X
- Argo Version: 3.X.X
Additional context
Note the decode(jo) function in fan_in, which could be used in place of the original json.loads(r'''{{inputs.parameters.in_param}}''') to handle this issue.