elephas
Pipeline transform returns an empty prediction column
Hi everyone,
I have a problem with the `transform` method. The Keras model, converted into an Elephas estimator, fits correctly inside a PySpark pipeline, but when I try to transform the validation set, the returned DataFrame's `prediction` column appears to be empty.
The idea:
- Preprocess the Iris dataset with a PySpark pipeline (the first pipeline)
- Convert the Keras model into an Elephas estimator
- Build a new pipeline to fit the model
- Fit that pipeline on the preprocessed training data
- Transform the validation set
My code:
# LOAD DATA
# NOTE(review): this snippet assumes `spark`, `load_iris`, `np`, `pd`, `tf`,
# `build_model`, `workers`, `epochs`, `batch_size`, and the pyspark.ml /
# elephas classes (`Pipeline`, `VectorAssembler`, `MinMaxScaler`,
# `StringIndexer`, `ElephasEstimator`) are imported/defined elsewhere.
iris = load_iris()
data = pd.DataFrame(data=np.c_[iris['data'], iris['target']],
                    columns=iris['feature_names'] + ['target'])
data_spark = spark.createDataFrame(data)
# Split with randomSplit instead of limit(20) + subtract():
# - limit() without orderBy() is not guaranteed to return the same rows;
# - the first 20 Iris rows all belong to one class, so the validation set
#   would contain a single label;
# - subtract() is a set difference (EXCEPT DISTINCT), so it also silently
#   drops duplicate rows from the training data.
data_spark, validation = data_spark.randomSplit([0.8, 0.2], seed=42)

# PREPROCESSING
# One assembler + min-max scaler per raw feature column, then a final
# assembler that concatenates the scaled columns into one "features" vector,
# and a StringIndexer that turns the numeric target into "index_category".
stages, features = [], []
for feature in iris['feature_names']:  # every column except "target"
    stages.append(VectorAssembler(
        inputCols=[feature], outputCol=f"VECT_{feature}"))
    stages.append(MinMaxScaler(inputCol=f"VECT_{feature}",
                               outputCol=f"SCALED_{feature}"))
    features.append(f"SCALED_{feature}")
stages += [VectorAssembler(inputCols=features, outputCol="features")]
stages += [StringIndexer(inputCol="target", outputCol="index_category")]
pre_processing_pipeline = Pipeline(stages=stages)
# Fit the preprocessing ONCE, on the training data only, and reuse that
# fitted model for validation. Re-fitting on the validation set would learn
# different min/max values and, worse, StringIndexer orders labels by
# frequency, so the same class could end up with a different index_category.
pipeline_fitted = pre_processing_pipeline.fit(data_spark)
preprocessed_data = pipeline_fitted.transform(data_spark)
pre_processing_val = pipeline_fitted.transform(validation)
nb_classes = preprocessed_data.select("target").distinct().count()
input_dim = len(preprocessed_data.select("features").first()[0])

# BUILD MODEL
model = build_model(input_dim, nb_classes)
opt = tf.optimizers.Adam(learning_rate=0.0001)
model.compile(loss="categorical_crossentropy",
              optimizer=opt, metrics=["accuracy"])
opt_conf = tf.optimizers.serialize(opt)

# ELEPHAS MODEL TRANSLATE
estimator = ElephasEstimator()
estimator.setFeaturesCol("features")
estimator.setLabelCol("index_category")
estimator.set_keras_model_config(model.to_json())
estimator.set_categorical_labels(True)
estimator.set_nb_classes(nb_classes)
estimator.set_num_workers(workers)
estimator.set_epochs(epochs)
estimator.set_batch_size(batch_size)
estimator.set_verbosity(1)
estimator.set_optimizer_config(opt_conf)
estimator.set_mode("synchronous")
estimator.set_loss("categorical_crossentropy")
estimator.set_metrics(['acc'])

# FIT ELEPHAS MODEL
model_pipeline = Pipeline(stages=[estimator])
print(model_pipeline)
preprocessed_data = preprocessed_data.select(
    "features", "index_category")
val_data = pre_processing_val.select("features", "index_category")
fitted_model_pipeline = model_pipeline.fit(preprocessed_data)
prediction = fitted_model_pipeline.transform(val_data)
print(type(prediction))
# DataFrame.describe() only computes statistics for numeric and string
# columns. If "prediction" has another type (for example an array/vector of
# class scores; check printSchema()), describe('prediction') returns only the
# "summary" column, which looks "empty" even though predictions exist.
# Inspect the column directly instead; genuine nulls will also show up here.
prediction.printSchema()
prediction.select("index_category", "prediction").show(truncate=False)
When I inspect `prediction` with the `describe` method, the result is empty, even though the procedure seems correct. Why?