
Pipeline transform returns an empty prediction column

AlessandroMinervini opened this issue 1 year ago • 0 comments

Hi everyone,

I have a problem with the transform method. The Keras model translated to Elephas fits correctly in a PySpark pipeline, but when I try to transform the validation set, it returns a DataFrame whose prediction column is empty.

The idea:

  • Preprocess the Iris dataset with a PySpark pipeline (first pipeline)
  • Translate the Keras model to Elephas
  • Build a new pipeline to fit the model
  • Fit the pipeline on the preprocessed training data
  • Transform the validation set

My code:

# Imports (assumes an active SparkSession is available as `spark`)
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.datasets import load_iris
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler, StringIndexer, VectorAssembler
from elephas.ml_model import ElephasEstimator

# LOAD DATA
iris = load_iris()
data = pd.DataFrame(data=np.c_[iris['data'], iris['target']],
                    columns=iris['feature_names'] + ['target'])

data_spark = spark.createDataFrame(data)

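# Hold out 20 rows as a validation set and train on the remainder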
validation = data_spark.limit(20)
data_spark = data_spark.subtract(validation)

# PREPROCESSING
stages, features = [], []
for feature in data_spark.columns:
    if feature != "target":
        stages.append(VectorAssembler(
            inputCols=[feature], outputCol=f"VECT_{feature}"))
        stages.append(MinMaxScaler(inputCol=f"VECT_{feature}",
                                   outputCol=f'SCALED_{feature}'))
        features.append(f'SCALED_{feature}')

stages += [VectorAssembler(inputCols=features, outputCol="features")]
stages += [StringIndexer(inputCol="target", outputCol="index_category")]

pre_processing_pipeline = Pipeline(stages=stages)

pipeline_fitted = pre_processing_pipeline.fit(data_spark)

preprocessed_data = pipeline_fitted.transform(data_spark)

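# Fit the same preprocessing pipeline separately on the validation set and transform it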
val_fitted = pre_processing_pipeline.fit(validation)
pre_processing_val = val_fitted.transform(validation)

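# Derive the number of classes and the input dimension from the preprocessed training data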
nb_classes = preprocessed_data.select("target").distinct().count()
input_dim = len(preprocessed_data.select("features").first()[0])

# BUILD MODEL
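# build_model is a helper defined elsewhere that returns a Keras model
# for the given input dimension and number of classes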
model = build_model(input_dim, nb_classes)
opt = tf.optimizers.Adam(learning_rate=0.0001)

model.compile(loss="categorical_crossentropy",
              optimizer=opt, metrics=["accuracy"])

opt_conf = tf.optimizers.serialize(opt)

# ELEPHAS MODEL TRANSLATE
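# Wrap the compiled Keras model in an ElephasEstimator
# (workers, epochs and batch_size are defined elsewhere in the script)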
estimator = ElephasEstimator()
estimator.setFeaturesCol("features")
estimator.setLabelCol("index_category")
estimator.set_keras_model_config(model.to_json())
estimator.set_categorical_labels(True)
estimator.set_nb_classes(nb_classes)
estimator.set_num_workers(workers)
estimator.set_epochs(epochs)
estimator.set_batch_size(batch_size)
estimator.set_verbosity(1)
estimator.set_optimizer_config(opt_conf)
estimator.set_mode("synchronous")
estimator.set_loss("categorical_crossentropy")
estimator.set_metrics(['acc'])

# FIT ELEPHAS MODEL
model_pipeline = Pipeline(stages=[estimator])
print(model_pipeline)

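# Keep only the columns the estimator needs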
preprocessed_data = preprocessed_data.select(
    "features", "index_category")

val_data = pre_processing_val.select("features", "index_category")

fitted_model_pipeline = model_pipeline.fit(preprocessed_data)

prediction = fitted_model_pipeline.transform(val_data)

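# Inspect the prediction column of the transformed validation set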
print(type(prediction))
print(type(prediction.describe('prediction')))
prediction.describe('prediction').show()

The prediction column, checked with the describe method, is empty... but the procedure seems correct. Why?

AlessandroMinervini • Aug 05 '22 09:08