SynapseML
[LightGBM] Connection refused error when training LightGBMClassifier
SynapseML version
0.10.2
System information
- Language version python 3.10
- Spark Version 3.3
- Spark Platform Synapse
Describe the problem
I started from the example code at https://microsoft.github.io/SynapseML/docs/features/lightgbm/LightGBM%20-%20Overview, and it worked with my dataset. The training set is ~5 million rows x 70 features.
After adding a OneHotEncoder (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.OneHotEncoder.html) to transform some categorical string columns, we started seeing a connection refused error.
Here is the transformed output example:
```
InterfaceIpState_onehot
{"type":0,"size":13,"indices":[0],"values":[1]}
{"type":0,"size":13,"indices":[2],"values":[1]}
{"type":0,"size":13,"indices":[3],"values":[1]}
{"type":0,"size":1,"indices":[],"values":[]}
```
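For context, these JSON strings are Spark ML sparse vectors (`"type": 0` marks a sparse vector; only the non-zero `indices`/`values` are stored). A minimal pure-Python sketch, no Spark required, that expands one of the rows above into its dense one-hot form (`expand_onehot` is a hypothetical helper name):

```python
import json

def expand_onehot(vec_json: str) -> list[int]:
    """Expand a Spark ML sparse-vector JSON string into a dense 0/1 list."""
    vec = json.loads(vec_json)
    dense = [0] * vec["size"]
    for idx, val in zip(vec["indices"], vec["values"]):
        dense[idx] = int(val)
    return dense

# Second row from the output above: category index 2 out of 13 is set.
print(expand_onehot('{"type":0,"size":13,"indices":[2],"values":[1]}'))
# -> [0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

# Last row: an all-zero vector (with OneHotEncoder's default dropLast=True,
# the final category is encoded as all zeros).
print(expand_onehot('{"type":0,"size":1,"indices":[],"values":[]}'))
# -> [0]
```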
I saw the similar issue #963, so I added the workaround mentioned there, i.e. `useBarrierExecutionMode=True`, but I am still getting the same error. I am not sure how that would lead to a connection refused error.
```
Py4JJavaError                             Traceback (most recent call last)
Cell In [35], line 10
      4 useBarrierExecutionMode=True
      6 model = LightGBMClassifier(
      7     objective="multiclass", featuresCol="features", labelCol="DiagnoseType_index", isUnbalance=True
      8 )
---> 10 model = model.fit(train_data)

File /opt/spark/python/lib/pyspark.zip/pyspark/ml/base.py:205, in Estimator.fit(self, dataset, params)
    203         return self.copy(params)._fit(dataset)
    204     else:
--> 205         return self._fit(dataset)
    206 else:
    207     raise TypeError(
    208         "Params must be either a param map or a list/tuple of param maps, "
    209         "but got %s." % type(params)
    210     )

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/synapse/ml/lightgbm/LightGBMClassifier.py:2017, in LightGBMClassifier._fit(self, dataset)
   2016 def _fit(self, dataset):
-> 2017     java_model = self._fit_java(dataset)
   2018     return self._create_model(java_model)

File /opt/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py:380, in JavaEstimator._fit_java(self, dataset)
    377 assert self._java_obj is not None
    379 self._transfer_params_to_java()
--> 380 return self._java_obj.fit(dataset._jdf)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py:190, in capture_sql_exception.

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))
```
```
Py4JJavaError: An error occurred while calling o4447.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 260.0 failed 4 times, most recent failure: Lost task 0.3 in stage 260.0 (TID 30932) (vm-ea099120 executor 3): java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at java.net.Socket.connect(Socket.java:556)
    at java.net.Socket.

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2682)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2618)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2617)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2617)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1190)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1190)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1190)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2870)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2812)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2801)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:958)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2353)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2374)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2418)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1028)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1027)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:460)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3881)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3133)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3871)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:562)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3869)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3869)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:3133)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.executePartitionTasks(LightGBMBase.scala:599)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.executePartitionTasks$(LightGBMBase.scala:584)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.executePartitionTasks(LightGBMClassifier.scala:27)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.executeTraining(LightGBMBase.scala:574)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.executeTraining$(LightGBMBase.scala:546)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.executeTraining(LightGBMClassifier.scala:27)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.trainOneDataBatch(LightGBMBase.scala:436)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.trainOneDataBatch$(LightGBMBase.scala:393)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.trainOneDataBatch(LightGBMClassifier.scala:27)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$train$2(LightGBMBase.scala:61)
    at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logVerb(SynapseMLLogging.scala:87)
    at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logVerb$(SynapseMLLogging.scala:84)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.logVerb(LightGBMClassifier.scala:27)
    at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logTrain(SynapseMLLogging.scala:78)
    at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logTrain$(SynapseMLLogging.scala:77)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.logTrain(LightGBMClassifier.scala:27)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.train(LightGBMBase.scala:63)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.train$(LightGBMBase.scala:35)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.train(LightGBMClassifier.scala:27)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.train(LightGBMClassifier.scala:27)
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:115)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at java.net.Socket.connect(Socket.java:556)
    at java.net.Socket.
```
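For readers unfamiliar with this error: `java.net.ConnectException: Connection refused` means a TCP connection attempt reached the target host, but no process was listening on that port. One plausible reading (my assumption based on issue #963, not a confirmed diagnosis) is that a LightGBM worker dials a peer whose network listener never started. A minimal sketch of the same failure mode in plain Python sockets:

```python
import socket

# Bind to an ephemeral port, then close the listener so nothing is
# listening there any more; a subsequent connect() to that port is
# refused, just like the executor's socket connect in the trace above.
probe = socket.socket()
probe.bind(("127.0.0.1", 0))
port = probe.getsockname()[1]
probe.close()  # no listener remains on `port`

try:
    socket.create_connection(("127.0.0.1", port), timeout=2)
except ConnectionRefusedError as exc:
    print(f"connect to 127.0.0.1:{port} failed: {exc}")
```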
Code to reproduce issue
```python
from pyspark.ml.feature import StringIndexer, OneHotEncoder

categorical_cols = ["InIpState", "InPur", "PhyInState", "AdStat", "OpStatFl"]
indexers = [
    StringIndexer(inputCol=col, outputCol=col + "_index").fit(df)
    for col in categorical_cols
]

# Apply the transformations
for indexer in indexers:
    df = indexer.transform(df)

# Create the one-hot encoder
encoder = OneHotEncoder(
    inputCols=[col + "_index" for col in categorical_cols],
    outputCols=[col + "_onehot" for col in categorical_cols],
)

# Fit the encoder on the DataFrame and transform it
encoded_df = encoder.fit(df).transform(df)

# Drop the original categorical columns
for col in categorical_cols:
    encoded_df = encoded_df.drop(col)

# Drop the intermediate *_index columns
index_cols = [c + "_index" for c in categorical_cols]
for col in index_cols:
    encoded_df = encoded_df.drop(col)

from pyspark.ml.feature import VectorAssembler

df = encoded_df
train, test = df.randomSplit([0.85, 0.15], seed=1)

feature_cols = df.columns[:-1]
label_col = "DiagnoseType_index"
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = featurizer.transform(train)[label_col, "features"]
test_data = featurizer.transform(test)[label_col, "features"]

from synapse.ml.lightgbm import LightGBMClassifier

# Workaround from issue #963: pass useBarrierExecutionMode to the classifier
# (assigning it to a bare variable, as in the original snippet, has no effect)
model = LightGBMClassifier(
    objective="multiclass",
    featuresCol="features",
    labelCol="DiagnoseType_index",
    isUnbalance=True,
    useBarrierExecutionMode=True,
)

model = model.fit(train_data)
```
Other info / logs
No response
What component(s) does this bug affect?
- [ ] area/cognitive: Cognitive project
- [ ] area/core: Core project
- [ ] area/deep-learning: DeepLearning project
- [X] area/lightgbm: Lightgbm project
- [ ] area/opencv: Opencv project
- [ ] area/vw: VW project
- [ ] area/website: Website
- [ ] area/build: Project build system
- [ ] area/notebooks: Samples under notebooks folder
- [ ] area/docker: Docker usage
- [ ] area/models: models related issue
What language(s) does this bug affect?
- [ ] language/scala: Scala source code
- [X] language/python: Pyspark APIs
- [ ] language/r: R APIs
- [ ] language/csharp: .NET APIs
- [ ] language/new: Proposals for new client languages
What integration(s) does this bug affect?
- [ ] integrations/synapse: Azure Synapse integrations
- [ ] integrations/azureml: Azure ML integrations
- [ ] integrations/databricks: Databricks integrations
Hey @wilsonNg007 :wave:! Thank you so much for reporting the issue/feature request :rotating_light:. Someone from SynapseML Team will be looking to triage this issue soon. We appreciate your patience.
Update:
I followed Jason Wang's suggestion to checkpoint the encoded DataFrame and use the checkpointed DataFrame as the input to the classifier's fit function. I will update with the result.
I believe @svotaw is looking into issues related to this.
I'm not sure about the ConnectionRefused error; that's not the OOM issue that can be resolved by using streaming mode. @wilsonNg007, if you'd like to try our newest release, let me or @mhamilton723 know. It handles data in a different way that is more efficient.
We have released 11.2, which has the final streaming features completed.
This problem is also present in the latest version; see #2051.