Multi-line chains of DataFrame transformations need to be enclosed in parentheses
At least in spark-shell, the syntax needs this fix. Without the parentheses, the following command from page 421 of the book fails to produce a proper Array[ParamMap] in the params value:
val params = new ParamGridBuilder() .addGrid(rForm.formula, Array( ...
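The fix (shown in the attached fixed session below) is to wrap the whole chain in parentheses, so spark-shell compiles it as a single expression instead of one statement per line:

val params = (new ParamGridBuilder()
  .addGrid(rForm.formula, Array(
    "lab ~ . + color:value1",
    "lab ~ . + color:value1 + color:value2"))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .addGrid(lr.regParam, Array(0.1, 2.0))
  .build())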
Hello @dbtech4, I would like to work on this issue. Could you please give me some more context? Thank you!!
Hello Vaibhav, please find attached the failed case and the fixed version.
Best regards, Martti
Testing the machine learning example in Chapter 24 of the book “Spark: The Definitive Guide” by Bill Chambers & Matei Zaharia, First Edition, Second Release (2018-03-16).
Using Spark 3.3.1, installed standalone on a Debian 10 Linux platform.
~$ cd Spark
~/Spark$ spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://debian10.local:4041
Spark context available as 'sc' (master = local[*], app id = local-1674503039164).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.1
      /_/

Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_202)
Type in expressions to have them evaluated.
Type :help for more information.
scala> // MLlib in Action pp 414-
...
scala> // Training and Evaluation
scala> import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.tuning.ParamGridBuilder

scala> val params = new ParamGridBuilder() // p 421
params: org.apache.spark.ml.tuning.ParamGridBuilder = @.***

scala> .addGrid(rForm.formula, Array(
     |   "lab ~ . + color:value1",
     |   "lab ~ . + color:value1 + color:value2"))
res4: params.type = @.***

scala> .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
res5: res4.type = @.***

scala> .addGrid(lr.regParam, Array(0.1, 2.0))
res6: res5.type = @.***

scala> .build()
res7: Array[org.apache.spark.ml.param.ParamMap] = Array({ logreg_2e78cc49ddce-elasticNetParam: 0.0, rFormula_0aa1e5704a94-formula: lab ~ . + color:value1, logreg_2e78cc49ddce-regParam: 0.1 }, { logreg_2e78cc49ddce-elasticNetParam: 0.0, rFormula_0aa1e5704a94-formula: lab ~ . + color:value1, logreg_2e78cc49ddce-regParam: 2.0 }, { logreg_2e78cc49ddce-elasticNetParam: 0.5, rFormula_0aa1e5704a94-formula: lab ~ . + color:value1, logreg_2e78cc49ddce-regParam: 0.1 }, { logreg_2e78cc49ddce-elasticNetParam: 0.5, rFormula_0aa1e5704a94-formula: lab ~ . + color:value1, logreg_2e78cc49ddce-regParam: 2.0 }, { logreg_2e78cc49ddce-elasticNetParam: 1.0, rFormula_0aa1e5704a94-formula: lab ~ . + color:value1, logreg_2e78cc49ddce-regParam: 0.1 }, { logreg_2e78cc49dd...
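Note what happened above: each .addGrid(...) line was evaluated as a standalone statement against the previous result, so the finished Array[ParamMap] from .build() landed in the throwaway res7, while params itself remained a bare ParamGridBuilder. This is why .setEstimatorParamMaps(params) below misfires and tvs.fit(train) then fails with "Failed to find a default value for estimatorParamMaps".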
scala> import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

scala> val evaluator = new BinaryClassificationEvaluator()
evaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = BinaryClassificationEvaluator: uid=binEval_f8d96ea0bae3, metricName=areaUnderROC, numBins=1000

scala> .setMetricName("areaUnderROC")
res8: evaluator.type = BinaryClassificationEvaluator: uid=binEval_f8d96ea0bae3, metricName=areaUnderROC, numBins=1000

scala> .setRawPredictionCol("prediction")
res9: res8.type = BinaryClassificationEvaluator: uid=binEval_f8d96ea0bae3, metricName=areaUnderROC, numBins=1000

scala> .setLabelCol("label")
res10: res9.type = BinaryClassificationEvaluator: uid=binEval_f8d96ea0bae3, metricName=areaUnderROC, numBins=1000
scala> //
scala> import org.apache.spark.ml.tuning.TrainValidationSplit
import org.apache.spark.ml.tuning.TrainValidationSplit

scala> val tvs = new TrainValidationSplit() // p 422
tvs: org.apache.spark.ml.tuning.TrainValidationSplit = tvs_9ef672b9a227

scala> .setTrainRatio(0.75) // also the default.
res11: tvs.type = tvs_9ef672b9a227

scala> .setEstimatorParamMaps(params)

scala> .setEstimator(pipeline)
res13: res11.type = tvs_9ef672b9a227

scala> .setEvaluator(evaluator)
res14: res13.type = tvs_9ef672b9a227
scala> // p 423
scala> val tvsFitted = tvs.fit(train)
23/01/23 19:45:57 ERROR Instrumentation: java.util.NoSuchElementException: Failed to find a default value for estimatorParamMaps
  at org.apache.spark.ml.param.Params.$anonfun$getOrDefault$2(params.scala:756)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.ml.param.Params.getOrDefault(params.scala:756)
  at org.apache.spark.ml.param.Params.getOrDefault$(params.scala:753)
  at org.apache.spark.ml.PipelineStage.getOrDefault(Pipeline.scala:41)
  at org.apache.spark.ml.param.Params.$(params.scala:762)
  at org.apache.spark.ml.param.Params.$$(params.scala:762)
  at org.apache.spark.ml.PipelineStage.$(Pipeline.scala:41)

Fixed version (new spark-shell session):

~/Spark$ spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://debian10.local:4042
Spark context available as 'sc' (master = local[*], app id = local-1675586548395).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.1
      /_/
Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_202)
Type in expressions to have them evaluated.
Type :help for more information.
...
scala> // Pipelining Our Workflow
scala> // p 420
scala> val Array(train, test) = df.randomSplit(Array(0.7, 0.3))
train: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [color: string, lab: string ... 2 more fields]
test: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [color: string, lab: string ... 2 more fields]

scala> val rForm = new RFormula()
rForm: org.apache.spark.ml.feature.RFormula = RFormula: uid=rFormula_5d2f990bdd05

scala> val lr = new LogisticRegression().setLabelCol("label").setFeaturesCol("features")
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_c929337cd6f4
scala> // p 421
scala> import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.Pipeline

scala> val stages = Array(rForm, lr)
stages: Array[org.apache.spark.ml.Estimator[_ >: org.apache.spark.ml.classification.LogisticRegressionModel with org.apache.spark.ml.feature.RFormulaModel <: org.apache.spark.ml.Model[_ >: org.apache.spark.ml.classification.LogisticRegressionModel with org.apache.spark.ml.feature.RFormulaModel <: org.apache.spark.ml.Transformer with org.apache.spark.ml.param.shared.HasLabelCol with org.apache.spark.ml.param.shared.HasFeaturesCol with org.apache.spark.ml.util.MLWritable] with org.apache.spark.ml.param.shared.HasLabelCol with org.apache.spark.ml.param.shared.HasFeaturesCol with org.apache.spark.ml.util.MLWritable] with org.apache.spark.ml.param.shared.HasLabelCol with org.apache.spark.ml.param.shared.HasFeaturesCol with org.apache.spark.ml.util.DefaultParamsWritable{...

scala> val pipeline = new Pipeline().setStages(stages)
pipeline: org.apache.spark.ml.Pipeline = pipeline_ef9fc1ede161
scala> // Training and Evaluation:
scala> import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.tuning.ParamGridBuilder

scala> val params = (new ParamGridBuilder()
     |   .addGrid(rForm.formula, Array(
     |     "lab ~ . + color:value1",
     |     "lab ~ . + color:value1 + color:value2"))
     |   .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
     |   .addGrid(lr.regParam, Array(0.1, 2.0))
     |   .build()
     | )
params: Array[org.apache.spark.ml.param.ParamMap] = Array({ logreg_c929337cd6f4-elasticNetParam: 0.0, rFormula_5d2f990bdd05-formula: lab ~ . + color:value1, logreg_c929337cd6f4-regParam: 0.1 }, { logreg_c929337cd6f4-elasticNetParam: 0.5, rFormula_5d2f990bdd05-formula: lab ~ . + color:value1, logreg_c929337cd6f4-regParam: 0.1 }, { logreg_c929337cd6f4-elasticNetParam: 1.0, rFormula_5d2f990bdd05-formula: lab ~ . + color:value1, logreg_c929337cd6f4-regParam: 0.1 }, { logreg_c929337cd6f4-elasticNetParam: 0.0, rFormula_5d2f990bdd05-formula: lab ~ . + color:value1, logreg_c929337cd6f4-regParam: 2.0 }, { logreg_c929337cd6f4-elasticNetParam: 0.5, rFormula_5d2f990bdd05-formula: lab ~ . + color:value1, logreg_c929337cd6f4-regParam: 2.0 }, { logreg_c929337c...
scala> // p 422
scala> import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

scala> val evaluator = (new BinaryClassificationEvaluator()
     |   .setMetricName("areaUnderROC")
     |   .setRawPredictionCol("prediction")
     |   .setLabelCol("label")
     | )
evaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = BinaryClassificationEvaluator: uid=binEval_38d30199ac9a, metricName=areaUnderROC, numBins=1000

scala> import org.apache.spark.ml.tuning.TrainValidationSplit
import org.apache.spark.ml.tuning.TrainValidationSplit

scala> val tvs = (new TrainValidationSplit()
     |   .setTrainRatio(0.75) // also the default.
     |   .setEstimatorParamMaps(params) // type mismatch error in the failed case
     |   .setEstimator(pipeline)
     |   .setEvaluator(evaluator)
     | )
tvs: org.apache.spark.ml.tuning.TrainValidationSplit = tvs_ea9c544d061a
scala> // p 423
scala> val tvsFitted = tvs.fit(train)
tvsFitted: org.apache.spark.ml.tuning.TrainValidationSplitModel = TrainValidationSplitModel: uid=tvs_ea9c544d061a, bestModel=pipeline_ef9fc1ede161, trainRatio=0.75

scala> evaluator.evaluate(tvsFitted.transform(test)) // 0.9166666666666667
res8: Double = 0.7651821862348178
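(The book reports 0.9166666666666667 here; this run gives 0.7651821862348178, most likely because randomSplit above was called without a seed, so each session draws a different train/test split. For a reproducible split, a seed can be passed, e.g. df.randomSplit(Array(0.7, 0.3), seed = 42) — the seed value 42 is just an arbitrary example.)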
scala> import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.PipelineModel

scala> import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.ml.classification.LogisticRegressionModel

scala> val trainedPipeline = tvsFitted.bestModel.asInstanceOf[PipelineModel]
trainedPipeline: org.apache.spark.ml.PipelineModel = pipeline_ef9fc1ede161

scala> val TrainedLR = trainedPipeline.stages(1).asInstanceOf[LogisticRegressionModel]
TrainedLR: org.apache.spark.ml.classification.LogisticRegressionModel = LogisticRegressionModel: uid=logreg_c929337cd6f4, numClasses=2, numFeatures=7

scala> val summaryLR = TrainedLR.summary
summaryLR: org.apache.spark.ml.classification.LogisticRegressionTrainingSummary = @.***

scala> summaryLR.objectiveHistory // 0.6751425885789243, 0.5543659647777687, 0.473776...
res9: Array[Double] = Array(0.6719583059278968, 0.6475031325504873, 0.5887448750830291, 0.5817096472502368, 0.5800299510636623, 0.5790220897159984, 0.5787477711186043, 0.5786388131062691, 0.5784587857917531, 0.5780775104907241, 0.577918539244777, 0.5779135596254581, 0.5778821561258602, 0.5778810892221772, 0.5778795041325092, 0.5778776761948066, 0.5778775477073494, 0.5778775130411241, 0.577877496470387, 0.5778774811957399, 0.577877466886602, 0.5778774517581886, 0.5778774421090169, 0.5778774410762949, 0.5778774408394689, 0.5778774391599338, 0.5778774386511226, 0.5778774385861873, 0.577877438138773, 0.577877438073588, 0.5778774379134698, 0.5778774377171001, 0.5778774376810277, 0.5778774376302049, 0.577877437624541)
scala> // Persisting and Applying Models:
scala> tvsFitted.write.overwrite().save("/tmp/modelLocation")
scala> // p 424
scala> import org.apache.spark.ml.tuning.TrainValidationSplitModel
import org.apache.spark.ml.tuning.TrainValidationSplitModel

scala> val model = TrainValidationSplitModel.load("/tmp/modelLocation")
model: org.apache.spark.ml.tuning.TrainValidationSplitModel = TrainValidationSplitModel: uid=tvs_ea9c544d061a, bestModel=pipeline_ef9fc1ede161, trainRatio=0.75

scala> model.transform(test)
res11: org.apache.spark.sql.DataFrame = [color: string, lab: string ... 7 more fields]
scala>
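An aside, not from the book: besides wrapping the chain in parentheses, spark-shell's :paste mode also accepts the multi-line chain exactly as printed, because the whole pasted block is compiled as a single unit:

scala> :paste
// Entering paste mode (ctrl-D to finish)

val params = new ParamGridBuilder()
  .addGrid(rForm.formula, Array(
    "lab ~ . + color:value1",
    "lab ~ . + color:value1 + color:value2"))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .addGrid(lr.regParam, Array(0.1, 2.0))
  .build()

// Exiting paste mode, now interpreting.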