sagemaker-python-sdk
sagemaker.workflow.pipeline.PipelineGraph throws an exception when a SageMaker Step's input is a function
Describe the bug
I am trying to use PipelineGraph to sort out the dependencies of my steps. The issue occurs because we have a custom Pipeline class that schedules our steps one after the other, and some of them have their inputs set to a method that gets evaluated when the step is ready to be executed.
Such as:
step.inputs = (
    step.inputs if not inspect.ismethod(step.inputs) else step.inputs()
)
The error is:
TypeError: unsupported operand type(s) for +: 'method' and 'list'
This is due to the method sagemaker.workflow.steps.Step._find_dependencies_in_step_arguments, as it cannot reconcile the inputs being a method to be executed later.
To reproduce
from sagemaker.workflow.pipeline import PipelineGraph
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.processing import ScriptProcessor

def my_function():
    pass

# Create any step and assign its inputs to be a function.
# script_processor is assumed to be an already configured ScriptProcessor.
step_calls = ProcessingStep(
    name='calls',
    description='Processing missing calldata.',
    processor=script_processor,
    inputs=my_function,  # the function object itself, not its result
)
PipelineGraph([step_calls])
Expected behavior
I expect that if the inputs are a method, they should be ignored. Maybe PipelineGraph could take a boolean parameter to toggle this behaviour, with the default being to check the inputs for dependencies.
Screenshots or logs
  File "../console.py", line 21, in run_training_pipeline
    training_pipeline.run(run_date, local)
  File "../src/pipelines/training_pipeline.py", line 42, in run
    PipelineGraph(
  File "/home/noname/.venv/main/lib/python3.10/site-packages/sagemaker/workflow/pipeline.py", line 568, in __init__
    self.adjacency_list = self._initialize_adjacency_list()
  File "/home/noname/.venv/main/lib/python3.10/site-packages/sagemaker/workflow/pipeline.py", line 596, in _initialize_adjacency_list
    step._find_step_dependencies(self.step_map)
  File "/home/noname/.venv/main/lib/python3.10/site-packages/sagemaker/workflow/steps.py", line 183, in _find_step_dependencies
    self.step_only_arguments, step_map
  File "/home/noname/.venv/main/lib/python3.10/site-packages/sagemaker/workflow/steps.py", line 115, in step_only_arguments
    return self.arguments
  File "/home/noname/.venv/main/lib/python3.10/site-packages/sagemaker/workflow/steps.py", line 853, in arguments
    normalized_inputs, normalized_outputs = self.processor._normalize_args(
  File "/home/noname/.venv/main/lib/python3.10/site-packages/sagemaker/processing.py", line 242, in _normalize_args
    inputs_with_code = self._include_code_in_inputs(inputs, code, kms_key)
  File "/home/noname/.venv/main/lib/python3.10/site-packages/sagemaker/processing.py", line 583, in _include_code_in_inputs
    inputs_with_code = self._convert_code_and_add_to_inputs(inputs, user_code_s3_uri)
  File "/home/noname/.venv/main/lib/python3.10/site-packages/sagemaker/processing.py", line 697, in _convert_code_and_add_to_inputs
    return (inputs or []) + [code_file_input]
TypeError: unsupported operand type(s) for +: 'method' and 'list'
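The last frame of the traceback can be reproduced without SageMaker. The following is a minimal sketch, not SDK code: the helper only mirrors the expression `(inputs or [])  + [code_file_input]` from the traceback, and `InputsProvider` and the S3 path are hypothetical names used for illustration.

```python
class InputsProvider:
    """Hypothetical class whose bound method stands in for deferred step inputs."""

    def inputs(self):
        return ["s3://example-bucket/data"]


def convert_code_and_add_to_inputs(inputs, code_file_input):
    # Mirrors the expression from the traceback; this is not the SDK's function.
    # A bound method is truthy, so `inputs or []` returns the method itself,
    # and the following `+` with a list fails.
    return (inputs or []) + [code_file_input]


provider = InputsProvider()

try:
    convert_code_and_add_to_inputs(provider.inputs, "code-input")
    error_message = None
except TypeError as exc:
    error_message = str(exc)

# Passing the evaluated result instead of the method works as expected.
evaluated = convert_code_and_add_to_inputs(provider.inputs(), "code-input")
```

This suggests the failure happens while the step's arguments are being built, before any dependency analysis of the inputs takes place.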
System information
A description of your system. Please provide:
- SageMaker Python SDK version: na
- Framework name (eg. PyTorch) or algorithm (eg. KMeans): na
- Framework version: na
- Python version:
- CPU or GPU: na
- Custom Docker image (Y/N): na
Additional context
I understand that this might not be an intended use case for the class PipelineGraph, but it is a scenario that occurs. It would be nice if a fix were applied.
some of them have the inputs as a method that gets evaluated when the step is ready to be executed.
Can you give me an example of when the step inputs are evaluated?
Can you not pass in the output of the method call?
from sagemaker.workflow.pipeline import PipelineGraph
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.processing import ScriptProcessor

def my_function():
    pass

# Create any step and pass the result of calling the function as its inputs.
step_calls = ProcessingStep(
    name='calls',
    description='Processing missing calldata.',
    processor=script_processor,
    inputs=my_function()  # used to be my_function without the parentheses ()
)
PipelineGraph([step_calls])
Hi. Here is an example.
We have our steps in a list (just like the Pipeline class from SageMaker). Our class is called TestPipeline:
TestPipeline(
    name="Training-Pipeline",
    parameters=[
        training_steps.baseline_threshold,
        training_steps.model_approval_status,
    ],
    steps=[
        preprocessing_steps.step_1,
        preprocessing_steps.step_2,
        preprocessing_steps.step_3,
        preprocessing_steps.step_feature_engineering,
        training_steps.step_split,
        training_steps.step_tuning,
        training_steps.step_evaluation,
    ],
)
Inside it, I pass the list of steps to PipelineGraph from sagemaker and run each step one after the other:
self.steps = PipelineGraph(steps)
for step in self.steps:
    # Here is where we invoke inputs() as a function if needed.
    step.inputs = step.inputs if not inspect.ismethod(step.inputs) else step.inputs()
    step.run(...)  # or whatever its run function is; it depends on step_type
It is all fine unless we have:
def my_function():
    pass

step_calls = ProcessingStep(
    name='calls',
    description='Processing missing calldata.',
    processor=script_processor,
    inputs=my_function  # No parentheses, evaluate at run time
)
This is okay:
def my_function():
    pass

step_calls = ProcessingStep(
    name='calls',
    description='Processing missing calldata.',
    processor=script_processor,
    inputs=my_function()  # evaluate at creation time
)
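One detail of the run-time check in this thread is worth illustrating: `inspect.ismethod` is true only for bound methods, so a plain function such as `my_function` would not be detected by it. The sketch below uses `callable` instead; `resolve_inputs` and `Provider` are hypothetical names, not part of the SageMaker SDK.

```python
import inspect


def my_function():
    return ["input-a"]


class Provider:
    """Hypothetical holder of a bound method that yields inputs."""

    def inputs(self):
        return ["input-b"]


def resolve_inputs(inputs):
    """Call deferred inputs; return already evaluated inputs unchanged."""
    return inputs() if callable(inputs) else inputs


bound = Provider().inputs

# ismethod distinguishes bound methods from plain functions.
is_method_results = (inspect.ismethod(my_function), inspect.ismethod(bound))

# callable covers both, while lists and None pass through untouched.
resolved = [
    resolve_inputs(my_function),
    resolve_inputs(bound),
    resolve_inputs(["already-evaluated"]),
    resolve_inputs(None),
]
```

Whether `callable` is the right test depends on the custom pipeline: it would also call any other callable object assigned to `inputs`, which may or may not be intended.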
@nmadan I think for our use-case it is perfectly fine to explicitly define the dependencies in the depends_on field. Currently, the sagemaker PipelineGraph is not working at all so the small tradeoff of having to manually specify step dependencies is okay.
Would it make sense to place a boolean switch as a parameter to PipelineGraph that would by default let the class search the inputs but if it is explicitly turned off it will skip the inputs for dependencies?
Thanks for that info. I have a few more questions to understand your use case so that I can provide an alternative if possible. I am not very inclined to add a flag to support this very specific use case, so I will do my best to provide some alternative.
- What is the purpose of the TestPipeline class? Does it run the steps locally?
- Why do the inputs need to be evaluated at runtime? Can you not define the step like this:
my_step = ProcessingStep(
    name='calls',
    description='Processing missing calldata.',
    processor=script_processor,
    inputs=my_function()
)
some_other_step = ProcessingStep(
    name='calls',
    description='Processing missing calldata.',
    processor=script_processor,
    inputs=[ProcessingInput(...)]
)
- Why do you want to use PipelineGraph to execute the pipeline? You should ideally be creating then starting the pipeline. As long as the pipeline step dependencies are defined correctly, our pipeline executor will execute them sequentially.
pipeline.create(..)
pipeline.start(..)
Hi @nmadan
Some pipelines run purely on the cloud, and then there is no issue. Some pipelines are hybrid: a few of the first steps run locally and the rest run on the cloud (for example, XGBoost is on the cloud). In the hybrid pipelines, some parameters cannot be evaluated before a step has finished. So when I give the steps to PipelineGraph, it tries to evaluate each step's inputs, finds fields that are not there yet, and the program stops.
PipelineGraph is there to sort the step execution order in the case of the hybrid pipelines and to help print the pipeline as a graph.
The reason I am proposing a flag is that it will not break legacy code. Also, I do understand that if you decide not to evaluate inputs when they are a function, it can cause an issue for people who meant to write inputs = my_function() but unintentionally wrote inputs = my_function (no parentheses).
Thanks for the help. I hope you can find a suitable solution.
Hi @DRKolev-code, thanks again for those clarifications. Since you have a custom pipeline class that runs in cloud-local hybrid mode and this is not a feature supported by our team, I would recommend you create a subclass of PipelineGraph and add a flag to ignore step inputs in there. Thanks again, hope this helps.
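The explicit `depends_on` approach mentioned earlier in this thread can be illustrated without SageMaker. The sketch below does not subclass PipelineGraph and does not use any SDK API; it only shows, under that assumption, how an execution order can be derived from explicitly declared dependencies using the standard library's `graphlib` (Python 3.9+). The step names and the `depends_on` mapping are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical mapping: step name -> names of the steps it explicitly depends on.
# Step inputs are never inspected, so deferred inputs cannot cause an error here.
depends_on = {
    "step_1": [],
    "step_2": ["step_1"],
    "feature_engineering": ["step_2"],
    "split": ["feature_engineering"],
    "tuning": ["split"],
    "evaluation": ["tuning"],
}


def execution_order(dependencies):
    """Return step names ordered so that every step follows its dependencies."""
    return list(TopologicalSorter(dependencies).static_order())


order = execution_order(depends_on)
```

A custom hybrid runner could iterate over `order`, resolve each step's inputs at that point, and then run the step. `TopologicalSorter` raises `graphlib.CycleError` if the declared dependencies contain a cycle.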