zenml
[FEATURE]: Nested `BaseStepConfig`
Contact Details [Optional]
No response
Describe the feature you'd like
I would like to be able to use a nested structure for the `BaseStepConfig`. This would allow the definition of a very generic and configurable pipeline step.
I am trying to build a generic step config that contains a list of processing step configs that can be arbitrarily complex. Here is a basic example of the workflow:
from pydantic import BaseModel
from zenml.steps import step, BaseStepConfig, Output
from typing import List
from zenml.pipelines import pipeline

class BaseProcessorConfig(BaseModel):
    """Some (feature) processor config"""
    processor_type: str

class MyProcessorConfig(BaseProcessorConfig):
    """The configuration of a specific feature processor with special parameters"""
    processor_type = "my_processor"
    param = 1

class MyOtherProcessorConfig(BaseProcessorConfig):
    """The configuration of another specific feature processor with special parameters"""
    processor_type = "my_other_processor"
    multiplier = 2

class ProcessingStepConfig(BaseStepConfig):
    """
    The (zenml) step configuration that can have an arbitrary list of processing steps
    that should be called in a sequence
    """
    processing_steps: List[BaseProcessorConfig]

@step
def process_data_step(config: ProcessingStepConfig) -> Output(success=int):
    """A step that applies the individual processing steps to the data"""
    for processor_config in config.processing_steps:
        print(processor_config.dict())
        # The processor_config should have the parameter 'param' or 'multiplier'
        # depending on the type of the processor. But the processor_config is only
        # an object of type BaseProcessorConfig
        # Run the processor of type processor_config.processor_type
        # with its specific parameters
    return 1

@pipeline
def some_pipeline(process_data):
    process_data()

pipeline = some_pipeline(
    process_data=process_data_step(
        config=ProcessingStepConfig(
            processing_steps=[MyProcessorConfig(), MyOtherProcessorConfig()]
        )
    )
)
pipeline.run()
Is your feature request related to a problem?
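Inside the step, I cannot access the parameters of the subclassed processor configs. Each entry in `config.processing_steps` arrives as a plain `BaseProcessorConfig`, so subclass-specific fields such as `param` and `multiplier` are missing.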
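To make the symptom concrete, here is a minimal sketch that uses only standard-library dataclasses instead of pydantic and ZenML. It assumes (this is not confirmed anywhere in the thread) that the config is flattened to plain dicts and then rebuilt from the annotated base type. `rebuild_as_base` is a hypothetical helper that stands in for that rebuild step.

```python
from dataclasses import asdict, dataclass, fields


@dataclass
class BaseProcessorConfig:
    processor_type: str = "base"


@dataclass
class MyProcessorConfig(BaseProcessorConfig):
    processor_type: str = "my_processor"
    param: int = 1


def rebuild_as_base(raw: dict) -> BaseProcessorConfig:
    """Hypothetical stand-in: rebuild a config using only the base class's fields."""
    known = {f.name for f in fields(BaseProcessorConfig)}
    # Keys the base class does not declare (here: 'param') are silently dropped.
    return BaseProcessorConfig(**{k: v for k, v in raw.items() if k in known})


original = MyProcessorConfig()
raw = asdict(original)           # plain dict, still contains 'param'
restored = rebuild_as_base(raw)  # rebuilt as the base type only

print(type(restored).__name__)     # BaseProcessorConfig
print(hasattr(restored, "param"))  # False
```

The `processor_type` value survives the round trip, which is what makes it usable as a key for picking the right subclass again.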
How do you solve your current problem with the current status-quo of ZenML?
I am currently solving this for my use case with a modified `BaseStepConfig.__init__`. But I think this is rather generic and could be handled inside ZenML.
Here is my fix:
from pydantic import BaseModel
from zenml.steps import step, BaseStepConfig, Output
from typing import List
from zenml.pipelines import pipeline

class BaseProcessorConfig(BaseModel):
    """Some (feature) processor config"""
    processor_type: str

class MyProcessorConfig(BaseProcessorConfig):
    """The configuration of a specific feature processor with special parameters"""
    processor_type = "my_processor"
    param = 1

class MyOtherProcessorConfig(BaseProcessorConfig):
    """The configuration of another specific feature processor with special parameters"""
    processor_type = "my_other_processor"
    multiplier = 2

# Maps each processor_type value to the config class that should be rebuilt for it
config_map = {
    "my_processor": MyProcessorConfig,
    "my_other_processor": MyOtherProcessorConfig
}

class ProcessingStepConfig(BaseStepConfig):
    """
    The (zenml) step configuration that can have an arbitrary list of processing steps
    that should be called in a sequence
    """
    processing_steps: List[BaseProcessorConfig]

    def __init__(self, **kwargs):
        processing_steps = []
        for processor_config in kwargs["processing_steps"]:
            if isinstance(processor_config, BaseProcessorConfig):
                # Already a config object: keep it as is
                processing_steps.append(processor_config)
            else:
                # A plain dict: pick the subclass by its processor_type
                config_class = config_map[processor_config["processor_type"]]
                processing_steps.append(
                    config_class(**processor_config)
                )
        kwargs["processing_steps"] = processing_steps
        super().__init__(**kwargs)

@step
def process_data_step(config: ProcessingStepConfig) -> Output(success=int):
    """A step that applies the individual processing steps to the data"""
    for processor_config in config.processing_steps:
        # Now the processor config does have the correct parameters
        # Run the processor of type processor_config.processor_type
        # with its specific parameters
        print(processor_config.dict())
    return 1

@pipeline
def some_pipeline(process_data):
    process_data()

pipeline = some_pipeline(
    process_data=process_data_step(
        config=ProcessingStepConfig(
            processing_steps=[MyProcessorConfig(), MyOtherProcessorConfig()]
        )
    )
)
pipeline.run()
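The dispatch idea behind this workaround can be tried in isolation. The sketch below rebuilds it with standard-library dataclasses only, so it runs without ZenML or pydantic. `CONFIG_MAP` and `coerce_processor_config` are illustrative names, and the explicit error for an unknown `processor_type` is an addition that the fix above does not have.

```python
from dataclasses import dataclass
from typing import Any, Dict, Type, Union


@dataclass
class BaseProcessorConfig:
    processor_type: str = "base"


@dataclass
class MyProcessorConfig(BaseProcessorConfig):
    processor_type: str = "my_processor"
    param: int = 1


@dataclass
class MyOtherProcessorConfig(BaseProcessorConfig):
    processor_type: str = "my_other_processor"
    multiplier: int = 2


# Registry keyed by the discriminator field, mirroring config_map in the fix.
CONFIG_MAP: Dict[str, Type[BaseProcessorConfig]] = {
    "my_processor": MyProcessorConfig,
    "my_other_processor": MyOtherProcessorConfig,
}


def coerce_processor_config(
    item: Union[BaseProcessorConfig, Dict[str, Any]]
) -> BaseProcessorConfig:
    """Return config objects unchanged; rebuild dicts as the registered subclass."""
    if isinstance(item, BaseProcessorConfig):
        return item
    try:
        config_class = CONFIG_MAP[item["processor_type"]]
    except KeyError as exc:
        raise ValueError(f"Unknown or missing processor_type in {item!r}") from exc
    return config_class(**item)


# A mix of a ready-made object and a raw dict, as the step config may receive.
mixed = [
    MyProcessorConfig(param=5),
    {"processor_type": "my_other_processor", "multiplier": 3},
]
configs = [coerce_processor_config(item) for item in mixed]

for cfg in configs:
    print(type(cfg).__name__, cfg)
```

Keeping the registry next to the config classes means a new processor type needs one new class and one new map entry, with no change to the step itself.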
Any other comments?
No response
Hi @felixthebeard! Thanks for posting this issue. I don't have an immediate answer for you, but I'll look into it and get back to you here when I've had a chance to figure out what's going on.
Hi @felixthebeard. Sorry for the delay in getting back to you. We had a bit of internal discussion about this and this might be something we address in the future, but more likely will be enabled for you in a different way from some other changes we'll be making. In any case, we'll leave this issue open and report back when we touch on this use case again.
Closing this because we removed these config classes from ZenML