
[LightGBM] Connection refused error when training LightGBMClassifier

Open · wilsonNg007 opened this issue 2 years ago · 6 comments

SynapseML version

0.10.2

System information

  • Language version: Python 3.10
  • Spark version: 3.3
  • Spark platform: Synapse

Describe the problem

I started with the example code from https://microsoft.github.io/SynapseML/docs/features/lightgbm/LightGBM%20-%20Overview, and it worked with my dataset. The training dataset is ~5 million rows × 70 features.

After adding a OneHotEncoder (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.OneHotEncoder.html) to transform some categorical-string columns, we started seeing the connection refused error.

Here is an example of the transformed output (sample values of the InterfaceIpState_onehot column):

{"type":0,"size":13,"indices":[0],"values":[1]}
{"type":0,"size":13,"indices":[2],"values":[1]}
{"type":0,"size":13,"indices":[3],"values":[1]}
{"type":0,"size":1,"indices":[],"values":[]}
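Each entry above is a serialized pyspark.ml.linalg.SparseVector: "type":0 marks a sparse vector, "size" is its length, and "indices"/"values" list the non-zero entries. A minimal sketch of what one of these decodes to:

from pyspark.ml.linalg import SparseVector

# {"type":0,"size":13,"indices":[2],"values":[1]} decodes to:
v = SparseVector(13, [2], [1.0])
print(v.toArray())  # 13-element array with 1.0 at index 2, zeros elsewhere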

I saw the similar issue #963, so I added the workaround mentioned there, i.e. useBarrierExecutionMode=True, but I'm still getting the same error. I'm not sure how one-hot encoding would lead to a connection refused error.
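Note that in the cell shown in the traceback below, useBarrierExecutionMode=True appears as a bare assignment; for the workaround to take effect, the flag has to be passed to the estimator itself. A minimal sketch (the parameter names follow the SynapseML Python API):

from synapse.ml.lightgbm import LightGBMClassifier

model = LightGBMClassifier(
    objective="multiclass",
    featuresCol="features",
    labelCol="DiagnoseType_index",
    isUnbalance=True,
    useBarrierExecutionMode=True,  # workaround suggested in #963
)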

Py4JJavaError                             Traceback (most recent call last)
Cell In [35], line 10
      4 useBarrierExecutionMode=True
      6 model = LightGBMClassifier(
      7     objective="multiclass", featuresCol="features", labelCol="DiagnoseType_index", isUnbalance=True
      8 )
---> 10 model = model.fit(train_data)

File /opt/spark/python/lib/pyspark.zip/pyspark/ml/base.py:205, in Estimator.fit(self, dataset, params)
    203             return self.copy(params)._fit(dataset)
    204         else:
--> 205             return self._fit(dataset)
    206     else:
    207         raise TypeError(
    208             "Params must be either a param map or a list/tuple of param maps, "
    209             "but got %s." % type(params)
    210         )

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/synapse/ml/lightgbm/LightGBMClassifier.py:2017, in LightGBMClassifier._fit(self, dataset)
   2016 def _fit(self, dataset):
-> 2017     java_model = self._fit_java(dataset)
   2018     return self._create_model(java_model)

File /opt/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py:380, in JavaEstimator._fit_java(self, dataset)
    377 assert self._java_obj is not None
    379 self._transfer_params_to_java()
--> 380 return self._java_obj.fit(dataset._jdf)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    188 def deco(*a: Any, **kw: Any) -> Any:
    189     try:
--> 190         return f(*a, **kw)
    191     except Py4JJavaError as e:
    192         converted = convert_exception(e.java_exception)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o4447.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 260.0 failed 4 times, most recent failure: Lost task 0.3 in stage 260.0 (TID 30932) (vm-ea099120 executor 3): java.net.ConnectException: Connection refused (Connection refused)
  at java.net.PlainSocketImpl.socketConnect(Native Method)
  at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
  at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
  at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
  at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
  at java.net.Socket.connect(Socket.java:607)
  at java.net.Socket.connect(Socket.java:556)
  at java.net.Socket.<init>(Socket.java:452)
  at java.net.Socket.<init>(Socket.java:229)
  at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getNetworkTopologyInfoFromDriver(NetworkManager.scala:129)
  at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$2(NetworkManager.scala:116)
  at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:24)
  at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
  at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
  at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
  at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
  at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$1(NetworkManager.scala:111)
  at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.using(StreamUtilities.scala:28)
  at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getGlobalNetworkInfo(NetworkManager.scala:107)
  at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.initialize(BasePartitionTask.scala:197)
  at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.mapPartitionTask(BasePartitionTask.scala:132)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$executePartitionTasks$1(LightGBMBase.scala:591)
  at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:201)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:897)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:897)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:366)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:330)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:366)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:330)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:366)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:330)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:136)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2682)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2618)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2617)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2617)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1190)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1190)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1190)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2870)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2812)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2801)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:958)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2353)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2374)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2418)
  at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1028)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:1027)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:460)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3881)
  at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3133)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3871)
  at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:562)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3869)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3869)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:3133)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.executePartitionTasks(LightGBMBase.scala:599)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.executePartitionTasks$(LightGBMBase.scala:584)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.executePartitionTasks(LightGBMClassifier.scala:27)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.executeTraining(LightGBMBase.scala:574)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.executeTraining$(LightGBMBase.scala:546)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.executeTraining(LightGBMClassifier.scala:27)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.trainOneDataBatch(LightGBMBase.scala:436)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.trainOneDataBatch$(LightGBMBase.scala:393)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.trainOneDataBatch(LightGBMClassifier.scala:27)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$train$2(LightGBMBase.scala:61)
  at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logVerb(SynapseMLLogging.scala:87)
  at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logVerb$(SynapseMLLogging.scala:84)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.logVerb(LightGBMClassifier.scala:27)
  at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logTrain(SynapseMLLogging.scala:78)
  at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logTrain$(SynapseMLLogging.scala:77)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.logTrain(LightGBMClassifier.scala:27)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.train(LightGBMBase.scala:63)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.train$(LightGBMBase.scala:35)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.train(LightGBMClassifier.scala:27)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.train(LightGBMClassifier.scala:27)
  at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
  at org.apache.spark.ml.Predictor.fit(Predictor.scala:115)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:238)
  at java.lang.Thread.run(Thread.java:750)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
  at java.net.PlainSocketImpl.socketConnect(Native Method)
  at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
  at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
  at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
  at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
  at java.net.Socket.connect(Socket.java:607)
  at java.net.Socket.connect(Socket.java:556)
  at java.net.Socket.<init>(Socket.java:452)
  at java.net.Socket.<init>(Socket.java:229)
  at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getNetworkTopologyInfoFromDriver(NetworkManager.scala:129)
  at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$2(NetworkManager.scala:116)
  at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:24)
  at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
  at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
  at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
  at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
  at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$1(NetworkManager.scala:111)
  at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.using(StreamUtilities.scala:28)
  at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getGlobalNetworkInfo(NetworkManager.scala:107)
  at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.initialize(BasePartitionTask.scala:197)
  at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.mapPartitionTask(BasePartitionTask.scala:132)
  at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$executePartitionTasks$1(LightGBMBase.scala:591)
  at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:201)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:897)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:897)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:366)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:330)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:366)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:330)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:366)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:330)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:136)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

Code to reproduce issue

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier

categorical_cols = ["InIpState", "InPur", "PhyInState", "AdStat", "OpStatFl"]

# Index each categorical-string column
indexers = [
    StringIndexer(inputCol=col, outputCol=col + "_index").fit(df)
    for col in categorical_cols
]

# Apply the transformations
for indexer in indexers:
    df = indexer.transform(df)

# Create the one-hot encoder
encoder = OneHotEncoder(
    inputCols=[col + "_index" for col in categorical_cols],
    outputCols=[col + "_onehot" for col in categorical_cols],
)

# Fit the encoder on the DataFrame and transform it
encoded_df = encoder.fit(df).transform(df)

# Drop the original categorical columns
for col in categorical_cols:
    encoded_df = encoded_df.drop(col)

# Drop the intermediate *_index columns
for col in [c + "_index" for c in categorical_cols]:
    encoded_df = encoded_df.drop(col)

df = encoded_df
train, test = df.randomSplit([0.85, 0.15], seed=1)

# Assemble every column except the label (assumed to be the last column)
# into a single feature vector
feature_cols = df.columns[:-1]
label_col = "DiagnoseType_index"
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = featurizer.transform(train)[label_col, "features"]
test_data = featurizer.transform(test)[label_col, "features"]

useBarrierExecutionMode = True  # note: assigned here but never passed to the classifier (see the workaround sketch above)

model = LightGBMClassifier(
    objective="multiclass",
    featuresCol="features",
    labelCol="DiagnoseType_index",
    isUnbalance=True,
)

model = model.fit(train_data)
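If the failure stems from too many worker sockets contacting the driver at once, one variation worth trying is reducing the number of LightGBM network participants. A minimal sketch, assuming the same train_data as above (useSingleDatasetMode is an existing LightGBMClassifier parameter; the partition count of 16 is an arbitrary example):

# Fewer partitions -> fewer LightGBM network participants per training job
train_data_small = train_data.repartition(16)

model = LightGBMClassifier(
    objective="multiclass",
    featuresCol="features",
    labelCol="DiagnoseType_index",
    isUnbalance=True,
    useBarrierExecutionMode=True,
    useSingleDatasetMode=True,  # one LightGBM dataset per executor instead of per task
)
model = model.fit(train_data_small)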

Other info / logs

No response

What component(s) does this bug affect?

  • [ ] area/cognitive: Cognitive project
  • [ ] area/core: Core project
  • [ ] area/deep-learning: DeepLearning project
  • [X] area/lightgbm: Lightgbm project
  • [ ] area/opencv: Opencv project
  • [ ] area/vw: VW project
  • [ ] area/website: Website
  • [ ] area/build: Project build system
  • [ ] area/notebooks: Samples under notebooks folder
  • [ ] area/docker: Docker usage
  • [ ] area/models: models related issue

What language(s) does this bug affect?

  • [ ] language/scala: Scala source code
  • [X] language/python: Pyspark APIs
  • [ ] language/r: R APIs
  • [ ] language/csharp: .NET APIs
  • [ ] language/new: Proposals for new client languages

What integration(s) does this bug affect?

  • [ ] integrations/synapse: Azure Synapse integrations
  • [ ] integrations/azureml: Azure ML integrations
  • [ ] integrations/databricks: Databricks integrations

wilsonNg007 · May 26 '23 20:05

Hey @wilsonNg007 👋! Thank you so much for reporting the issue/feature request 🚨. Someone from the SynapseML team will triage this issue soon. We appreciate your patience.

github-actions[bot] · May 26 '23 20:05

Update:

I followed Jason Wang's suggestion to checkpoint the encoded dataframe and use the checkpointed dataframe as input to the classifier's fit function. Will update with the result.
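For reference, a minimal sketch of that suggestion (the checkpoint directory is a placeholder path):

# Checkpointing materializes the dataframe and truncates its lineage,
# so fit() starts from fully evaluated input.
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")  # placeholder path
train_data = train_data.checkpoint(eager=True)
model = model.fit(train_data)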

wilsonNg007 · May 26 '23 20:05

I believe @svotaw is looking into issues related to this.

mhamilton723 · Jul 10 '23 17:07

I'm not sure about ConnectionRefused. That's not the OOM issue that can be resolved by using streaming mode. @wilsonNg007 if you'd like to try our newest release, let me or @mhamilton723 know. It handles data in a different way that is more efficient.

svotaw · Jul 10 '23 17:07

We have released 0.11.2, which has the final streaming features completed.
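For anyone following along, opting into the new data path would look roughly like this (a sketch; dataTransferMode is the parameter that selects streaming mode in recent releases, and the package coordinate should be checked against the release notes):

# Spark config, e.g. on the pool or via spark-submit:
#   spark.jars.packages  com.microsoft.azure:synapseml_2.12:0.11.2

from synapse.ml.lightgbm import LightGBMClassifier

model = LightGBMClassifier(
    objective="multiclass",
    featuresCol="features",
    labelCol="DiagnoseType_index",
    isUnbalance=True,
    dataTransferMode="streaming",  # new, more memory-efficient data path
)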

svotaw · Jul 17 '23 19:07

This problem is present in the latest version as well: #2051

vishalovercome · Aug 19 '23 19:08