
[BUG]: StepInterfaceError: Unable to find materializer for output 'output' of type `<class 'keras.engine.training.Model'>` in step 'trainer'.

Open Coder-Vishali opened this issue 3 years ago • 14 comments

Contact Details [Optional]

[email protected]

System Information

ZenML version: 0.13.1
Install path: /home//.pyenv/versions/3.7.6/lib/python3.7/site-packages/zenml
Python version: 3.7.6
Platform information: {'os': 'linux', 'linux_distro': 'ubuntu', 'linux_distro_like': 'debian', 'linux_distro_version': '18.04'}
Environment: native
Integrations: ['kubeflow', 'kubernetes', 'tensorboard', 'tensorflow']

What happened?

When I follow the example at https://github.com/zenml-io/zenml/tree/main/examples/kubeflow_pipelines_orchestration, the "Run pipeline" step fails:

# Initialize the pipeline
first_pipeline = mnist_pipeline(
    importer=importer(),
    normalizer=normalizer(),
    trainer=trainer(),
    evaluator=evaluator(),
)

first_pipeline.run()

Running the snippet above raises a StepInterfaceError:

StepInterfaceError: Unable to find materializer for output 'output' of type <class 'keras.engine.training.Model'> in step 'trainer'. Please make sure to either explicitly set a materializer for step outputs using step.with_return_materializers(...) or registering a default materializer for specific types by subclassing BaseMaterializer and setting its ASSOCIATED_TYPES class variable. For more information, visit https://docs.zenml.io/developer-guide/advanced-usage/materializer.

Reproduction steps

You can get the entire source code from here: https://github.com/zenml-io/zenml/tree/main/examples/kubeflow_pipelines_orchestration

# Initialize the pipeline
first_pipeline = mnist_pipeline(
    importer=importer(),
    normalizer=normalizer(),
    trainer=trainer(),
    evaluator=evaluator(),
)

first_pipeline.run()

Relevant log output

Creating run for pipeline: mnist_pipeline
Cache enabled for pipeline mnist_pipeline
Using stack default to run pipeline mnist_pipeline...

╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ <ipython-input-19-3e8dc075b972>:9 in <module>                                                    │
│                                                                                                  │
│ /usr/local/lib/python3.7/dist-packages/zenml/pipelines/base_pipeline.py:449 in run               │
│                                                                                                  │
│   446 │   │   constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True                                 │
│   447 │   │   try:                                                                               │
│   448 │   │   │   return_value = stack.deploy_pipeline(                                          │
│ ❱ 449 │   │   │   │   self, runtime_configuration=runtime_configuration                          │
│   450 │   │   │   )                                                                              │
│   451 │   │   finally:                                                                           │
│   452 │   │   │   constants.SHOULD_PREVENT_PIPELINE_EXECUTION = False                            │
│                                                                                                  │
│ /usr/local/lib/python3.7/dist-packages/zenml/stack/stack.py:832 in deploy_pipeline               │
│                                                                                                  │
│   829 │   │   )                                                                                  │
│   830 │   │                                                                                      │
│   831 │   │   return_value = self.orchestrator.run(                                              │
│ ❱ 832 │   │   │   pipeline, stack=self, runtime_configuration=runtime_configuration              │
│   833 │   │   )                                                                                  │
│   834 │   │                                                                                      │
│   835 │   │   # Put pipeline level cache policy back to make sure the next runs                  │
│                                                                                                  │
│ /usr/local/lib/python3.7/dist-packages/zenml/orchestrators/base_orchestrator.py:250 in run       │
│                                                                                                  │
│   247 │   │   # Create the protobuf pipeline which will be needed for various reasons            │
│   248 │   │   # in the following steps                                                           │
│   249 │   │   pb2_pipeline: Pb2Pipeline = Compiler().compile(                                    │
│ ❱ 250 │   │   │   create_tfx_pipeline(pipeline, stack=stack)                                     │
│   251 │   │   )                                                                                  │
│   252 │   │                                                                                      │
│   253 │   │   self._configure_node_context(                                                      │
│                                                                                                  │
│ /usr/local/lib/python3.7/dist-packages/zenml/orchestrators/utils.py:50 in create_tfx_pipeline    │
│                                                                                                  │
│    47 │   │   │   the pipeline.                                                                  │
│    48 │   """                                                                                    │
│    49 │   # Connect the inputs/outputs of all steps in the pipeline                              │
│ ❱  50 │   zenml_pipeline.connect(**zenml_pipeline.steps)                                         │
│    51 │                                                                                          │
│    52 │   tfx_components = {                                                                     │
│    53 │   │   step.name: step.component for step in zenml_pipeline.steps.values()                │
│ <ipython-input-18-3f1c2fa0b83f>:11 in mnist_pipeline                                             │
│                                                                                                  │
│ /usr/local/lib/python3.7/dist-packages/zenml/steps/base_step.py:707 in __call__                  │
│                                                                                                  │
│   704 │   │   }                                                                                  │
│   705 │   │                                                                                      │
│   706 │   │   # make sure we have registered materializers for each output                       │
│ ❱ 707 │   │   materializers = self.get_materializers(ensure_complete=True)                       │
│   708 │   │                                                                                      │
│   709 │   │   # Prepare the output artifacts and spec                                            │
│   710 │   │   for key, value in self.OUTPUT_SIGNATURE.items():                                   │
│                                                                                                  │
│ /usr/local/lib/python3.7/dist-packages/zenml/steps/base_step.py:366 in get_materializers         │
│                                                                                                  │
│   363 │   │   │   │   │   │   f"registering a default materializer for specific "                │
│   364 │   │   │   │   │   │   f"types by subclassing `BaseMaterializer` and setting "            │
│   365 │   │   │   │   │   │   f"its `ASSOCIATED_TYPES` class variable.",                         │
│ ❱ 366 │   │   │   │   │   │   url="https://docs.zenml.io/developer-guide/advanced-usage/materi   │
│   367 │   │   │   │   │   )                                                                      │
│   368 │   │                                                                                      │
│   369 │   │   return materializers                                                               │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
StepInterfaceError: Unable to find materializer for output 'output' of type `<class 'keras.engine.training.Model'>`
in step 'trainer'. Please make sure to either explicitly set a materializer for step outputs using 
`step.with_return_materializers(...)` or registering a default materializer for specific types by subclassing 
`BaseMaterializer` and setting its `ASSOCIATED_TYPES` class variable. For more information, visit 
https://docs.zenml.io/developer-guide/advanced-usage/materializer.

Code of Conduct

  • [X] I agree to follow this project's Code of Conduct

Coder-Vishali avatar Sep 05 '22 05:09 Coder-Vishali

I am getting the same error with my simple code. Here is my code:

import numpy as np
import tensorflow as tf

from zenml.steps import step, Output

@step
def data_loader() -> Output(x_train=np.ndarray, y_train=np.ndarray, x_test=np.ndarray, y_test=np.ndarray):
    """
    return MNIST x_train, y_train, x_test, y_test
    """
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    
    return x_train, y_train, x_test, y_test

@step
def initialize_model() -> Output(untrained_model=tf.keras.Sequential):
    model = tf.keras.Sequential([
            tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
            tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
            tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
            tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Dense(num_classes, activation="softmax"),
            ])
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc'])
    return model

@step
def preprocess_data(x:np.ndarray, y:np.ndarray) ->Output(x_processed= np.ndarray, y_processed=np.ndarray):
    x_processed = x.astype("float32") / 255
    x_processed = np.expand_dims(x_processed, -1)
    
    y_processed = tf.keras.utils.to_categorical(y, num_classes)
    
    return x_processed, y_processed

@step
def train_model(x_train:np.ndarray, y_train:np.ndarray, untrained_model:tf.keras.Sequential) -> Output(trained_model=tf.keras.Sequential, history=dict):
    history = untrained_model.fit(x_train, y_train, epochs=epochs)
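    # fit() returns a History object; its .history attribute is the dict declared in the signature
    return untrained_model, history.history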

@step
def eval_model(trained_model: tf.keras.Sequential, x_test:np.ndarray, y_test:np.ndarray) -> None:
    trained_model.evaluate(x_test, y_test)

from zenml.pipelines import pipeline

@pipeline
def runner(data_loader, initialize_model, preprocess_data, train_model):
    x_train, y_train, x_test, y_test = data_loader()
    x_train_processed, y_train_processed = preprocess_data(x_train, y_train)    
    model = initialize_model()
    model, history = train_model(x_train_processed, y_train_processed, model)
    
mnist_pipeline = runner(data_loader(), initialize_model(), preprocess_data(), train_model())

mnist_pipeline.run()

The error is:

Creating run for pipeline: runner
Cache enabled for pipeline runner
Using stack default to run pipeline runner...
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /tmp/ipykernel_26594/730396930.py:1 in <cell line: 1>                                            │
│                                                                                                  │
│ [Errno 2] No such file or directory: '/tmp/ipykernel_26594/730396930.py'                         │
│                                                                                                  │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/pipelines/base_pipeline.py:44 │
│ 8 in run                                                                                         │
│                                                                                                  │
│   445 │   │   # behavior                                                                         │
│   446 │   │   constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True                                 │
│   447 │   │   try:                                                                               │
│ ❱ 448 │   │   │   return_value = stack.deploy_pipeline(                                          │
│   449 │   │   │   │   self, runtime_configuration=runtime_configuration                          │
│   450 │   │   │   )                                                                              │
│   451 │   │   finally:                                                                           │
│                                                                                                  │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/stack/stack.py:831 in         │
│ deploy_pipeline                                                                                  │
│                                                                                                  │
│   828 │   │   │   pipeline=pipeline, runtime_configuration=runtime_configuration                 │
│   829 │   │   )                                                                                  │
│   830 │   │                                                                                      │
│ ❱ 831 │   │   return_value = self.orchestrator.run(                                              │
│   832 │   │   │   pipeline, stack=self, runtime_configuration=runtime_configuration              │
│   833 │   │   )                                                                                  │
│   834                                                                                            │
│                                                                                                  │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/orchestrators/base_orchestrat │
│ or.py:250 in run                                                                                 │
│                                                                                                  │
│   247 │   │   # Create the protobuf pipeline which will be needed for various reasons            │
│   248 │   │   # in the following steps                                                           │
│   249 │   │   pb2_pipeline: Pb2Pipeline = Compiler().compile(                                    │
│ ❱ 250 │   │   │   create_tfx_pipeline(pipeline, stack=stack)                                     │
│   251 │   │   )                                                                                  │
│   252 │   │                                                                                      │
│   253 │   │   self._configure_node_context(                                                      │
│                                                                                                  │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/orchestrators/utils.py:50 in  │
│ create_tfx_pipeline                                                                              │
│                                                                                                  │
│    47 │   │   │   the pipeline.                                                                  │
│    48 │   """                                                                                    │
│    49 │   # Connect the inputs/outputs of all steps in the pipeline                              │
│ ❱  50 │   zenml_pipeline.connect(**zenml_pipeline.steps)                                         │
│    51 │                                                                                          │
│    52 │   tfx_components = {                                                                     │
│    53 │   │   step.name: step.component for step in zenml_pipeline.steps.values()                │
│                                                                                                  │
│ /tmp/ipykernel_26594/100900222.py:7 in runner                                                    │
│                                                                                                  │
│ [Errno 2] No such file or directory: '/tmp/ipykernel_26594/100900222.py'                         │
│                                                                                                  │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/steps/base_step.py:707 in     │
│ __call__                                                                                         │
│                                                                                                  │
│   704 │   │   }                                                                                  │
│   705 │   │                                                                                      │
│   706 │   │   # make sure we have registered materializers for each output                       │
│ ❱ 707 │   │   materializers = self.get_materializers(ensure_complete=True)                       │
│   708 │   │                                                                                      │
│   709 │   │   # Prepare the output artifacts and spec                                            │
│   710 │   │   for key, value in self.OUTPUT_SIGNATURE.items():                                   │
│                                                                                                  │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/steps/base_step.py:357 in     │
│ get_materializers                                                                                │
│                                                                                                  │
│   354 │   │   │   │   materializers[output_name] = materializer                                  │
│   355 │   │   │   else:                                                                          │
│   356 │   │   │   │   if ensure_complete:                                                        │
│ ❱ 357 │   │   │   │   │   raise StepInterfaceError(                                              │
│   358 │   │   │   │   │   │   f"Unable to find materializer for output "                         │
│   359 │   │   │   │   │   │   f"'{output_name}' of type `{output_type}` in step "                │
│   360 │   │   │   │   │   │   f"'{self.name}'. Please make sure to either "                      │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
StepInterfaceError: Unable to find materializer for output 'untrained_model' of type `<class 
'keras.engine.sequential.Sequential'>` in step 'initialize_model'. Please make sure to either explicitly set a 
materializer for step outputs using `step.with_return_materializers(...)` or registering a default materializer for
specific types by subclassing `BaseMaterializer` and setting its `ASSOCIATED_TYPES` class variable. For more 
information, visit https://docs.zenml.io/developer-guide/advanced-usage/materializer.

ahmadmustafaanis avatar Sep 08 '22 10:09 ahmadmustafaanis

@ahmadmustafaanis you are getting this error because no materializer is registered for the type <class 'keras.engine.sequential.Sequential'>.

The code should run if you replace tf.keras.Sequential with tf.keras.Model in the step signatures, since Sequential inherits from the Model class.
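
To make the suggestion above concrete, here is a minimal sketch of a lookup keyed on the declared output type. `Model` and `Sequential` are stand-in classes (not the real Keras ones), and `MATERIALIZERS` and `lookup_exact` are hypothetical names. This is not zenml's actual registry code, only an illustration of why the annotation can matter.

```python
from typing import Dict


class Model:
    """Stand-in for tf.keras.Model (not the real class)."""


class Sequential(Model):
    """Stand-in for tf.keras.Sequential, which subclasses Model."""


# Hypothetical registry keyed by the *declared* output type of a step.
# The value is just a label here, not a real materializer class.
MATERIALIZERS: Dict[type, str] = {Model: "model-materializer"}


def lookup_exact(declared_type: type) -> str:
    """Return the label registered for exactly this type, or raise."""
    if declared_type not in MATERIALIZERS:
        raise LookupError(f"Unable to find materializer for {declared_type!r}")
    return MATERIALIZERS[declared_type]


if __name__ == "__main__":
    # Annotating the step output as Model finds an entry.
    print(lookup_exact(Model))

    # A Sequential instance is still a valid value for a Model annotation.
    print(isinstance(Sequential(), Model))

    # Annotating the step output as Sequential misses in an exact-match lookup.
    try:
        lookup_exact(Sequential)
    except LookupError as err:
        print(err)
```

Whether zenml's real registry matches exact types or also walks base classes is not established in this thread, so treat the exact-match behaviour as an assumption.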

dudeperf3ct avatar Sep 09 '22 06:09 dudeperf3ct

@dudeperf3ct I am still getting this error.

StepInterfaceError: Unable to find materializer for output 'untrained_model' of type `<class 
'keras.engine.training.Model'>` in step 'initialize_model'. Please make sure to either explicitly set a 
materializer for step outputs using `step.with_return_materializers(...)` or registering a default materializer for
specific types by subclassing `BaseMaterializer` and setting its `ASSOCIATED_TYPES` class variable. For more 
information, visit https://docs.zenml.io/developer-guide/advanced-usage/materializer.

The function:

@step
def initialize_model() -> Output(untrained_model= tf.keras.Model):
    model = tf.keras.Sequential([
            tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
            tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
            tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
            tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Dense(num_classes, activation="softmax"),
            ])
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc'])
    return model

ahmadmustafaanis avatar Sep 10 '22 15:09 ahmadmustafaanis

@ahmadmustafaanis what version of zenml are you using? I am able to run the pipeline with the following code on zenml 0.13.2.

import numpy as np
import tensorflow as tf

from zenml.steps import step, Output

num_classes = 10
epochs = 10

@step
def data_loader() -> Output(x_train=np.ndarray, y_train=np.ndarray, x_test=np.ndarray, y_test=np.ndarray):
    """
    return MNIST x_train, y_train, x_test, y_test
    """
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    
    return x_train, y_train, x_test, y_test

@step
def initialize_model() -> Output(untrained_model=tf.keras.Model):
    model = tf.keras.Sequential([
            tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation="relu", input_shape=(28, 28, 1)),
            tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
            tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
            tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Dense(num_classes, activation="softmax"),
            ])
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc'])
    return model

@step
def preprocess_data(x:np.ndarray, y:np.ndarray) -> Output(x_processed= np.ndarray, y_processed=np.ndarray):
    x_processed = x.astype("float32") / 255
    x_processed = np.expand_dims(x_processed, -1)
    
    y_processed = tf.keras.utils.to_categorical(y, num_classes)
    
    return x_processed, y_processed

@step
def train_model(x_train:np.ndarray, y_train:np.ndarray, untrained_model:tf.keras.Model) -> Output(trained_model=tf.keras.Model, history=dict):
    history = untrained_model.fit(x_train, y_train, epochs=epochs)
    return untrained_model, history.history

@step
def eval_model(trained_model: tf.keras.Model, x_test:np.ndarray, y_test:np.ndarray) -> None:
    trained_model.evaluate(x_test, y_test)

from zenml.pipelines import pipeline

@pipeline
def runner(data_loader, initialize_model, preprocess_data, train_model):
    x_train, y_train, x_test, y_test = data_loader()
    x_train_processed, y_train_processed = preprocess_data(x_train, y_train)    
    model = initialize_model()
    model, history = train_model(x_train_processed, y_train_processed, model)
    
mnist_pipeline = runner(data_loader(), initialize_model(), preprocess_data(), train_model())

mnist_pipeline.run()

dudeperf3ct avatar Sep 15 '22 06:09 dudeperf3ct

My version is

└[~]> zenml --version
zenml, version 0.13.1

ahmadmustafaanis avatar Sep 15 '22 07:09 ahmadmustafaanis

@ahmadmustafaanis I'm able to run @dudeperf3ct's code with zenml==0.13.1 and zenml==0.13.0.

Did you install tensorflow using pip, or through the zenml integration, e.g. zenml integration install tensorflow?

dnth avatar Sep 16 '22 06:09 dnth

Yes, I installed it via pip, not through the zenml integration.

I guess that's the issue. I'll try the zenml integration this weekend.

ahmadmustafaanis avatar Sep 16 '22 07:09 ahmadmustafaanis

@ahmadmustafaanis quite possibly. Using zenml integration install makes sure the package is installed correctly. Keep us updated here! :))
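
A quick way to see what is actually installed in the active environment is to query package metadata. The following is a minimal sketch using only the standard library (Python 3.8+). `installed_version` and `missing_requirements` are hypothetical helper names, and the list of packages checked is only an example, not the integration's official requirement list.

```python
from importlib import metadata
from typing import List, Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


def missing_requirements(distributions: List[str]) -> List[str]:
    """Return the names from the list that are not installed in this environment."""
    return [name for name in distributions if installed_version(name) is None]


if __name__ == "__main__":
    # Example list only (an assumption); adjust it to the packages you care about.
    to_check = ["zenml", "tensorflow"]
    for name in to_check:
        print(name, installed_version(name))
    print("missing:", missing_requirements(to_check))
```

This only reports what is installed. It does not explain why a plain pip install behaved differently from zenml integration install, which this thread does not establish.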

dnth avatar Sep 16 '22 08:09 dnth

@dnth Thanks, it's working like a charm now. Does this mean that almost every object that is an output of (or an input to) a zenml step needs to be installed via zenml integration install?

ahmadmustafaanis avatar Sep 17 '22 18:09 ahmadmustafaanis

@ahmadmustafaanis yes. For every object that passes through the pipeline, you need to tell zenml how to serialize and deserialize it, so that the object can be cached and stored. In zenml this is done with a Materializer. For most Python objects you don't need to write your own Materializer because zenml ships built-in ones, but for custom objects you need to write a custom materializer.

Here's a guide on how to write a custom materializer: https://docs.zenml.io/v/0.6.0/guides/index/custom-materializer

A Keras model is not a standard Python object, so it needs a materializer of its own. One already exists in zenml; to use it, install the integration with zenml integration install tensorflow.
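
As a rough illustration of the idea, the sketch below mimics a materializer with plain Python. `Point`, `PointMaterializer`, `save`, `load`, `REGISTRY`, `register`, `find_materializer`, and `round_trip` are all hypothetical names chosen for this example; only `ASSOCIATED_TYPES` is taken from the error message. It does not use zenml's `BaseMaterializer`, so see the guide above for the real interface.

```python
import json
import os
import tempfile
from typing import Dict, Type


class Point:
    """Hypothetical custom object passed between two steps."""

    def __init__(self, x: float, y: float) -> None:
        self.x = x
        self.y = y


class PointMaterializer:
    """Toy stand-in for a materializer: knows how to store and restore a Point."""

    ASSOCIATED_TYPES = (Point,)

    def __init__(self, uri: str) -> None:
        self.uri = uri  # directory that holds the stored artifact

    def save(self, obj: Point) -> None:
        # Serialize the step output into the artifact directory.
        with open(os.path.join(self.uri, "point.json"), "w") as f:
            json.dump({"x": obj.x, "y": obj.y}, f)

    def load(self, data_type: Type[Point]) -> Point:
        # Deserialize the artifact so the next step can consume it.
        with open(os.path.join(self.uri, "point.json")) as f:
            data = json.load(f)
        return data_type(data["x"], data["y"])


# Toy registry: maps a type to the class that can store and restore it.
REGISTRY: Dict[type, type] = {}


def register(materializer_cls: type) -> None:
    """Register a materializer class for each of its associated types."""
    for associated_type in materializer_cls.ASSOCIATED_TYPES:
        REGISTRY[associated_type] = materializer_cls


def find_materializer(output_type: type) -> type:
    """Look up the materializer class for a type, or raise if none is registered."""
    if output_type not in REGISTRY:
        raise LookupError(f"Unable to find materializer for type {output_type!r}")
    return REGISTRY[output_type]


def round_trip(obj: Point) -> Point:
    """Save an object to a temporary directory and load it back."""
    with tempfile.TemporaryDirectory() as uri:
        materializer = find_materializer(type(obj))(uri)
        materializer.save(obj)
        return materializer.load(type(obj))


register(PointMaterializer)
```

In this toy model, a type with no registered entry fails at lookup time, which mirrors the shape of the error reported in this issue.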

dnth avatar Sep 19 '22 07:09 dnth

@dudeperf3ct any chance this also solves your issue? :)

dnth avatar Sep 19 '22 07:09 dnth

@dnth I didn't face any issue... I was just helping to resolve it.

dudeperf3ct avatar Sep 19 '22 07:09 dudeperf3ct

@dudeperf3ct I see.. sorry I misunderstood! Thank you so much for your help!

dnth avatar Sep 19 '22 07:09 dnth

@Coder-Vishali please let us know if this solves the issue for you :)

dnth avatar Sep 19 '22 07:09 dnth

@Coder-Vishali I'm closing this issue due to inactivity. Please let us know if the solution provided worked for you. Feel free to reopen if the problem still persists :)

dnth avatar Sep 28 '22 06:09 dnth