
JSON parsing for inline scripts needs custom processing in certain situations (fan-out/in)

Open • ljyanesm opened this issue 6 months ago • 1 comment

Pre-bug-report checklist

1. This bug can be reproduced using pure Argo YAML

If yes, it is more likely to be an Argo bug unrelated to Hera. Please double check before submitting an issue to Hera.

2. This bug occurs in Hera when...

  • [ ] exporting to YAML
  • [ ] submitting to Argo
  • [ ] running on Argo with the Hera runner
  • [x] other:

Handling JSON-serialised objects generated by Argo Workflows with the json.loads call that Hera includes at the beginning of every script

Bug report

Describe the bug

A clear and concise description of what the bug is:

When nested fan-out/fan-in patterns are used with script tasks, the JSON-parsing code that Hera adds to handle Argo parameters can fail. The generated code is:

try: message = json.loads(r'''{{inputs.parameters.in_param}}''')
except: message = r'''{{inputs.parameters.in_param}}'''

Unfortunately, this breaks when a nested fan-out and fan-in pattern is used. In these cases the generated Argo YAML isn't aware of the multiple levels of serialisation, so one ends up with a top-level JSON object that contains JSON that has been stringified.
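The fallback behaviour of the two generated preamble lines can be tried locally by wrapping them in a function. This is a sketch, not Hera's generated code: the name `load_param` is hypothetical, the template `{{inputs.parameters.in_param}}` is replaced by an ordinary argument, and the bare `except:` is narrowed to `json.JSONDecodeError`. It shows that exactly one `json.loads` is attempted: JSON text becomes a Python object, anything else is passed through as a raw string, and JSON-looking text such as `42` is converted to a number.

```python
import json


def load_param(raw: str):
    # Hypothetical stand-in for the generated preamble:
    #   try: message = json.loads(r'''{{inputs.parameters.in_param}}''')
    #   except: message = r'''{{inputs.parameters.in_param}}'''
    try:
        return json.loads(raw)  # a single decoding pass, no recursion
    except json.JSONDecodeError:
        return raw  # not valid JSON: keep the raw text


print(load_param('["a", "b"]'))  # decoded to a Python list
print(load_param("hello"))       # not JSON, returned unchanged
print(load_param("42"))          # valid JSON, so it becomes the int 42
```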

For example, consider a nested fan-out/fan-in with 2 items and 2 sub-items, from which one would like to receive the data structure below during the final fan-in.

[["item_0_subitem_0", "item_0_subitem_1"], ["item_1_subitem_0", "item_1_subitem_1"]]

In practice, one receives the two-item list below instead, where each item is a stringified version of the nested list.

["["\"item_0_subitem_0\"","\"item_0_subitem_1\""]","["\"item_1_subitem_0\"","\"item_1_subitem_1\""]"]

The defect is that the content has been serialised twice but deserialised only once. To fix this, one needs to deserialise the top level, then iterate over all the items in the resulting data structure and deserialise those as well.
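The serialised-twice, deserialised-once problem can be reproduced with the standard `json` module alone. This is a hypothetical simulation: the list literal stands in for the per-item results, and the two `json.dumps` stages stand in for the inner and outer aggregation layers. Argo's actual aggregated output may be formatted differently from this sketch.

```python
import json

# Intended result of the final fan-in (hypothetical stand-in for real step outputs).
inner_results = [
    ["item_0_subitem_0", "item_0_subitem_1"],
    ["item_1_subitem_0", "item_1_subitem_1"],
]

# First serialisation: each inner fan-in turns its list into a JSON string.
stringified_inner = [json.dumps(lst) for lst in inner_results]

# Second serialisation: the outer fan-in wraps those strings in another JSON list.
payload = json.dumps(stringified_inner)

# A single json.loads (as in the script preamble) removes only the outer layer.
once = json.loads(payload)
print(once)
print(type(once[0]))  # <class 'str'>

# Decoding each element a second time recovers the intended nested list.
twice = [json.loads(s) for s in once]
print(twice == inner_results)  # True
```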

Error log if applicable:

There is no immediate error apart from the inconvenience of having to handle a partially deserialised JSON object.

To Reproduce

Full Hera code to reproduce the bug:

from hera.workflows import Parameter, Steps, Workflow, script
from hera.workflows.models import ValueFrom

PROCESS_INNER = "process-inner"


@script(outputs=Parameter(name="items", value_from=ValueFrom(path="/tmp/items.json")))
def generate_outer_items():
    import json
    # Outer fan-out source: a JSON list of items.
    with open("/tmp/items.json", "w") as f:
        json.dump([f"item_{i}" for i in range(2)], f)


@script(outputs=Parameter(name="processed_item", value_from=ValueFrom(path="/tmp/processed_item.json")))
def generate_inner_items(item: str):
    import json
    # Inner fan-out source: a JSON list of sub-items for one outer item.
    with open("/tmp/processed_item.json", "w") as f:
        json.dump([(f"{item}_subitem_{i}", "outer") for i in range(2)], f)


@script(
    name=PROCESS_INNER,
    outputs=Parameter(name="processed_subitem", value_from=ValueFrom(path="/tmp/processed_subitem.json")),
)
def inner(subitem: str):
    import json
    with open("/tmp/processed_subitem.json", "w") as f:
        json.dump((subitem, "test"), f)


@script()
def fan_in(all_subitem_list):
    import json
    # When passing parameters via Argo, these have to be JSON serialised. Hera manages deserialisation automatically;
    # unfortunately, in this example and other cases where multiple fan-out layers collapse into a single fan-in, the JSON
    # deserialisation is not aware of the workflow's topology and can result in partially deserialised objects.
    # The decode function below recursively deserialises any remaining JSON strings.
    def decode(jo):
        def decode_inner(o):
            try:
                d = json.loads(o)
            except (json.JSONDecodeError, TypeError):
                return o
            if isinstance(d, str):
                return decode_inner(d)
            elif isinstance(d, list):
                return [decode_inner(i) for i in d]
            elif isinstance(d, dict):
                return {k: decode_inner(v) for k, v in d.items()}
            else:
                return d
        return decode_inner(json.dumps(jo))
    print("raw:")
    print(all_subitem_list)
    print("decoded:")
    print(decode(all_subitem_list))


with Workflow(
    generate_name="fan-out-fan-in-nested-",
    entrypoint="main",
) as wf:
    with Steps(
        name="outer",
        inputs=[Parameter(name="input_item"), Parameter(name="same_value")],
        outputs=[
            Parameter(
                name="processed_subitems",
                value_from=ValueFrom(parameter="{{steps.%s.outputs.parameters.processed_subitem}}" % PROCESS_INNER),
            )
        ],
    ) as outer:
        inner_items = generate_inner_items(
            arguments={"item": outer.get_parameter("input_item"), "same_value": "same_for_all"},
        )
        processed_inner_item = inner(
            arguments={"subitem": "{{item}}"},
            with_param=inner_items.get_parameter("processed_item")
        )
    with Steps(name="main"):
        outer_items = generate_outer_items()
        processed_outer_items = outer(
            arguments={"input_item": "{{item}}", "same_value": "same_for_all"},
            with_param=outer_items.get_parameter("items")
        )
        # Note: This is a single fan-in for both fan-out levels (outer, inner), see the `decode` function for how to handle
        # the JSON deserialisation
        fan_in(
            arguments={"all_subitem_list": processed_outer_items.get_parameter("processed_subitems")}
        )

Expected behavior

A clear and concise description of what you expected to happen:

The input object to fan_in should be a valid Python object with no remnant JSON strings inside it.

Environment

  • Hera Version: 5.X.X
  • Python Version: 3.11.X
  • Argo Version: 3.X.X

Additional context

Please note the decode(jo) function, which could be used instead of the original json.loads(r'''{{inputs.parameters.in_param}}''') to handle this issue.
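As a sketch of what such a replacement might look like, the hypothetical helper `deep_loads` below works on the raw parameter text, like the preamble does. By contrast, decode(jo) receives an already-loaded object and re-dumps it first. In principle it could be used as `message = deep_loads(r'''{{inputs.parameters.in_param}}''')`, but this is an assumption, not existing Hera behaviour. Like decode(jo), it also converts string leaves that happen to be valid JSON (for example `"42"` or `"true"`), which may not always be wanted.

```python
import json
from typing import Any


def deep_loads(raw: Any) -> Any:
    """Hypothetical helper: decode raw as JSON, then keep decoding nested JSON strings."""
    try:
        value = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        # Not JSON (or not str/bytes): keep it unchanged, like the existing fallback.
        return raw
    return _walk(value)


def _walk(value: Any) -> Any:
    # Recurse through lists and dicts, retrying decoding on every string leaf.
    if isinstance(value, str):
        return deep_loads(value)
    if isinstance(value, list):
        return [_walk(v) for v in value]
    if isinstance(value, dict):
        return {k: _walk(v) for k, v in value.items()}
    return value


# A payload encoded twice, as in the nested fan-out/fan-in case.
payload = json.dumps([json.dumps(["a", "b"]), json.dumps(["c", "d"])])
print(deep_loads(payload))  # [['a', 'b'], ['c', 'd']]
```

Recursion terminates because each successful `json.loads` of a string yields a strictly shorter value, and a non-JSON string is returned unchanged.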

ljyanesm • Aug 05 '24 15:08