spark-deep-learning
Save a PySpark ML pipeline model
Hi,
I am trying to save a model for future use; below is the code:
from pyspark.ml.image import ImageSchema
from pyspark.sql.functions import *
img_dir = "hdfs:///personalities"
jobs_df = ImageSchema.readImages(img_dir + "/jobs").withColumn("label", lit(1))
zuckerberg_df = ImageSchema.readImages(img_dir + "/zuckerberg").withColumn("label", lit(0))
jobs_train, jobs_test = jobs_df.randomSplit([0.6, 0.4]) #0.6 for training, 0.4 for testing
zuckerberg_train, zuckerberg_test = zuckerberg_df.randomSplit([0.6, 0.4])
train_df = jobs_train.unionAll(zuckerberg_train)
test_df = jobs_test.unionAll(zuckerberg_test)
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from sparkdl import DeepImageFeaturizer
featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3")
lr = LogisticRegression(maxIter=20, regParam=0.05, elasticNetParam=0.3, labelCol="label")
p = Pipeline(stages=[featurizer, lr])
p_model = p.fit(train_df)
p_model.save("hdfs:///ml_model")
However, I get the following error:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/spark-2.4.0-bin-hadoop2.7/python/pyspark/ml/util.py", line 244, in save
self.write().save(path)
File "/opt/spark-2.4.0-bin-hadoop2.7/python/pyspark/ml/util.py", line 136, in save
self.saveImpl(path)
File "/opt/spark-2.4.0-bin-hadoop2.7/python/pyspark/ml/pipeline.py", line 225, in saveImpl
PipelineSharedReadWrite.validateStages(stages)
File "/opt/spark-2.4.0-bin-hadoop2.7/python/pyspark/ml/pipeline.py", line 348, in validateStages
stage.uid, type(stage))
ValueError: ('Pipeline write will fail on this pipeline because stage %s of type %s is not MLWritable', 'DeepImageFeaturizer_bbeb5c2d479e', <class 'sparkdl.transformers.named_image.DeepImageFeaturizer'>)
I found a similar issue here, but the error is different. Can anyone give me some help?
It seems that stages[1] (LogisticRegressionModel) can be saved, while stages[0] (DeepImageFeaturizer) cannot. Since the pipeline model p_model contains both stages, it cannot be saved as a whole.
>>> print(type(p_model.stages[0]))
<class 'sparkdl.transformers.named_image.DeepImageFeaturizer'>
>>> print(type(p_model.stages[1]))
<class 'pyspark.ml.classification.LogisticRegressionModel'>
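Since the fitted LogisticRegressionModel stage is itself MLWritable, one possible workaround is to persist that stage alone and rebuild the DeepImageFeaturizer when loading. A minimal sketch, assuming the featurizer is fully described by its constructor parameters (the path hdfs:///ml_model_lr is hypothetical):

from pyspark.ml import PipelineModel
from pyspark.ml.classification import LogisticRegressionModel
from sparkdl import DeepImageFeaturizer

# Persist only the MLWritable classifier stage.
p_model.stages[1].save("hdfs:///ml_model_lr")

# Later: recreate the featurizer (it carries no learned state, only its
# parameters) and reload the saved classifier, then reassemble the pipeline.
featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features",
                                 modelName="InceptionV3")
lr_model = LogisticRegressionModel.load("hdfs:///ml_model_lr")
restored = PipelineModel(stages=[featurizer, lr_model])
predictions = restored.transform(test_df)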
According to the source code for pyspark.ml.pipeline, when a read/write is performed, the function validateStages is called to check that every stage inside the pipeline model is an instance of MLWritable.
@staticmethod
def validateStages(stages):
    """
    Check that all stages are Writable
    """
    for stage in stages:
        if not isinstance(stage, MLWritable):
            raise ValueError("Pipeline write will fail on this pipeline " +
                             "because stage %s of type %s is not MLWritable",
                             stage.uid, type(stage))
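To see this check fail interactively, the same isinstance test can be mirrored by hand; a quick sketch:

from pyspark.ml.util import MLWritable

# Replicate validateStages: report which stages would block the write.
for stage in p_model.stages:
    print(stage.uid, isinstance(stage, MLWritable))
# The DeepImageFeaturizer stage should report False here, which is
# exactly what triggers the ValueError above.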