aws-step-functions-data-science-sdk-python

Simplify parameter handling with map

Open. alex opened this issue 5 years ago (2 comments).

Right now, managing the parameters of a Map state's iterator is challenging, because there's no way to use the input schema to check that values are passed to the right places.

It would be great if the API provided a facility for this.

alex, Jun 27 '20 15:06

Here's an API I imagine would work well:

exec_input = Input(schema={
    "key": [{"k1": str, "k2": str}],
})

state = Map(
    input=exec_input["key"],
    iterator=lambda input: Task(parameters={"SomeField": input["k1"], "OtherField": input["k2"]})
)

This would let the SDK validate, at every level, that each field being read actually exists in the input schema.
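To make the "validate at every level" idea concrete, here is a minimal, self-contained sketch in plain Python. It does not use the SDK: `SchemaPath`, `item()` and `render_parameters` are hypothetical names, and paths are simplified to `$.field` (the real SDK renders execution-input placeholders differently). Each lookup is checked against the schema, a list schema yields a per-element placeholder rooted at `$` (the input of one Map iteration), and the chosen fields render as ASL-style `"Field.$": path` parameters.

```python
from typing import Any, Dict


class SchemaPath:
    """Hypothetical schema-aware placeholder (not the SDK's Placeholder class).

    It records the JSONPath being read and rejects fields the schema does not declare.
    """

    def __init__(self, schema: Any, path: str = "$") -> None:
        self.schema = schema
        self.path = path

    def __getitem__(self, key: str) -> "SchemaPath":
        # Only object (dict) schemas have named fields.
        if not isinstance(self.schema, dict):
            raise TypeError(f"{self.path} is not an object; cannot read {key!r}")
        if key not in self.schema:
            raise KeyError(f"{key!r} is not declared under {self.path}")
        return SchemaPath(self.schema[key], f"{self.path}.{key}")

    def item(self) -> "SchemaPath":
        # A list schema such as [{"k1": str}] describes its element type with one entry.
        # Inside a Map iteration each element is the state input, so the path restarts at "$".
        if not (isinstance(self.schema, list) and len(self.schema) == 1):
            raise TypeError(f"{self.path} is not a list schema")
        return SchemaPath(self.schema[0], "$")


def render_parameters(params: Dict[str, Any]) -> Dict[str, Any]:
    # ASL convention: a parameter whose value is read from a path gets a ".$" key suffix.
    rendered: Dict[str, Any] = {}
    for name, value in params.items():
        if isinstance(value, SchemaPath):
            rendered[f"{name}.$"] = value.path
        else:
            rendered[name] = value
    return rendered


exec_input = SchemaPath({"key": [{"k1": str, "k2": str}]})
element = exec_input["key"].item()
params = render_parameters({"SomeField": element["k1"], "OtherField": element["k2"]})
print(params)  # {'SomeField.$': '$.k1', 'OtherField.$': '$.k2'}
```

With this shape, a typo such as `element["k3"]` raises `KeyError` when the state machine is defined rather than when it runs, and reading `exec_input["key"]["k1"]` (forgetting that `key` is a list) raises `TypeError`.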

alex, Jun 27 '20 19:06

I landed on some utility functions to address this use case:

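import stepfunctions.inputs
import stepfunctions.steps


def map_step(state_id, items, max_concurrency, iterator):
    # `items` must be a placeholder whose schema type is a list, e.g. [{"k1": str}].
    if not isinstance(items.type, list):
        raise TypeError("Unexpected type for items: {}".format(items))
    return stepfunctions.steps.Chain([
        # Copy the list into the state data so the Map state can select it by path.
        stepfunctions.steps.Pass(
            state_id="{}-pass".format(state_id),
            parameters={
                "items": items,
            },
        ),
        # Iterate over that list; the iterator factory gets a placeholder
        # built from the list's element schema.
        stepfunctions.steps.Map(
            state_id=state_id,
            max_concurrency=max_concurrency,
            input_path="$.items",
            iterator=iterator(stepfunctions.inputs.StepInput(schema=items.type[0])),
        )
    ])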
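The reason for the extra Pass state is that the Map state needs the list itself as its effective input. The data flow can be simulated in plain Python as below. This is a hypothetical sketch, not the Step Functions runtime: `read_path`, `pass_state` and `map_state` are illustrative helpers, only simple dotted paths are supported, iterations run sequentially instead of up to `max_concurrency` at a time, and it assumes the Pass state is the first state so that `$.key` can stand in for the execution-input path the SDK may emit. The iterator is simulated as a Pass state applying the same parameters the Task would receive.

```python
from typing import Any, Callable, Dict, List


def read_path(data: Any, path: str) -> Any:
    # Supports only simple dotted paths such as "$", "$.key" or "$.a.b".
    if path == "$":
        return data
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    value = data
    for part in path[2:].split("."):
        value = value[part]
    return value


def pass_state(state_input: Any, parameters: Dict[str, Any]) -> Dict[str, Any]:
    # Keys ending in ".$" are resolved as paths against the state input; others are literals.
    output: Dict[str, Any] = {}
    for key, value in parameters.items():
        if key.endswith(".$"):
            output[key[:-2]] = read_path(state_input, value)
        else:
            output[key] = value
    return output


def map_state(state_input: Any, input_path: str, iterator: Callable[[Any], Any]) -> List[Any]:
    # InputPath selects the list; each element becomes the input of one iteration,
    # and the iteration results are collected in the same order as the items.
    items = read_path(state_input, input_path)
    if not isinstance(items, list):
        raise TypeError(f"{input_path} did not select a list")
    return [iterator(item) for item in items]


execution_input = {"key": [{"k1": "a", "k2": "b"}, {"k1": "c", "k2": "d"}]}
# The Pass state copies the list into the state data under "items".
after_pass = pass_state(execution_input, {"items.$": "$.key"})
results = map_state(
    after_pass,
    "$.items",
    lambda item: pass_state(item, {"SomeField.$": "$.k1", "OtherField.$": "$.k2"}),
)
print(results)
# [{'SomeField': 'a', 'OtherField': 'b'}, {'SomeField': 'c', 'OtherField': 'd'}]
```

Because each iteration's input is one element of the list, the placeholder handed to the iterator factory only needs the element schema, which is why `map_step` passes `items.type[0]` to `StepInput`.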

Used like:

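from stepfunctions.inputs import ExecutionInput
from stepfunctions.steps import Task

exec_input = ExecutionInput(schema={
    "key": [{"k1": str, "k2": str}],
})

state = map_step(
    state_id="my-state",
    max_concurrency=3,
    items=exec_input["key"],
    # Task requires a state_id; "my-task" is only an example name.
    iterator=lambda item: Task(
        state_id="my-task",
        parameters={"SomeField": item["k1"], "OtherField": item["k2"]},
    ),
)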

alex, Jun 28 '20 15:06