
[BUG] LightGBM fit failure during for-loop iteration

Open bjm88620 opened this issue 1 year ago • 0 comments

SynapseML version

com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3

System information

  • Language version (e.g. python 3.8, scala 2.12): python 3.9
  • Spark Version (e.g. 3.2.3): 3.3.2
  • Spark Platform (e.g. Synapse, Databricks): Databricks

Describe the problem

Model fitting fails partway through a for-loop iteration (see the code example below). The general error message is java.net.ConnectException: Connection refused (Connection refused); the failed job itself shows no detailed failure information, only an executor-lost failure.

The detailed error message is below:

Py4JJavaError: An error occurred while calling o57649.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 17 in stage 53057.0 failed 4 times, most recent failure: Lost task 17.3 in stage 53057.0 (TID 2649814) (10.99.12.16 executor 39): java.net.ConnectException: Connection refused (Connection refused)
  at java.net.PlainSocketImpl.socketConnect(Native Method)
  at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
  at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
  at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
  at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
  at java.net.Socket.connect(Socket.java:613)
  at java.net.Socket.connect(Socket.java:561)
  at java.net.Socket.<init>(Socket.java:457)
  at java.net.Socket.<init>(Socket.java:234)
  at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getNetworkTopologyInfoFromDriver(NetworkManager.scala:133)
  at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$2(NetworkManager.scala:120)
  at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:24)
  at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
  at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
  at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
  at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
  at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$1(NetworkManager.scala:115)
  at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.using(StreamUtilities.scala:28)
  at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getGlobalNetworkInfo(NetworkManager.scala:111)
  at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.initialize(BasePartitionTask.scala:197)
  at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.mapPartitionTask(BasePartitionTask.scala:132)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$executePartitionTasks$1(LightGBMBase.scala:615)
  at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:228)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:929)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:929)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:370)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:370)
  at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
  at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
  at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
  at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
  at org.apache.spark.scheduler.Task.doRunTask(Task.scala:179)
  at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:142)
  at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:126)
  at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:142)
  at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
  at org.apache.spark.scheduler.Task.run(Task.scala:97)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:904)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1740)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:907)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:761)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3470)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3392)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3381)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3381)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1490)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1490)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1490)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3681)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3619)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3607)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1239)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1227)
  at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2758)
  at org.apache.spark.sql.execution.collect.Collector.$anonfun$runSparkJobs$1(Collector.scala:297)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
  at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:293)
  at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:377)
  at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:128)
  at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:135)
  at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:122)
  at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:110)
  at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:92)
  at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$1(ResultCacheManager.scala:541)
  at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
  at org.apache.spark.sql.execution.qrc.ResultCacheManager.collectResult$1(ResultCacheManager.scala:529)
  at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$2(ResultCacheManager.scala:547)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$withFinalPlanUpdate$1(AdaptiveSparkPlanExec.scala:634)
  at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:632)
  at org.apache.spark.sql.execution.qrc.ResultCacheManager.computeResult(ResultCacheManager.scala:547)
  at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:402)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:395)
  at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:289)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollectResult$1(SparkPlan.scala:506)
  at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
  at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:503)
  at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3458)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4382)
  at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3425)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$3(Dataset.scala:4373)
  at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:841)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4371)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$9(SQLExecution.scala:258)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:448)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:203)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1073)
  at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:131)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:398)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4371)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:3425)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.executePartitionTasks(LightGBMBase.scala:623)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.executeTraining(LightGBMBase.scala:598)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.trainOneDataBatch(LightGBMBase.scala:446)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$train$2(LightGBMBase.scala:62)
  at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logVerb(SynapseMLLogging.scala:163)
  at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logVerb$(SynapseMLLogging.scala:160)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMRegressor.logVerb(LightGBMRegressor.scala:39)
  at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logFit(SynapseMLLogging.scala:153)
  at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logFit$(SynapseMLLogging.scala:152)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMRegressor.logFit(LightGBMRegressor.scala:39)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.train(LightGBMBase.scala:64)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.train$(LightGBMBase.scala:36)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMRegressor.train(LightGBMRegressor.scala:39)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMRegressor.train(LightGBMRegressor.scala:39)
  at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
  at py4j.Gateway.invoke(Gateway.java:306)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
  at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
  at java.lang.Thread.run(Thread.java:750)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
  at java.net.PlainSocketImpl.socketConnect(Native Method)
  at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
  at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
  at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
  at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
  at java.net.Socket.connect(Socket.java:613)
  at java.net.Socket.connect(Socket.java:561)
  at java.net.Socket.<init>(Socket.java:457)
  at java.net.Socket.<init>(Socket.java:234)
  at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getNetworkTopologyInfoFromDriver(NetworkManager.scala:133)
  at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$2(NetworkManager.scala:120)
  at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:24)
  at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
  at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
  at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
  at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
  at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$1(NetworkManager.scala:115)
  at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.using(StreamUtilities.scala:28)
  at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getGlobalNetworkInfo(NetworkManager.scala:111)
  at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.initialize(BasePartitionTask.scala:197)
  at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.mapPartitionTask(BasePartitionTask.scala:132)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$executePartitionTasks$1(LightGBMBase.scala:615)
  at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:228)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:929)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:929)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:370)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:370)
  at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
  at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
  at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
  at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
  at org.apache.spark.scheduler.Task.doRunTask(Task.scala:179)
  at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:142)
  at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:126)
  at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:142)
  at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
  at org.apache.spark.scheduler.Task.run(Task.scala:97)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:904)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1740)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:907)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:761)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  ... 1 more
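
The executor-side frames show the failure happens in NetworkManager.getNetworkTopologyInfoFromDriver, i.e. while each executor opens a socket back to the coordination port the driver creates for the fit. Below is a minimal workaround sketch, not a confirmed fix: it assumes the LightGBMRegressor Python wrapper exposes setDriverListenPort and setUseBarrierExecutionMode setters, and the port number and column names are arbitrary examples.

from synapse.ml.lightgbm import LightGBMRegressor

# Sketch only, under the assumptions above: pin the driver-side
# coordination port instead of the default random one, and start all
# partition tasks together via barrier execution mode, which may make
# the per-iteration network setup more deterministic.
model = (
    LightGBMRegressor()
    .setLabelCol("label")             # illustrative column names
    .setFeaturesCol("features")
    .setDriverListenPort(12400)       # arbitrary fixed port
    .setUseBarrierExecutionMode(True)
)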

Code to reproduce issue

from datetime import datetime, timedelta
import pyspark.sql.functions as sf

# train_df_files, actual_df_files, end_base_date, time_window, and model
# are defined earlier in the notebook.
train_merged_df = spark.read.parquet(*train_df_files)
actual_merged_df = spark.read.parquet(*actual_df_files)

for num in range(20):
    # Move the training cutoff back by time_window weeks each iteration.
    max_base_date = (datetime.strptime(end_base_date, "%Y-%m-%d").date()
                     - int(time_window) * timedelta(days=7 * num))
    tmp_train_df = train_merged_df.where(sf.col('base_date') <= max_base_date)
    tmp_actual_df = actual_merged_df.where(sf.col('base_date') <= max_base_date)
    tmp_forecaster = model.fit(tmp_train_df, tmp_actual_df)
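
To tell a transient connection hiccup apart from a persistent failure, a retry wrapper around the same fit call can be used for debugging. This is a hypothetical helper that simply mirrors the two-DataFrame fit signature above; it is not part of the failing run.

import time

def fit_with_retry(model, train_df, actual_df, attempts=3, wait_s=60):
    # Hypothetical debugging helper: retry the fit a few times, since the
    # reported Connection refused occurs during LightGBM network setup
    # and may be transient. Mirrors the two-argument fit call used above.
    for attempt in range(1, attempts + 1):
        try:
            return model.fit(train_df, actual_df)
        except Exception:
            if attempt == attempts:
                raise
            time.sleep(wait_s)

# inside the loop:
# tmp_forecaster = fit_with_retry(model, tmp_train_df, tmp_actual_df)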

Other info / logs

No response

What component(s) does this bug affect?

  • [ ] area/cognitive: Cognitive project
  • [ ] area/core: Core project
  • [ ] area/deep-learning: DeepLearning project
  • [X] area/lightgbm: Lightgbm project
  • [ ] area/opencv: Opencv project
  • [ ] area/vw: VW project
  • [ ] area/website: Website
  • [ ] area/build: Project build system
  • [ ] area/notebooks: Samples under notebooks folder
  • [ ] area/docker: Docker usage
  • [ ] area/models: models related issue

What language(s) does this bug affect?

  • [ ] language/scala: Scala source code
  • [X] language/python: Pyspark APIs
  • [ ] language/r: R APIs
  • [ ] language/csharp: .NET APIs
  • [ ] language/new: Proposals for new client languages

What integration(s) does this bug affect?

  • [ ] integrations/synapse: Azure Synapse integrations
  • [ ] integrations/azureml: Azure ML integrations
  • [ ] integrations/databricks: Databricks integrations

bjm88620 · Sep 14 '24 17:09