sagemaker.workflow.pipeline.PipelineGraph throws exception when a SageMaker Step's input is a function

Open DRKolev-code opened this issue 2 years ago • 6 comments

Describe the bug I am trying to use PipelineGraph to sort out the dependencies of my steps. The issue occurs because we have a custom Pipeline class that schedules our steps one after the other, and some steps have their inputs set to a method that is evaluated only when the step is ready to be executed.

Such as:

step.inputs = (
    step.inputs if not inspect.ismethod(step.inputs) else step.inputs()
)

The error is: TypeError: unsupported operand type(s) for +: 'method' and 'list'

This is due to the method:

sagemaker.workflow.steps.Step._find_dependencies_in_step_arguments

as it cannot reconcile the inputs being a method to be executed later.

To reproduce

from sagemaker.workflow.pipeline import PipelineGraph
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.processing import ScriptProcessor

def my_function():
    pass

Create any step and assign its inputs to be a function.

step_calls = ProcessingStep(
            name='calls',
            description='Processing missing calldata.',
            processor=script_processor,
            inputs=my_function
)

PipelineGraph([step_calls])

Expected behavior I expect that if the inputs are a method then they should be ignored. Maybe the PipelineGraph can have a boolean value to toggle this behaviour with default being to check the inputs for dependencies.

Screenshots or logs

File "../console.py", line 21, in run_training_pipeline
    training_pipeline.run(run_date, local)

  File "../src/pipelines/training_pipeline.py", line 42, in run
    PipelineGraph(

  File "/home/noname/.venv/main/lib/python3.10/site-packages/sagemaker/workflow/pipeline.py", line 568, in __init__
    self.adjacency_list = self._initialize_adjacency_list()

  File "/home/noname/.venv/main/lib/python3.10/site-packages/sagemaker/workflow/pipeline.py", line 596, in _initialize_adjacency_list
    step._find_step_dependencies(self.step_map)

  File "/home/noname/.venv/main/lib/python3.10/site-packages/sagemaker/workflow/steps.py", line 183, in _find_step_dependencies
    self.step_only_arguments, step_map

  File "/home/noname/.venv/main/lib/python3.10/site-packages/sagemaker/workflow/steps.py", line 115, in step_only_arguments
    return self.arguments

  File "/home/noname/.venv/main/lib/python3.10/site-packages/sagemaker/workflow/steps.py", line 853, in arguments
    normalized_inputs, normalized_outputs = self.processor._normalize_args(

  File "/home/noname/.venv/main/lib/python3.10/site-packages/sagemaker/processing.py", line 242, in _normalize_args
    inputs_with_code = self._include_code_in_inputs(inputs, code, kms_key)

  File "/home/noname/.venv/main/lib/python3.10/site-packages/sagemaker/processing.py", line 583, in _include_code_in_inputs
    inputs_with_code = self._convert_code_and_add_to_inputs(inputs, user_code_s3_uri)

  File "/home/noname/.venv/main/lib/python3.10/site-packages/sagemaker/processing.py", line 697, in _convert_code_and_add_to_inputs
    return (inputs or []) + [code_file_input]

TypeError: unsupported operand type(s) for +: 'method' and 'list'
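The last frame of the traceback can be reproduced without SageMaker. The sketch below is a minimal illustration, not the SDK's code: `DeferredInputs` and `convert_code_and_add_to_inputs` are hypothetical stand-ins, and only the expression `(inputs or []) + [code_file_input]` is taken from the traceback. A bound method is truthy, so the `or []` fallback never applies and Python tries to add a method to a list.

```python
class DeferredInputs:
    """Hypothetical holder; its bound method stands in for deferred step inputs."""

    def build(self):
        return []


def convert_code_and_add_to_inputs(inputs, code_file_input):
    # Same expression as the failing line in the traceback.
    return (inputs or []) + [code_file_input]


# Passing None works, because `None or []` falls back to an empty list.
ok_result = convert_code_and_add_to_inputs(None, "code")

# Passing an uncalled bound method fails, because the method object is truthy.
deferred = DeferredInputs().build
try:
    convert_code_and_add_to_inputs(deferred, "code")
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(error_message)
```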

System information A description of your system. Please provide:

  • SageMaker Python SDK version: na
  • Framework name (eg. PyTorch) or algorithm (eg. KMeans): na
  • Framework version: na
  • Python version:
  • CPU or GPU: na
  • Custom Docker image (Y/N): na

Additional context I understand that this might not be an intended use case for the class PipelineGraph but it is a scenario that occurs. It would be nice if a fix is applied.

DRKolev-code avatar Aug 02 '22 11:08 DRKolev-code

some of them have the inputs as a method that gets evaluated when the step is ready to be executed.

Can you give me an example of when the step inputs are evaluated?

Can you not pass in the output of the method call?

from sagemaker.workflow.pipeline import PipelineGraph
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.processing import ScriptProcessor

def my_function():
    pass

Create any step and assign its inputs to be a function.

step_calls = ProcessingStep(
            name='calls',
            description='Processing missing calldata.',
            processor=script_processor,
            inputs=my_function() # used to be my_function without the parenthesis ()
)

PipelineGraph([step_calls])

nmadan avatar Aug 03 '22 19:08 nmadan

Hi. Here is an example.

We have our steps in a list (Just like the Pipeline class from SageMaker). Our class is called TestPipeline

TestPipeline(
            name="Training-Pipeline",
            parameters=[
                training_steps.baseline_threshold,
                training_steps.model_approval_status,
            ],
            steps=[
                preprocessing_steps.step_1,
                preprocessing_steps.step_2,
                preprocessing_steps.step_3,
                preprocessing_steps.step_feature_engineering,
                training_steps.step_split,
                training_steps.step_tuning,
                training_steps.step_evaluation,
            ],
)

Inside it, I pass the list of steps to PipelineGraph from sagemaker and run each step one after the other.

self.steps = PipelineGraph(steps)

for step in self.steps:
    # Here is where we invoke inputs() as a function if needed.
    step.inputs = step.inputs if not inspect.ismethod(step.inputs) else step.inputs()
    step.run(...)  # or whatever its run function is; it depends on step_type

It is all fine unless we have

def my_function():
    pass

step_calls = ProcessingStep(
            name='calls',
            description='Processing missing calldata.',
            processor=script_processor,
            inputs=my_function # No parenthesis, evaluate at run time
)

This is okay:

def my_function():
    pass
step_calls = ProcessingStep(
            name='calls',
            description='Processing missing calldata.',
            processor=script_processor,
            inputs=my_function() # evaluate at creation time
)
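The run-time resolution described above could be factored into a small helper. This is a sketch under assumptions: `resolve_inputs` is a hypothetical name, and `SimpleNamespace` stands in for a real step object. One detail worth noting is that `inspect.ismethod` is true only for bound methods, so a module-level function such as `my_function` would not pass that check; `callable()` covers both cases.

```python
import inspect
from types import SimpleNamespace


def resolve_inputs(inputs):
    """Call deferred inputs; return already-evaluated inputs unchanged."""
    return inputs() if callable(inputs) else inputs


def my_function():
    # Stand-in for inputs that only become known at run time.
    return ["input-a"]


# SimpleNamespace stands in for a step; only the `inputs` attribute matters here.
step = SimpleNamespace(inputs=my_function)

# A plain function is not a bound method, so this check is False.
plain_function_is_method = inspect.ismethod(my_function)

step.inputs = resolve_inputs(step.inputs)
```

Lists of inputs and `None` are not callable, so they pass through untouched.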

DRKolev-code avatar Aug 04 '22 13:08 DRKolev-code

@nmadan I think for our use-case it is perfectly fine to explicitly define the dependencies in the depends_on field. Currently, the sagemaker PipelineGraph is not working at all so the small tradeoff of having to manually specify step dependencies is okay.

Would it make sense to place a boolean switch as a parameter to PipelineGraph that would by default let the class search the inputs but if it is explicitly turned off it will skip the inputs for dependencies?
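If dependencies are declared only through depends_on, the execution order can be derived without looking at inputs at all. The following is a minimal sketch using the standard library's `graphlib` (Python 3.9+), not the SageMaker implementation: `SketchStep` and `order_by_depends_on` are hypothetical names, and `depends_on` is assumed to hold step names.

```python
from dataclasses import dataclass, field
from graphlib import TopologicalSorter


@dataclass
class SketchStep:
    """Hypothetical stand-in for a pipeline step."""

    name: str
    depends_on: list = field(default_factory=list)  # names of prerequisite steps
    inputs: object = None  # may be a callable; never inspected below


def order_by_depends_on(steps):
    """Return steps so that every step comes after the steps it depends on."""
    by_name = {step.name: step for step in steps}
    # TopologicalSorter expects a mapping of node -> set of predecessors.
    sorter = TopologicalSorter({step.name: set(step.depends_on) for step in steps})
    return [by_name[name] for name in sorter.static_order()]


steps = [
    SketchStep("train", depends_on=["features"], inputs=lambda: []),
    SketchStep("features", depends_on=["clean"]),
    SketchStep("clean"),
]
ordered_names = [step.name for step in order_by_depends_on(steps)]
print(ordered_names)
```

Because the callable in `inputs` is never evaluated, deferred inputs cannot break the ordering; the tradeoff is that every dependency has to be listed by hand.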

DRKolev-code avatar Aug 05 '22 08:08 DRKolev-code

Thanks for that info. I have a few more questions to understand your use case so that I can provide an alternative if possible. I am not very inclined to add a flag to support this very specific use case, so I will try my best to provide an alternative.

  1. What is the purpose of the TestPipeline class? Does it run the steps locally?
  2. Why do the inputs need to be evaluated at runtime? Can you not define the step like this:
my_step = ProcessingStep(
            name='calls',
            description='Processing missing calldata.',
            processor=script_processor,
            inputs=my_function() 
)

some_other_step = ProcessingStep(
            name='calls',
            description='Processing missing calldata.',
            processor=script_processor,
            inputs=[ProcessingInput(...)]
)
  3. Why do you want to use PipelineGraph to execute the pipeline? You should ideally be creating then starting the pipeline. As long as the pipeline step dependencies are defined correctly, our pipeline executor will execute them sequentially.
pipeline.create(..)
pipeline.start(..)

nmadan avatar Aug 05 '22 19:08 nmadan

Hi @nmadan

Some pipelines run purely in the cloud, and those have no issue. Others are hybrid: the first few steps run locally and the rest run in the cloud (for example, XGBoost runs in the cloud). In the hybrid pipelines, some parameters cannot be evaluated before an earlier step has finished. When I give the steps to PipelineGraph, it tries to evaluate each step's inputs, finds fields that do not exist yet, and the program stops.

PipelineGraph is there to sort the step execution order in the case of the hybrid pipelines and to help print the pipeline as a graph.

I am proposing a flag because it will not break legacy code. I do understand that if you decide not to evaluate inputs when they are a function, it can cause an issue for people who meant to write inputs = my_function() and unintentionally wrote inputs = my_function (without the parentheses).

Thanks for the help. I hope you can find a suitable solution.

DRKolev-code avatar Aug 05 '22 22:08 DRKolev-code

Hi @DRKolev-code, thanks again for those clarifications. Since you have a custom pipeline class that runs in cloud-local hybrid mode and this is not a feature supported by our team, I would recommend you create a subclass of PipelineGraph and add a flag to ignore step inputs in there. Thanks again, hope this helps.
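One way such a subclass, or the code that builds the graph, might ignore deferred inputs is to hide them while the graph is constructed and restore them afterwards. The sketch below is an assumption-laden illustration rather than a supported SDK feature: `deferred_inputs_masked` is a hypothetical helper, `SimpleNamespace` stands in for real steps, and it assumes each step exposes a writable `inputs` attribute, as the loop earlier in this thread does.

```python
from contextlib import contextmanager
from types import SimpleNamespace


@contextmanager
def deferred_inputs_masked(steps):
    """Temporarily replace callable `inputs` with None, restoring them on exit."""
    saved = [
        (step, step.inputs)
        for step in steps
        if callable(getattr(step, "inputs", None))
    ]
    for step, _ in saved:
        step.inputs = None
    try:
        yield steps
    finally:
        # Restore the original callables even if graph construction fails.
        for step, original in saved:
            step.inputs = original


def later():
    return ["resolved-at-run-time"]


deferred_step = SimpleNamespace(name="calls", inputs=later)
static_step = SimpleNamespace(name="split", inputs=["static-input"])

with deferred_inputs_masked([deferred_step, static_step]):
    # A real caller would construct the graph here, for example PipelineGraph(steps).
    seen_while_masked = (deferred_step.inputs, static_step.inputs)

restored = deferred_step.inputs is later
```

A subclass's `__init__` could wrap its call to the parent constructor in this context manager when its flag is set. Dependencies that exist only through the masked inputs would then have to be declared via depends_on.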

nmadan avatar Aug 08 '22 18:08 nmadan