
Unable to save the complete pipeline to disk

Open bipin2295 opened this issue 3 years ago • 8 comments

Describe the bug I'm trying to write a complete ML pipeline (StringIndexer, VectorAssembler, and LightGBMRegressor) to disk using pipeline_model.write().overwrite().save("model_file"), but the save fails with the NoSuchMethodError shown in the stack trace below.

To Reproduce

from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMRegressor
from pyspark.ml import Pipeline, PipelineModel

spark = SparkSession.builder.appName('Regression') \
    .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.9.4") \
    .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven") \
    .getOrCreate()

df = spark.read.option('header', 'true').csv('tips.csv', inferSchema=True)
indexer = StringIndexer(inputCols=["sex", "smoker", "day", "time"], outputCols=["sex_indexed", "smoker_indexed", "day_indexed", "time_indexed"])
featureAssembler = VectorAssembler(inputCols=['tip', 'size', 'sex_indexed', 'smoker_indexed', 'day_indexed', 'time_indexed'], outputCol='Independent Features')

regressor = LightGBMRegressor() \
    .setLabelCol('total_bill') \
    .setFeaturesCol('Independent Features') \
    .setNumIterations(10) \
    .setNumLeaves(10) \
    .setMaxDepth(10) \
    .setLearningRate(0.1) \
    .setVerbosity(-1) \
    .setBaggingSeed(100)


pipeline = Pipeline(stages=[indexer, featureAssembler, regressor])
model = pipeline.fit(df)

model.write().overwrite().save("model_file")
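
Note: the stack trace below fails inside json4s (org.json4s.JsonDSL$.pair2Assoc), which usually means two binary-incompatible json4s versions ended up on the classpath; Spark 3.2 ships a newer json4s than Spark 3.1, so a SynapseML build compiled against a different Spark line could trigger exactly this. As a sketch (the 0.9.5 coordinate is an assumption; check the SynapseML release notes for the artifact matching your Spark version), pinning the package to a build for the running Spark line would look like:

from pyspark.sql import SparkSession

# Assumed fix sketch: keep the SynapseML artifact and the Spark runtime on the same line.
spark = SparkSession.builder.appName('Regression') \
    .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.9.5") \
    .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven") \
    .getOrCreate()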

Expected behavior The full PipelineModel is written to "model_file" on disk without errors.

Info (please complete the following information):

  • SynapseML Version: 0.9.4
  • Spark Version: pyspark 3.2.0
  • Spark Platform [e.g. Databricks]: standalone mode

Stacktrace

Py4JJavaError: An error occurred while calling o417.save.
: java.lang.NoSuchMethodError: org.json4s.JsonDSL$.pair2Assoc(Lscala/Tuple2;Lscala/Function1;)Lorg/json4s/JsonDSL$JsonAssoc;
	at org.apache.spark.ml.ComplexParamsWriter$.getMetadataToSave(ComplexParamsSerializer.scala:122)
	at org.apache.spark.ml.ComplexParamsWriter$.saveMetadata(ComplexParamsSerializer.scala:97)
	at org.apache.spark.ml.ComplexParamsWriter.saveImpl(ComplexParamsSerializer.scala:40)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:168)
	at org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$saveImpl$5(Pipeline.scala:257)
	at org.apache.spark.ml.MLEvents.withSaveInstanceEvent(events.scala:174)
	at org.apache.spark.ml.MLEvents.withSaveInstanceEvent$(events.scala:169)
	at org.apache.spark.ml.util.Instrumentation.withSaveInstanceEvent(Instrumentation.scala:42)
	at org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$saveImpl$4(Pipeline.scala:257)
	at org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$saveImpl$4$adapted(Pipeline.scala:254)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$saveImpl$1(Pipeline.scala:254)
	at org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$saveImpl$1$adapted(Pipeline.scala:247)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.Pipeline$SharedReadWrite$.saveImpl(Pipeline.scala:247)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.saveImpl(Pipeline.scala:346)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:168)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.super$save(Pipeline.scala:344)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.$anonfun$save$4(Pipeline.scala:344)
	at org.apache.spark.ml.MLEvents.withSaveInstanceEvent(events.scala:174)
	at org.apache.spark.ml.MLEvents.withSaveInstanceEvent$(events.scala:169)
	at org.apache.spark.ml.util.Instrumentation.withSaveInstanceEvent(Instrumentation.scala:42)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.$anonfun$save$3(Pipeline.scala:344)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.$anonfun$save$3$adapted(Pipeline.scala:344)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.save(Pipeline.scala:344)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)


bipin2295 avatar Dec 08 '21 07:12 bipin2295

@imatiach-msft, tagging you here since this is a LightGBM-related issue.

bipin2295 avatar Dec 08 '21 07:12 bipin2295

@bipin2295 I think this could be caused by your dataset. Could you share a minimal sample of it so we can repro on our side?

mhamilton723 avatar Dec 15 '21 19:12 mhamilton723

@mhamilton723, very sorry for such a late reply. Here are a few sample records:

total_bill,tip,sex,smoker,day,time,size
16.99,1.01,Female,No,Sun,Dinner,2
10.34,1.66,Male,No,Sun,Dinner,3
21.01,3.5,Male,No,Sun,Dinner,3
23.68,3.31,Male,No,Sun,Dinner,2
24.59,3.61,Female,No,Sun,Dinner,4
25.29,4.71,Male,No,Sun,Dinner,4
8.77,2.0,Male,No,Sun,Dinner,2
26.88,3.12,Male,No,Sun,Dinner,4
15.04,1.96,Male,No,Sun,Dinner,2
14.78,3.23,Male,No,Sun,Dinner,2
10.27,1.71,Male,No,Sun,Dinner,2
35.26,5.0,Female,No,Sun,Dinner,4
15.42,1.57,Male,No,Sun,Dinner,2
18.43,3.0,Male,No,Sun,Dinner,4
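
For a fully self-contained repro (a sketch; no tips.csv needed, and the column names match the sample above, though LightGBM may need more rows than shown to build a useful Dataset):

columns = ["total_bill", "tip", "sex", "smoker", "day", "time", "size"]
rows = [
    (16.99, 1.01, "Female", "No", "Sun", "Dinner", 2),
    (10.34, 1.66, "Male", "No", "Sun", "Dinner", 3),
    (21.01, 3.5, "Male", "No", "Sun", "Dinner", 3),
    (23.68, 3.31, "Male", "No", "Sun", "Dinner", 2),
    (24.59, 3.61, "Female", "No", "Sun", "Dinner", 4),
]
df = spark.createDataFrame(rows, columns)

model = pipeline.fit(df)                      # same pipeline as in the report
model.write().overwrite().save("model_file")  # raises the NoSuchMethodError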

bipin2295 avatar Apr 04 '22 04:04 bipin2295

My colleague and I have the same problem. This is how I run LightGBM:

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as func
from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier

def _prepare_valid(train: DataFrame, valid: DataFrame) -> DataFrame:
    train_ = train.withColumn("validation", func.lit(0))
    valid_ = valid.withColumn("validation", func.lit(1))
    return train_.union(valid_)
 
def to_vector(df: DataFrame, used_features: list, output_col: str = "features") -> DataFrame:
    featurizer = VectorAssembler(
        inputCols=used_features,
        outputCol=output_col,
        handleInvalid="keep"
    )
    return featurizer.transform(df)
 
 
spark = SparkSession.builder.getOrCreate()
train = spark.createDataFrame(train_data)  # train_data / valid_data are built upstream
valid = spark.createDataFrame(valid_data)
used_features = ["x1", "x2", "x3"]
target_name = "y"
 
model_params = {
    'boostingType': 'gbdt',
    'numIterations': 1000,
    'learningRate': 0.05134,
    'numLeaves': 54,
    'maxDepth': 10,
    'isUnbalance': False,
    'objective': 'binary',
    "useSingleDatasetMode": True,
    "baggingSeed": 42,
    "useBarrierExecutionMode": True,
    "numThreads": 9,  # num cores - 1
    "parallelism": "voting_parallel",
    "featuresShapCol": "shap_values",
    "verbosity": 500
}
 
 
train_vector = to_vector(train, used_features)[target_name, "features"]
if valid is not None:
    dataset = _prepare_valid(train, valid)
    train_vector = to_vector(dataset, used_features)[target_name, "features", "validation"]
    model_params["validationIndicatorCol"] = "validation"
estimator = LightGBMClassifier(**model_params)
model = estimator.fit(train_vector)
model.write().save(save_path)  # fails with the same json4s NoSuchMethodError

  • SynapseML Version: 0.9.5
  • Spark Version: pyspark 3.0.1
  • Spark Platform [e.g. Databricks]: custom hadoop cluster
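
If only the trained LightGBM stage needs to be persisted, a possible workaround (a sketch; it bypasses the Spark ML writer where the json4s call fails, but does not save the VectorAssembler or any other stage) is to export the native booster:

from synapse.ml.lightgbm import LightGBMClassificationModel

# Export the fitted booster in LightGBM's native format (the path is an example).
model.saveNativeModel("/tmp/lgbm_native")

# Reload it later; upstream feature preparation has to be recreated separately.
reloaded = LightGBMClassificationModel.loadNativeModelFromFile("/tmp/lgbm_native")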

trover97 avatar Apr 04 '22 12:04 trover97

Same issue, using com.microsoft.azure:synapseml-lightgbm_2.12:0.9.5 with Spark 3.0.1

Ishitori avatar Apr 29 '22 02:04 Ishitori

Facing the same issue. Could you guys please fix this?

ramab1988 avatar May 24 '22 06:05 ramab1988

Facing the same issue. Could you guys please fix this?

wang21jun avatar Oct 12 '22 12:10 wang21jun

Facing the same issue. Could you guys please fix this?

zjyuwish avatar Apr 06 '23 07:04 zjyuwish