zenml
zenml copied to clipboard
[BUG]: StepInterfaceError: Unable to find materializer for output 'output' of type `<class 'keras.engine.training.Model'>` in step 'trainer'.
Contact Details [Optional]
System Information
ZenML version: 0.13.1
Install path: /home/
What happened?
When I try to implement this code "https://github.com/zenml-io/zenml/tree/main/examples/kubeflow_pipelines_orchestration"; at the step "Run pipeline": _
Initialize the pipeline
first_pipeline = mnist_pipeline( importer=importer(), normalizer=normalizer(), trainer=trainer(), evaluator=evaluator(), )
first_pipeline.run()
_
When I try to run the above snippet, I face StepInterfaceError
StepInterfaceError: Unable to find materializer for output 'output' of type <class 'keras.engine.training.Model'>
in step 'trainer'. Please make sure to either explicitly set a materializer for step outputs using
step.with_return_materializers(...) or registering a default materializer for specific types by subclassing
BaseMaterializer and setting its ASSOCIATED_TYPES class variable. For more information, visit
https://docs.zenml.io/developer-guide/advanced-usage/materializer.
Reproduction steps
You can get the entire source code from here: https://github.com/zenml-io/zenml/tree/main/examples/kubeflow_pipelines_orchestration
Initialize the pipeline
first_pipeline = mnist_pipeline( importer=importer(), normalizer=normalizer(), trainer=trainer(), evaluator=evaluator(), )
first_pipeline.run()
Relevant log output
Creating run for pipeline: mnist_pipeline
Cache enabled for pipeline mnist_pipeline
Using stack default to run pipeline mnist_pipeline...
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ <ipython-input-19-3e8dc075b972>:9 in <module> │
│ │
│ /usr/local/lib/python3.7/dist-packages/zenml/pipelines/base_pipeline.py:449 in run │
│ │
│ 446 │ │ constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True │
│ 447 │ │ try: │
│ 448 │ │ │ return_value = stack.deploy_pipeline( │
│ ❱ 449 │ │ │ │ self, runtime_configuration=runtime_configuration │
│ 450 │ │ │ ) │
│ 451 │ │ finally: │
│ 452 │ │ │ constants.SHOULD_PREVENT_PIPELINE_EXECUTION = False │
│ │
│ /usr/local/lib/python3.7/dist-packages/zenml/stack/stack.py:832 in deploy_pipeline │
│ │
│ 829 │ │ ) │
│ 830 │ │ │
│ 831 │ │ return_value = self.orchestrator.run( │
│ ❱ 832 │ │ │ pipeline, stack=self, runtime_configuration=runtime_configuration │
│ 833 │ │ ) │
│ 834 │ │ │
│ 835 │ │ # Put pipeline level cache policy back to make sure the next runs │
│ │
│ /usr/local/lib/python3.7/dist-packages/zenml/orchestrators/base_orchestrator.py:250 in run │
│ │
│ 247 │ │ # Create the protobuf pipeline which will be needed for various reasons │
│ 248 │ │ # in the following steps │
│ 249 │ │ pb2_pipeline: Pb2Pipeline = Compiler().compile( │
│ ❱ 250 │ │ │ create_tfx_pipeline(pipeline, stack=stack) │
│ 251 │ │ ) │
│ 252 │ │ │
│ 253 │ │ self._configure_node_context( │
│ │
│ /usr/local/lib/python3.7/dist-packages/zenml/orchestrators/utils.py:50 in create_tfx_pipeline │
│ │
│ 47 │ │ │ the pipeline. │
│ 48 │ """ │
│ 49 │ # Connect the inputs/outputs of all steps in the pipeline │
│ ❱ 50 │ zenml_pipeline.connect(**zenml_pipeline.steps) │
│ 51 │ │
│ 52 │ tfx_components = { │
│ 53 │ │ step.name: step.component for step in zenml_pipeline.steps.values() │
│ <ipython-input-18-3f1c2fa0b83f>:11 in mnist_pipeline │
│ │
│ /usr/local/lib/python3.7/dist-packages/zenml/steps/base_step.py:707 in __call__ │
│ │
│ 704 │ │ } │
│ 705 │ │ │
│ 706 │ │ # make sure we have registered materializers for each output │
│ ❱ 707 │ │ materializers = self.get_materializers(ensure_complete=True) │
│ 708 │ │ │
│ 709 │ │ # Prepare the output artifacts and spec │
│ 710 │ │ for key, value in self.OUTPUT_SIGNATURE.items(): │
│ │
│ /usr/local/lib/python3.7/dist-packages/zenml/steps/base_step.py:366 in get_materializers │
│ │
│ 363 │ │ │ │ │ │ f"registering a default materializer for specific " │
│ 364 │ │ │ │ │ │ f"types by subclassing `BaseMaterializer` and setting " │
│ 365 │ │ │ │ │ │ f"its `ASSOCIATED_TYPES` class variable.", │
│ ❱ 366 │ │ │ │ │ │ url="https://docs.zenml.io/developer-guide/advanced-usage/materi │
│ 367 │ │ │ │ │ ) │
│ 368 │ │ │
│ 369 │ │ return materializers │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
StepInterfaceError: Unable to find materializer for output 'output' of type `<class 'keras.engine.training.Model'>`
in step 'trainer'. Please make sure to either explicitly set a materializer for step outputs using
`step.with_return_materializers(...)` or registering a default materializer for specific types by subclassing
`BaseMaterializer` and setting its `ASSOCIATED_TYPES` class variable. For more information, visit
https://docs.zenml.io/developer-guide/advanced-usage/materializer.
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
I am getting the same error for my simple code. Here is my code
from zenml.steps import step, Output
@step
def data_loader() -> Output(x_train=np.ndarray, y_train=np.ndarray, x_test=np.ndarray, y_test=np.ndarray):
"""
return MNIST x_train, y_train, x_test, y_test
"""
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
return x_train, y_train, x_test, y_test
@step
def initialize_model() -> Output(untrained_model=tf.keras.Sequential):
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
tf.keras.layers.Flatten(),
tf.keras.layers.Dropout(0.5),
tf.keras.layers.Dense(num_classes, activation="softmax"),
])
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc'])
return model
@step
def preprocess_data(x:np.ndarray, y:np.ndarray) ->Output(x_processed= np.ndarray, y_processed=np.ndarray):
x_processed = x.astype("float32") / 255
x_processed = np.expand_dims(x_processed, -1)
y_processed = keras.utils.to_categorical(y, num_classes)
return x_processed, y_processed
@step
def train_model(x_train:np.ndarray, y_train:np.ndarray, untrained_model:tf.keras.Sequential) -> Output(trained_model=tf.keras.Sequential, history=dict):
history = untrained_model.fit(x_train, y_train, epochs=epochs)
return untrained_model, history
@step
def eval_model(trained_model: tf.keras.Sequential, x_test:np.ndarray, y_test:np.ndarray) -> None:
trainedmodel.evaluate(x_test, y_test)
from zenml.pipelines import pipeline
@pipeline
def runner(data_loader, initialize_model, preprocess_data, train_model):
x_train, y_train, x_test, y_test = data_loader()
x_train_processed, y_train_processed = preprocess_data(x_train, y_train)
model = initialize_model()
model, history = train_model(x_train_processed, y_train_processed, model)
mnist_pipeline = runner(data_loader(), initialize_model(), preprocess_data(), train_model())
mnist_pipeline.run()
The error is
Creating run for pipeline: runner
Cache enabled for pipeline runner
Using stack default to run pipeline runner...
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /tmp/ipykernel_26594/730396930.py:1 in <cell line: 1> │
│ │
│ [Errno 2] No such file or directory: '/tmp/ipykernel_26594/730396930.py' │
│ │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/pipelines/base_pipeline.py:44 │
│ 8 in run │
│ │
│ 445 │ │ # behavior │
│ 446 │ │ constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True │
│ 447 │ │ try: │
│ ❱ 448 │ │ │ return_value = stack.deploy_pipeline( │
│ 449 │ │ │ │ self, runtime_configuration=runtime_configuration │
│ 450 │ │ │ ) │
│ 451 │ │ finally: │
│ │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/stack/stack.py:831 in │
│ deploy_pipeline │
│ │
│ 828 │ │ │ pipeline=pipeline, runtime_configuration=runtime_configuration │
│ 829 │ │ ) │
│ 830 │ │ │
│ ❱ 831 │ │ return_value = self.orchestrator.run( │
│ 832 │ │ │ pipeline, stack=self, runtime_configuration=runtime_configuration │
│ 833 │ │ ) │
│ 834 │
│ │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/orchestrators/base_orchestrat │
│ or.py:250 in run │
│ │
│ 247 │ │ # Create the protobuf pipeline which will be needed for various reasons │
│ 248 │ │ # in the following steps │
│ 249 │ │ pb2_pipeline: Pb2Pipeline = Compiler().compile( │
│ ❱ 250 │ │ │ create_tfx_pipeline(pipeline, stack=stack) │
│ 251 │ │ ) │
│ 252 │ │ │
│ 253 │ │ self._configure_node_context( │
│ │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/orchestrators/utils.py:50 in │
│ create_tfx_pipeline │
│ │
│ 47 │ │ │ the pipeline. │
│ 48 │ """ │
│ 49 │ # Connect the inputs/outputs of all steps in the pipeline │
│ ❱ 50 │ zenml_pipeline.connect(**zenml_pipeline.steps) │
│ 51 │ │
│ 52 │ tfx_components = { │
│ 53 │ │ step.name: step.component for step in zenml_pipeline.steps.values() │
│ │
│ /tmp/ipykernel_26594/100900222.py:7 in runner │
│ │
│ [Errno 2] No such file or directory: '/tmp/ipykernel_26594/100900222.py' │
│ │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/steps/base_step.py:707 in │
│ __call__ │
│ │
│ 704 │ │ } │
│ 705 │ │ │
│ 706 │ │ # make sure we have registered materializers for each output │
│ ❱ 707 │ │ materializers = self.get_materializers(ensure_complete=True) │
│ 708 │ │ │
│ 709 │ │ # Prepare the output artifacts and spec │
│ 710 │ │ for key, value in self.OUTPUT_SIGNATURE.items(): │
│ │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/steps/base_step.py:357 in │
│ get_materializers │
│ │
│ 354 │ │ │ │ materializers[output_name] = materializer │
│ 355 │ │ │ else: │
│ 356 │ │ │ │ if ensure_complete: │
│ ❱ 357 │ │ │ │ │ raise StepInterfaceError( │
│ 358 │ │ │ │ │ │ f"Unable to find materializer for output " │
│ 359 │ │ │ │ │ │ f"'{output_name}' of type `{output_type}` in step " │
│ 360 │ │ │ │ │ │ f"'{self.name}'. Please make sure to either " │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
StepInterfaceError: Unable to find materializer for output 'untrained_model' of type `<class
'keras.engine.sequential.Sequential'>` in step 'initialize_model'. Please make sure to either explicitly set a
materializer for step outputs using `step.with_return_materializers(...)` or registering a default materializer for
specific types by subclassing `BaseMaterializer` and setting its `ASSOCIATED_TYPES` class variable. For more
information, visit https://docs.zenml.io/developer-guide/advanced-usage/materializer.
@ahmadmustafaanis the error you are getting is because there's no materializer written for type <class 'keras.engine.sequential.Sequential'>.
The code should run if you replace tf.keras.Sequential with tf.keras.Model since Sequential inherits from Model class.
@dudeperf3ct I am still getting this error.
StepInterfaceError: Unable to find materializer for output 'untrained_model' of type `<class
'keras.engine.training.Model'>` in step 'initialize_model'. Please make sure to either explicitly set a
materializer for step outputs using `step.with_return_materializers(...)` or registering a default materializer for
specific types by subclassing `BaseMaterializer` and setting its `ASSOCIATED_TYPES` class variable. For more
information, visit https://docs.zenml.io/developer-guide/advanced-usage/materializer.
the function
@step
def initialize_model() -> Output(untrained_model= tf.keras.Model):
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
tf.keras.layers.Flatten(),
tf.keras.layers.Dropout(0.5),
tf.keras.layers.Dense(num_classes, activation="softmax"),
])
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc'])
return model
@ahmadmustafaanis what version of zenml are you using? I am able to run the pipeline using the following code using zenml 0.13.2.
import numpy as np
import tensorflow as tf
from zenml.steps import step, Output
num_classes = 10
epochs = 10
@step
def data_loader() -> Output(x_train=np.ndarray, y_train=np.ndarray, x_test=np.ndarray, y_test=np.ndarray):
"""
return MNIST x_train, y_train, x_test, y_test
"""
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
return x_train, y_train, x_test, y_test
@step
def initialize_model() -> Output(untrained_model=tf.keras.Model):
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation="relu", input_shape=(28, 28, 1)),
tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
tf.keras.layers.Flatten(),
tf.keras.layers.Dropout(0.5),
tf.keras.layers.Dense(num_classes, activation="softmax"),
])
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc'])
return model
@step
def preprocess_data(x:np.ndarray, y:np.ndarray) -> Output(x_processed= np.ndarray, y_processed=np.ndarray):
x_processed = x.astype("float32") / 255
x_processed = np.expand_dims(x_processed, -1)
y_processed = tf.keras.utils.to_categorical(y, num_classes)
return x_processed, y_processed
@step
def train_model(x_train:np.ndarray, y_train:np.ndarray, untrained_model:tf.keras.Model) -> Output(trained_model=tf.keras.Model, history=dict):
history = untrained_model.fit(x_train, y_train, epochs=epochs)
return untrained_model, history.history
@step
def eval_model(trained_model: tf.keras.Model, x_test:np.ndarray, y_test:np.ndarray) -> None:
trained_model.evaluate(x_test, y_test)
from zenml.pipelines import pipeline
@pipeline
def runner(data_loader, initialize_model, preprocess_data, train_model):
x_train, y_train, x_test, y_test = data_loader()
x_train_processed, y_train_processed = preprocess_data(x_train, y_train)
model = initialize_model()
model, history = train_model(x_train_processed, y_train_processed, model)
mnist_pipeline = runner(data_loader(), initialize_model(), preprocess_data(), train_model())
mnist_pipeline.run()
My version is
└[~]> zenml --version
zenml, version 0.13.1
@ahmadmustafaanis I'm able to run @dudeperf3ct code with zenml==0.13.1 and zenml==0.13.0
Did you install tensorflow using pip? Or using zenml integration eg zenml integration install tensorflow?
Yes I installed it via pip, not integrated via zenml.
I guess that's the issue, I'll try with zenml integration this weekend.
@ahmadmustafaanis quite possibly. Using the zenml integration install makes sure the package is installed correctly. Keep us updated here! :))
@dnth Thanks, working like charm now. This means that almost every object, that is a output from a zenml step(or input to), needs to be installed via zenml integration install right?
@ahmadmustafaanis yes, for every object that passes through the pipeline, you need to tell zenml how to serialize/deserialize it. In this way this object can be cached and stored. In zenml this is done using something called a Materializer. For most Python objects you don't need to write your own Materializer, these are already builtin within zenml. But for custom objects you'd need to write a custom materializer for it.
Here's a guide how to write a custom materializer https://docs.zenml.io/v/0.6.0/guides/index/custom-materializer
A Keras model is not a standard Python object so you'd need to write one. But this has been done by someone in zenml and to use it you'll have to install the integration with zenml integration install tensorflow.
@dudeperf3ct any chance this also solves your issue? :)
@dnth i didn't face any issue ... i was helping in resolving the issue
@dudeperf3ct I see.. sorry I misunderstood! Thank you so much for your help!
@Coder-Vishali please let us know if this solves the issue for you :)
@Coder-Vishali I'm closing this issue due to inactivity. Please let us know if the solution provided worked for you. Feel free to reopen if the problem still persists :)