aws-step-functions-data-science-sdk-python
Simplify parameter handling with map
Right now, managing the parameters passed to a Map state's iterator is challenging, because there's no way to use the input schema to ensure each value ends up in the right place.
It'd be great if the API provided some facility for this.
Here's an API I imagine would work well:
exec_input = Input(schema={
    "key": [{"k1": str, "k2": str}],
})
state = Map(
    input=exec_input["key"],
    iterator=lambda input: Task(parameters={"SomeField": input["k1"], "OtherField": input["k2"]})
)
This would be able to validate at every level that what's being read matches the input schema.
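To make the idea concrete, here's a minimal sketch in plain Python of what "validate at every level" could look like. None of this is the SDK's API: `SchemaRef`, `element`, and `checked_map` are hypothetical names I made up for illustration, and the returned dict only loosely mimics a Map state definition.

```python
class SchemaRef:
    """Hypothetical schema-aware placeholder (not an SDK class)."""

    def __init__(self, schema, path="$"):
        self.schema = schema
        self.path = path

    def __getitem__(self, key):
        # Only dict schemas support key lookups; unknown keys fail fast.
        if not isinstance(self.schema, dict):
            raise TypeError("{} is not an object in the schema".format(self.path))
        if key not in self.schema:
            raise KeyError("{} has no field '{}' in the schema".format(self.path, key))
        return SchemaRef(self.schema[key], "{}.{}".format(self.path, key))

    def element(self):
        # For a list schema such as [{"k1": str}], return a reference to one
        # element, rooted at "$" the way a Map iterator sees its own input.
        if not (isinstance(self.schema, list) and len(self.schema) == 1):
            raise TypeError("{} is not a list in the schema".format(self.path))
        return SchemaRef(self.schema[0], "$")


def checked_map(items, iterator):
    # Hypothetical helper: the iterator gets a reference typed by the
    # element schema, so every lookup inside it is checked.
    return {"ItemsPath": items.path, "Iterator": iterator(items.element())}


exec_input = SchemaRef({"key": [{"k1": str, "k2": str}]})
state = checked_map(
    exec_input["key"],
    lambda item: {"SomeField.$": item["k1"].path, "OtherField.$": item["k2"].path},
)
```

In this sketch a typo such as `item["k3"]` or `exec_input["keys"]` raises a `KeyError` while the workflow is being built, rather than failing when the state machine runs.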
I landed on some utility functions to address this use case:
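import stepfunctions.inputs
import stepfunctions.steps


def map_step(state_id, items, max_concurrency, iterator):
    # items.type is the type recorded for this placeholder from the schema;
    # a Map can only iterate over a list.
    if not isinstance(items.type, list):
        raise TypeError("Unexpected type for items: {}".format(items.type))
    return stepfunctions.steps.Chain([
        # Put the items under a known key so the Map can select them with input_path.
        stepfunctions.steps.Pass(
            state_id="{}-pass".format(state_id),
            parameters={
                "items": items,
            },
        ),
        stepfunctions.steps.Map(
            state_id=state_id,
            max_concurrency=max_concurrency,
            input_path="$.items",
            # Give the iterator a StepInput typed by the list's element schema.
            iterator=iterator(stepfunctions.inputs.StepInput(schema=items.type[0])),
        ),
    ])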
Used like:
exec_input = Input(schema={
    "key": [{"k1": str, "k2": str}],
})
state = map_step(
    state_id="my-state",
    max_concurrency=3,
    items=exec_input["key"],
    iterator=lambda input: Task(parameters={"SomeField": input["k1"], "OtherField": input["k2"]})
)