SynapseML
[BUG] OutOfMemorySparkException only when including a validationIndicatorCol - LightGBMClassifier
SynapseML version
OutOfMemorySparkException only when including a validationIndicatorCol - LightGBMClassifier
System information
- Language version: Python 3.10.12, Scala 2.12
- Spark version: 3.4.1
- Spark platform: Databricks Runtime 13.3 LTS
Describe the problem
I have a dataset written to S3 which was created using pyspark.ml's OneHotEncoder and VectorAssembler. So the dataset written to S3 has 3 columns:
- Features: a sparse representation of my original features. Features were originally a mix of numerical and categorical, the categoricals got OneHotEncoded
- Target column (binary)
- EvalFlag (boolean)
When I don't set a validationIndicatorCol in the classifier's constructor, training succeeds. However, as soon as I set validationIndicatorCol='EvalFlag' in the constructor, I get this error: org.apache.spark.sql.execution.OutOfMemorySparkException: Total memory usage during row decode exceeds spark.driver.maxResultSize (60.0 GiB). The average row size was 626.0 B, with 29.0 GiB used for temporary buffers. [shuffleId: None]
Note that I had already increased spark.driver.maxResultSize from whatever its default is to 60 GiB. Even at its default value (which I gather is much smaller), training without the evaluation flag worked just fine.
So something about including an evaluation set massively increases the requirement on spark.driver.maxResultSize.
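For a rough sense of scale, the arithmetic can be sketched as below. This is only a back-of-envelope estimate, assuming (not confirmed here) that the rows flagged by EvalFlag end up being collected through the driver. The 626 B average row size and the 10% validation share come from the error message and the repro code; the total row count is a hypothetical placeholder, not the actual dataset size, and the helper name is made up for illustration.

```python
GIB = 1024 ** 3  # bytes per GiB


def estimated_validation_gib(total_rows: int,
                             validation_fraction: float,
                             avg_row_bytes: float) -> float:
    """Raw size in GiB of the rows flagged for validation.

    Ignores temporary decode buffers, so the real footprint is likely larger.
    """
    if total_rows < 0 or avg_row_bytes < 0:
        raise ValueError("total_rows and avg_row_bytes must be non-negative")
    if not 0.0 <= validation_fraction <= 1.0:
        raise ValueError("validation_fraction must be between 0 and 1")
    return total_rows * validation_fraction * avg_row_bytes / GIB


# Hypothetical row count, chosen only to illustrate the calculation.
HYPOTHETICAL_TOTAL_ROWS = 1_000_000_000

size_gib = estimated_validation_gib(HYPOTHETICAL_TOTAL_ROWS, 0.1, 626.0)
print(f"Estimated validation payload: {size_gib:.1f} GiB")
```

If an estimate like this lands anywhere near the configured spark.driver.maxResultSize, the validation rows alone could plausibly account for the failure.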
Code to reproduce issue
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, VectorAssembler
from pyspark.sql import functions as F
from synapse.ml.lightgbm import LightGBMClassifier
df_raw = spark.read.parquet("s3path.pq")
categorical_columns = ["x", "y", "z", ..]
dense_columns = ['a', 'b', 'c', ...]
#df_raw on disk has `categorical_columns`, `dense_columns` and `'target'` as columns
ohe_stage = [OneHotEncoder(inputCol=col, outputCol=f"{col}_ohe") for col in categorical_columns]
ohe_output_columns = [f"{col}_ohe" for col in categorical_columns]
assembler = VectorAssembler(
    inputCols=dense_columns + ohe_output_columns,
    outputCol="features",
    handleInvalid='keep'
)
pipeline = Pipeline(stages=ohe_stage + [assembler])
pipeline_model = pipeline.fit(df_raw)
df_transformed = (pipeline_model.transform(df_raw)
    .withColumn("EvalFlag", F.rand())
    .withColumn("EvalFlag", F.col("EvalFlag") < 0.1)
)
#optionally either write df_transformed to s3 to materialize the transformation before training, and then read back in, or just pass df_transformed to training, doesn't change the outcome
#df_transformed.write.parquet('s3pathdftransformed.pq')
#df_transformed = spark.read.parquet('s3pathdftransformed.pq')
lightgbm = LightGBMClassifier(
    featuresCol="features",
    labelCol="target",
    categoricalSlotNames=categorical_columns,
    maxDepth=10,
    numLeaves=200,
    learningRate=0.1,
    numIterations=500,
    earlyStoppingRound=10,
    validationIndicatorCol='EvalFlag',  # toggle this line on and off
    verbosity=1
)
lightgbm_model = lightgbm.fit(df_transformed)
Other info / logs
No response
What component(s) does this bug affect?
- [ ] area/cognitive: Cognitive project
- [ ] area/core: Core project
- [ ] area/deep-learning: DeepLearning project
- [x] area/lightgbm: Lightgbm project
- [ ] area/opencv: Opencv project
- [ ] area/vw: VW project
- [ ] area/website: Website
- [ ] area/build: Project build system
- [ ] area/notebooks: Samples under notebooks folder
- [ ] area/docker: Docker usage
- [ ] area/models: models related issue
What language(s) does this bug affect?
- [ ] language/scala: Scala source code
- [ ] language/python: Pyspark APIs
- [ ] language/r: R APIs
- [ ] language/csharp: .NET APIs
- [ ] language/new: Proposals for new client languages
What integration(s) does this bug affect?
- [ ] integrations/synapse: Azure Synapse integrations
- [ ] integrations/azureml: Azure ML integrations
- [ ] integrations/databricks: Databricks integrations
Facing the same issue
This may be because the validation dataset is loaded in memory on every executor. Therefore, a large validation dataset may cause out of memory errors
Yes, I think that's almost certainly right. I would regard this as a bug, or at least a very significant drawback warranting a feature request to change this behavior (see, for example, dask xgboost). If you want to reap the benefits of training on large data, you're going to want validation sets that scale somewhat.
From the source code perspective, validateData is broadcast to each executor without any compression, which consumes a significant amount of memory. For now, the only way to avoid this problem is to reduce the size of validateData so that it occupies less memory.
This may be because the LightGBM C++ library does not support streaming validation datasets, but does support streaming training datasets. Therefore, I think the room for improvement in SynapseML is limited. At most, it can change the way the validation data is loaded from a broadcast method to another method that saves more space, and immediately release it after loading. However, it always has to fully load the validation dataset with the LightGBM C++ library, so the optimization space should be limited.
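As a minimal sketch of the "reduce the size of the validation data" workaround, the helper below picks a validation fraction that keeps the flagged rows under a memory budget. The function name, the budget, and the row count are hypothetical and chosen for illustration only; the 626 B average row size is taken from the error message in the report. It assumes the validation payload is roughly rows times average row size, which ignores any extra buffers.

```python
GIB = 1024 ** 3  # bytes per GiB


def capped_validation_fraction(total_rows: int,
                               avg_row_bytes: float,
                               budget_bytes: float,
                               desired_fraction: float = 0.1) -> float:
    """Largest validation fraction, at most desired_fraction, that fits the budget."""
    if total_rows <= 0 or avg_row_bytes <= 0:
        raise ValueError("total_rows and avg_row_bytes must be positive")
    if budget_bytes < 0 or not 0.0 <= desired_fraction <= 1.0:
        raise ValueError("budget_bytes must be >= 0 and desired_fraction in [0, 1]")
    max_validation_rows = budget_bytes / avg_row_bytes
    return min(desired_fraction, max_validation_rows / total_rows)


# Hypothetical inputs: 1 billion rows and a 4 GiB budget for validation data.
fraction = capped_validation_fraction(
    total_rows=1_000_000_000,
    avg_row_bytes=626.0,
    budget_bytes=4 * GIB,
)
print(f"Use a validation fraction of about {fraction:.4f}")
```

In the repro script, the returned value would take the place of the 0.1 threshold in F.col("EvalFlag") < 0.1. A smaller validation set makes early stopping noisier, so this trades evaluation quality for memory.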
@mhamilton723 Hi Mark, could you or someone on your team look into this issue? I also found that combining earlyStoppingRound with validationIndicatorCol does not work well: the model performs even worse than one trained on the training data alone.