[BUG] lightgbm and CrossValidator are not compatible
SynapseML version
synapseml_2.12:1.0.8
System information
- Language version (python 3.8, scala 2.12):
- Spark Version (3.5.0):
- Spark Platform (Databricks):
Describe the problem
I tried to combine synpase.ml.lightgbm with CrossValidator and hyperopt on Databricks. There are some trials and issues: 1. tried hyperopt and lightgbm, failed after 43 iterations, and pop out an message:
86%|████████▌ | 43/50 [5:39:26<55:15, 473.64s/trial, best loss: 0.22394105399688446]
Py4JJavaError: An error occurred while calling o46084.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 64 in stage 1606.0 failed 4 times, most recent failure: Lost task 64.3 in stage 1606.0 (TID 80923) (10.189.98.69 executor 195): java.net.ConnectException: Connection refused (Connection refused)
2.tried lightgbm with hyperopt and CrossValidator there are not progress bar after using CrossValidator, if I limit the data size into 1000, I succeed if the data is limited into 1000, but failed to feed the whole data. Even in the 1000 rows of data, the speed is extremly slow and Databricks doesn't show progress bar, which means it may not utilize parallel computing. I tried batches, autoScaling, numTasks and dynamicAllocation, also barrier. But all of those don't work for me.
error message:
Py4JJavaError: An error occurred while calling o1009.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 69.0 failed 4 times, most recent failure: Lost task 0.3 in stage 69.0 (TID 1619) (10.189.96.80 executor 3): java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:613)
at java.net.Socket.connect(Socket.java:561)
at java.net.Socket.<init>(Socket.java:457)
at java.net.Socket.<init>(Socket.java:234)
at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getNetworkTopologyInfoFromDriver(NetworkManager.scala:133)
at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$2(NetworkManager.scala:120)
at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:24)
at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$1(NetworkManager.scala:115)
at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.using(StreamUtilities.scala:28)
at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getGlobalNetworkInfo(NetworkManager.scala:111)
at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.initialize(BasePartitionTask.scala:197)
at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.mapPartitionTask(BasePartitionTask.scala:132)
at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$executePartitionTasks$1(LightGBMBase.scala:615)
at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:226)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:938)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:938)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:413)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:410)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:377)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:413)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:410)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:377)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:211)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:199)
at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:161)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:155)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:102)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$10(Executor.scala:1042)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:1045)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:932)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.$anonfun$failJobAndIndependentStages$1(DAGScheduler.scala:4018)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:4016)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3930)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3917)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3917)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1766)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1749)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1749)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:4277)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4179)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4165)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:55)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1412)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1400)
at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:3157)
at org.apache.spark.sql.execution.collect.Collector.$anonfun$runSparkJobs$1(Collector.scala:303)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:299)
at org.apache.spark.sql.execution.collect.Collector.$anonfun$collect$1(Collector.scala:384)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:381)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:122)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:131)
at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:94)
at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:90)
at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:78)
at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$1(ResultCacheManager.scala:555)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.execution.qrc.ResultCacheManager.collectResult$1(ResultCacheManager.scala:546)
at org.apache.spark.sql.execution.qrc.ResultCacheManager.computeResult(ResultCacheManager.scala:563)
at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:401)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:400)
at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:319)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollectResult$1(SparkPlan.scala:572)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:569)
at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3844)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4816)
at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3811)
at org.apache.spark.sql.Dataset.$anonfun$withAction$3(Dataset.scala:4807)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1177)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4805)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$10(SQLExecution.scala:462)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:800)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:334)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:205)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:737)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4805)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:3811)
at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.executePartitionTasks(LightGBMBase.scala:623)
at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.executeTraining(LightGBMBase.scala:598)
at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.trainOneDataBatch(LightGBMBase.scala:446)
at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$train$5(LightGBMBase.scala:56)
at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:198)
at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$train$2(LightGBMBase.scala:48)
at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logVerb(SynapseMLLogging.scala:163)
at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logVerb$(SynapseMLLogging.scala:160)
at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.logVerb(LightGBMClassifier.scala:27)
at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logFit(SynapseMLLogging.scala:153)
at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logFit$(SynapseMLLogging.scala:152)
at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.logFit(LightGBMClassifier.scala:27)
at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.train(LightGBMBase.scala:64)
at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.train$(LightGBMBase.scala:36)
at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.train(LightGBMClassifier.scala:27)
at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.train(LightGBMClassifier.scala:27)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:115)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
at py4j.Gateway.invoke(Gateway.java:306)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:613)
at java.net.Socket.connect(Socket.java:561)
at java.net.Socket.<init>(Socket.java:457)
at java.net.Socket.<init>(Socket.java:234)
at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getNetworkTopologyInfoFromDriver(NetworkManager.scala:133)
at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$2(NetworkManager.scala:120)
at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:24)
at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$1(NetworkManager.scala:115)
at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.using(StreamUtilities.scala:28)
at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getGlobalNetworkInfo(NetworkManager.scala:111)
at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.initialize(BasePartitionTask.scala:197)
at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.mapPartitionTask(BasePartitionTask.scala:132)
at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$executePartitionTasks$1(LightGBMBase.scala:615)
at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:226)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:938)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:938)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:413)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:410)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:377)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:413)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:410)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:377)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:211)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:199)
at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:161)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:155)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:102)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$10(Executor.scala:1042)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:1045)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:932)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
File <command-3334781331052689>, line 39
25 evaluator = BinaryClassificationEvaluator(
26 labelCol="conversion_flag",
27 rawPredictionCol="rawPrediction",
28 metricName="areaUnderROC"
29 )
31 cv = CrossValidator(
32 estimator=lgbm,
33 estimatorParamMaps=paramGrid,
(...)
36 parallelism=8 # parallel
37 )
---> 39 cv_model = cv.fit(train_assembler)
40 best_model = cv_model.bestModel
41 predictions = best_model.transform(val_assembler)
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Code to reproduce issue
lgbm_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_assembler = lgbm_assembler.transform(train_data)
val_assembler = lgbm_assembler.transform(val_data)
lgbm = LightGBMClassifier(
objective="binary",
featuresCol="features",
labelCol="conversion_flag",
numIterations=600,
isUnbalance=True,
isProvideTrainingMetric=True,
metric="auc",
numTasks=4,
numBatches=5
)
# grid search
paramGrid = ParamGridBuilder() \
.addGrid(lgbm.numLeaves, [31, 63]) \
.addGrid(lgbm.maxDepth, [8, 10]) \
.addGrid(lgbm.learningRate, [0.1, 0.01]) \
.build()
evaluator = BinaryClassificationEvaluator(
labelCol="conversion_flag",
rawPredictionCol="rawPrediction",
metricName="areaUnderROC"
)
cv = CrossValidator(
estimator=lgbm,
estimatorParamMaps=paramGrid,
evaluator=evaluator,
numFolds=5, # 5-fold CV
parallelism=4 # parallel
)
cv_model = cv.fit(train_assembler)
best_model = cv_model.bestModel
predictions = best_model.transform(val_assembler)
Other info / logs
No response
What component(s) does this bug affect?
- [ ]
area/cognitive: Cognitive project - [ ]
area/core: Core project - [ ]
area/deep-learning: DeepLearning project - [x]
area/lightgbm: Lightgbm project - [ ]
area/opencv: Opencv project - [ ]
area/vw: VW project - [ ]
area/website: Website - [ ]
area/build: Project build system - [x]
area/notebooks: Samples under notebooks folder - [ ]
area/docker: Docker usage - [x]
area/models: models related issue
What language(s) does this bug affect?
- [ ]
language/scala: Scala source code - [x]
language/python: Pyspark APIs - [ ]
language/r: R APIs - [ ]
language/csharp: .NET APIs - [ ]
language/new: Proposals for new client languages
What integration(s) does this bug affect?
- [ ]
integrations/synapse: Azure Synapse integrations - [ ]
integrations/azureml: Azure ML integrations - [x]
integrations/databricks: Databricks integrations
@mhamilton723 @memoryz @mhamilton723 @dylanw-oss @svotaw @imatiach-msft Hi Team, anyone have a chance to check it? I believe connection error related issue has been posted several times, but they all did not be fixed. Also, cross-validation is so important for training a model, only utilizing some parameters on same train/validation dataset will cause over-fitting issue.
Thanks again if you have time to assign this task and give me some suggestions. I represent those users have same issues to highly appreciate your time and effort!
by the way, the dataset is around 7 million, and 700 more features. The data type are numerical and VectorUDT() which is transferred from FeatureHash. All null value and error data is excluded, because I could run our lightgbmClassifier successfully.
from pyspark.sql.functions import lit
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
feature_cols = [c for c in train_data.columns if c != "conversion_flag"]
lgbm_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
lgbm = LightGBMClassifier(
objective="binary",
featuresCol="features",
labelCol="conversion_flag",
numIterations=500,
isUnbalance=True,
isProvideTrainingMetric=True,
metric="auc",
verbosity=1
)
# lgbm.setPassThroughArgs("print_every_n_iterations=5")
pipeline = Pipeline(stages=[lgbm_assembler, lgbm])
model = pipeline.fit(train_data)
predictions = model.transform(val_data)
evaluator_auc = BinaryClassificationEvaluator(labelCol="conversion_flag", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc_roc = evaluator_auc.evaluate(predictions)
evaluator_pr = BinaryClassificationEvaluator(labelCol="conversion_flag", rawPredictionCol="rawPrediction", metricName="areaUnderPR")
auc_pr = evaluator_pr.evaluate(predictions)
print(f"AUC-ROC : {auc_roc}")
print(f"AUC-PR : {auc_pr}")
Finally succeed by using enclosed structure, but have a low processing time and don't show progress bar on Databricks. Could anyone try to reproduce this bugs? I believe cross-validation is important to avoid over-fitting. Thank you so much if there is anyone could help me out.
# what is setExecutionMode = straming means?
# could this be changed into numBatches to save time?
# it seems it will take a long time to start databricks
# is this maintain the same case whne you try other pysaprk.ml models?
from pyspark.sql.functions import lit
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# vectorize features
hash_cols = [c for c in train_data.columns if c.startswith('hash_')]
feature_cols = [c for c in train_data.columns if c != "conversion_flag"]
# feature_cols = [c for c in train_data.columns if c != "conversion_flag"]
lgbm_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
lgbm = LightGBMClassifier(
objective="binary",
featuresCol="features",
labelCol="conversion_flag",
numIterations=600,
isUnbalance=True,
useBarrierExecutionMode=True,
dataTransferMode='streaming',
earlyStoppingRound=30
)
# grid search
paramGrid = ParamGridBuilder() \
.addGrid(lgbm.numLeaves, [31, 63]) \
.addGrid(lgbm.maxDepth, [8, 10]) \
.addGrid(lgbm.learningRate, [0.1, 0.01]) \
.build()
evaluator = BinaryClassificationEvaluator(
labelCol="conversion_flag",
rawPredictionCol="rawPrediction",
metricName="areaUnderROC"
)
cv = CrossValidator(
estimator=lgbm,
estimatorParamMaps=paramGrid,
evaluator=evaluator,
numFolds=5, # 5-fold CV
parallelism=4
)
pipeline = Pipeline(stages=[lgbm_assembler, cv])
cv_model = pipeline.fit(train_data)