aws-step-functions-data-science-sdk-python
Execution inputs as container arguments for processing jobs
I'm trying to use execution inputs as container arguments for my processing job:
```python
execution_input = ExecutionInput(
    schema={
        "IngestaJobName": str,
        "PreprocessingJobName": str,
        "InferenceJobName": str,
        "Fecha": str,
    }
)
```
```python
# Call step
ingesta_step = ProcessingStep(
    inference_config["ingesta_step_name"],
    processor=ingesta_processor,
    job_name=execution_input['IngestaJobName'],
    inputs=inputs_ingesta,
    outputs=outputs_ingesta,
    container_arguments=["--fecha", "$$.Execution.Input['Fecha']"],
    container_entrypoint=["python3", "/opt/ml/processing/input/code/" + inference_config["ingesta_function"]],
)
```
I've also tried replacing the container_arguments value with `["--fecha", execution_input["Fecha"]]`, but neither approach works.
Use Case
When I launch a new execution of my state machine, it would be useful to pass some of the execution inputs through as container arguments, so that parameters of interest that define the behaviour of the step come directly from the execution input, without having to update the state machine definition.
This is a :rocket: Feature Request
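For illustration, the intended usage would look something like the sketch below (the input values are made up, and it uses the branching_workflow object defined further down): each execution supplies a new Fecha, and the processing container would receive it as --fecha without the workflow definition changing.

```python
# Hypothetical usage this feature request is asking for: values supplied at
# execution time (here, "Fecha") would flow through to the container arguments.
execution = branching_workflow.execute(
    inputs={
        "IngestaJobName": "ingesta-job-2022-06-01",           # made-up job names
        "PreprocessingJobName": "preprocessing-job-2022-06-01",
        "InferenceJobName": "inference-job-2022-06-01",
        "Fecha": "2022-06-01",                                # would become --fecha
    }
)
```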
> I've also tried replacing the container_arguments value with `["--fecha", execution_input["Fecha"]]`, but neither approach works.
Can you elaborate on the behaviour you are seeing?
With ["--fecha", execution_input["Fecha"]]
I get this error TypeError: Object of type ExecutionInput is not JSON serializable
When I execute
```python
branching_workflow = Workflow(
    name=pipeline_name_step,
    definition=workflow_graph,
    role=role,
    execution_input=execution_input
).create()
```
This is the complete error:
```
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Input In [282], in <cell line: 2>()
1 # Create or update your StateMachine Workflow
----> 2 branch_workflow = branching_workflow.create()
File /usr/local/lib/python3.8/site-packages/stepfunctions/workflow/stepfunctions.py:205, in Workflow.create(self)
202 return self.state_machine_arn
204 try:
--> 205 self.state_machine_arn = self._create()
206 except self.client.exceptions.StateMachineAlreadyExists as e:
207 self.state_machine_arn = self._extract_state_machine_arn(e)
File /usr/local/lib/python3.8/site-packages/stepfunctions/workflow/stepfunctions.py:215, in Workflow._create(self)
212 def _create(self):
213 response = self.client.create_state_machine(
214 name=self.name,
--> 215 definition=self.definition.to_json(pretty=self.format_json),
216 roleArn=self.role,
217 tags=self.tags
218 )
219 logger.info("Workflow created successfully on AWS Step Functions.")
220 return response['stateMachineArn']
File /usr/local/lib/python3.8/site-packages/stepfunctions/steps/states.py:91, in Block.to_json(self, pretty)
82 """Serialize to a JSON formatted string.
83
84 Args:
(...)
88 str: JSON formatted string representation of the block.
89 """
90 if pretty:
---> 91 return json.dumps(self.to_dict(), indent=4)
93 return json.dumps(self.to_dict())
File /usr/local/lib/python3.8/json/__init__.py:234, in dumps(obj, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
232 if cls is None:
233 cls = JSONEncoder
--> 234 return cls(
235 skipkeys=skipkeys, ensure_ascii=ensure_ascii,
236 check_circular=check_circular, allow_nan=allow_nan, indent=indent,
237 separators=separators, default=default, sort_keys=sort_keys,
238 **kw).encode(obj)
File /usr/local/lib/python3.8/json/encoder.py:201, in JSONEncoder.encode(self, o)
199 chunks = self.iterencode(o, _one_shot=True)
200 if not isinstance(chunks, (list, tuple)):
--> 201 chunks = list(chunks)
202 return ''.join(chunks)
File /usr/local/lib/python3.8/json/encoder.py:431, in _make_iterencode.<locals>._iterencode(o, _current_indent_level)
429 yield from _iterencode_list(o, _current_indent_level)
430 elif isinstance(o, dict):
--> 431 yield from _iterencode_dict(o, _current_indent_level)
432 else:
433 if markers is not None:
File /usr/local/lib/python3.8/json/encoder.py:405, in _make_iterencode.<locals>._iterencode_dict(dct, _current_indent_level)
403 else:
404 chunks = _iterencode(value, _current_indent_level)
--> 405 yield from chunks
406 if newline_indent is not None:
407 _current_indent_level -= 1
File /usr/local/lib/python3.8/json/encoder.py:405, in _make_iterencode.<locals>._iterencode_dict(dct, _current_indent_level)
403 else:
404 chunks = _iterencode(value, _current_indent_level)
--> 405 yield from chunks
406 if newline_indent is not None:
407 _current_indent_level -= 1
[... skipping similar frames: _make_iterencode.<locals>._iterencode_dict at line 405 (2 times)]
File /usr/local/lib/python3.8/json/encoder.py:405, in _make_iterencode.<locals>._iterencode_dict(dct, _current_indent_level)
403 else:
404 chunks = _iterencode(value, _current_indent_level)
--> 405 yield from chunks
406 if newline_indent is not None:
407 _current_indent_level -= 1
File /usr/local/lib/python3.8/json/encoder.py:325, in _make_iterencode.<locals>._iterencode_list(lst, _current_indent_level)
323 else:
324 chunks = _iterencode(value, _current_indent_level)
--> 325 yield from chunks
326 if newline_indent is not None:
327 _current_indent_level -= 1
File /usr/local/lib/python3.8/json/encoder.py:438, in _make_iterencode.<locals>._iterencode(o, _current_indent_level)
436 raise ValueError("Circular reference detected")
437 markers[markerid] = o
--> 438 o = _default(o)
439 yield from _iterencode(o, _current_indent_level)
440 if markers is not None:
File /usr/local/lib/python3.8/json/encoder.py:179, in JSONEncoder.default(self, o)
160 def default(self, o):
161 """Implement this method in a subclass such that it returns
162 a serializable object for ``o``, or calls the base implementation
163 (to raise a ``TypeError``).
(...)
177
178 """
--> 179 raise TypeError(f'Object of type {o.__class__.__name__} '
180 f'is not JSON serializable')
TypeError: Object of type ExecutionInput is not JSON serializable
```
On the other hand, with `["--fecha", "$$.Execution.Input['Fecha']"]` I get `$$.Execution.Input['Fecha']` as a literal string inside the .py of the processing job; it seems the placeholder doesn't work.
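(For context, this literal pass-through is expected Step Functions behaviour: in a task's Parameters, JSONPath substitution only happens for keys whose names end in `.$`, so a `$$...` string sitting inside a plain array is treated as data. A rough sketch of the shape the generated ASL would need, written here as a Python dict; the image URI and the "FechaArgs" input key are made up for illustration:)

```python
# Why the placeholder arrives verbatim: Step Functions only resolves JSONPath
# in a task's "Parameters" for keys ending in ".$". A "$$..." string inside a
# plain array is passed through as data. Illustrative values only.
app_specification = {
    "ImageUri": "<account>.dkr.ecr.<region>.amazonaws.com/ingesta:latest",
    # The whole argument array is read from the execution input at runtime:
    "ContainerArguments.$": "$$.Execution.Input['FechaArgs']",
}
```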
Thanks for the additional details. The problem is likely that ProcessingStep passes container_arguments directly to `sagemaker.workflow.airflow#processing_config`:
https://github.com/aws/aws-step-functions-data-science-sdk-python/blob/02ed72b10d7c0dbfb3e404b1e183c309c040bfaa/src/stepfunctions/steps/sagemaker.py#L558-L561
The work should be similar to https://github.com/aws/aws-step-functions-data-science-sdk-python/pull/155, with some slight differences because container_arguments is an array instead of an object.
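Until that lands, one possible workaround is an untested sketch like the one below. It assumes the step exposes its generated parameters dict via a .parameters attribute, and "FechaArgs" is a hypothetical execution-input key, not part of the SDK: after building the step, the static ContainerArguments array is swapped for a `.$` key so the whole list is pulled from the execution input at runtime.

```python
# Untested workaround sketch: after building the step, replace the static
# ContainerArguments array with a ".$" key so Step Functions reads the whole
# argument list from the execution input at runtime. Assumes the generated
# parameters are reachable via .parameters; "FechaArgs" is a made-up input key.
app_spec = ingesta_step.parameters["AppSpecification"]
app_spec.pop("ContainerArguments", None)  # drop the static array
app_spec["ContainerArguments.$"] = "$$.Execution.Input['FechaArgs']"

# Each execution then supplies the full argument list, e.g.:
# branching_workflow.execute(inputs={"FechaArgs": ["--fecha", "2022-06-01"], ...})
```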
Hi Wong, thank you for your response.
I don't know how to use parameters inside the processing job. What I've tried ends up the same as defining a container argument, so I get the same error.