Errors for fitting sparkling water GLM

Open cliu-sift opened this issue 2 years ago • 11 comments

For background info, see https://github.com/h2oai/sparkling-water/issues/2811

I found different errors when fitting logistic regression using H2OGLM in Sparkling Water. They might also be caused by the high-cardinality categorical features, as the code runs fine on smaller data with lower-cardinality features.
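As a rough way to check the high-cardinality hypothesis above, distinct values per column can be counted before training. The following is a minimal sketch in plain Python, not part of the original report: the helper names `distinct_counts` and `high_cardinality` and the threshold are assumptions made for illustration, and the rows are assumed to be dict-like (for example, a sample of Spark rows converted with `row.asDict()`).

```python
from collections import defaultdict


def distinct_counts(rows, columns):
    """Count distinct non-null values per column over an iterable of dict-like rows."""
    seen = defaultdict(set)
    for row in rows:
        for column in columns:
            value = row.get(column)
            if value is not None:
                seen[column].add(value)
    # Columns that never had a non-null value get a count of 0.
    return {column: len(seen[column]) for column in columns}


def high_cardinality(counts, threshold):
    """Return column names whose distinct count exceeds the threshold, largest first."""
    flagged = [column for column, n in counts.items() if n > threshold]
    return sorted(flagged, key=lambda column: -counts[column])
```

On a large dataset it is probably more practical to run this on a sample, since the sets of seen values are held in memory.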

The code I use:

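from pyspark.ml import Pipeline
from pysparkling.ml import H2OGLM

# predictors (list of feature column names) and response (label column name)
# are defined earlier in the notebook.
estimator = H2OGLM(
    family="binomial",
    featuresCols=predictors,
    labelCol=response,
    convertUnknownCategoricalLevelsToNa=True,
    lambdaValue=[0]
)
pipeline = Pipeline(stages=[estimator])
# Fit and export the pipeline
model = pipeline.fit(df_train_customer_new)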

The error:

Py4JJavaError                             Traceback (most recent call last)
Input In [21], in <cell line: 5>()
     25     pipeline = Pipeline(stages=[estimator])
     26     # Fit and export the pipeline
---> 27     model = pipeline.fit(df_train_customer_new)
     28     model.write().overwrite().save(base_opath+'sparkling_raw_feature'+cid+'_lr_customer_lambda0.model')
     29 df_prediction = model.transform(df_test_customer_new).select(['label', F.col('detailed_prediction')['probabilities']['1.0'].alias('prob')])

File /usr/lib/spark/python/pyspark/ml/base.py:161, in Estimator.fit(self, dataset, params)
    159         return self.copy(params)._fit(dataset)
    160     else:
--> 161         return self._fit(dataset)
    162 else:
    163     raise ValueError("Params must be either a param map or a list/tuple of param maps, "
    164                      "but got %s." % type(params))

File /usr/lib/spark/python/pyspark/ml/pipeline.py:114, in Pipeline._fit(self, dataset)
    112     dataset = stage.transform(dataset)
    113 else:  # must be an Estimator
--> 114     model = stage.fit(dataset)
    115     transformers.append(model)
    116     if i < indexOfLastEstimator:

File /usr/lib/spark/python/pyspark/ml/base.py:161, in Estimator.fit(self, dataset, params)
    159         return self.copy(params)._fit(dataset)
    160     else:
--> 161         return self._fit(dataset)
    162 else:
    163     raise ValueError("Params must be either a param map or a list/tuple of param maps, "
    164                      "but got %s." % type(params))

File /usr/lib/spark/python/pyspark/ml/wrapper.py:335, in JavaEstimator._fit(self, dataset)
    334 def _fit(self, dataset):
--> 335     java_model = self._fit_java(dataset)
    336     model = self._create_model(java_model)
    337     return self._copyValues(model)

File /usr/lib/spark/python/pyspark/ml/wrapper.py:332, in JavaEstimator._fit_java(self, dataset)
    318 """
    319 Fits a Java model to the input dataset.
    320 
   (...)
    329     fitted Java model
    330 """
    331 self._transfer_params_to_java()
--> 332 return self._java_obj.fit(dataset._jdf)

File /opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py:1304, in JavaMember.__call__(self, *args)
   1298 command = proto.CALL_COMMAND_NAME +\
   1299     self.command_header +\
   1300     args_command +\
   1301     proto.END_COMMAND_PART
   1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
   1305     answer, self.gateway_client, self.target_id, self.name)
   1307 for temp_arg in temp_args:
   1308     temp_arg._detach()

File /usr/lib/spark/python/pyspark/sql/utils.py:111, in capture_sql_exception.<locals>.deco(*a, **kw)
    109 def deco(*a, **kw):
    110     try:
--> 111         return f(*a, **kw)
    112     except py4j.protocol.Py4JJavaError as e:
    113         converted = convert_exception(e.java_exception)

File /opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o34555.fit.
: java.io.IOException: Premature EOF
	at sun.net.www.http.ChunkedInputStream.readAheadBlocking(ChunkedInputStream.java:565)
	at sun.net.www.http.ChunkedInputStream.readAhead(ChunkedInputStream.java:609)
	at sun.net.www.http.ChunkedInputStream.read(ChunkedInputStream.java:696)
	at java.io.FilterInputStream.read(FilterInputStream.java:133)
	at sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3456)
	at sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3449)
	at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:2146)
	at org.apache.commons.io.IOUtils.copy(IOUtils.java:2102)
	at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:2123)
	at org.apache.commons.io.IOUtils.copy(IOUtils.java:2078)
	at ai.h2o.sparkling.backend.utils.RestCommunication.$anonfun$downloadBinaryURLContent$2(RestCommunication.scala:226)
	at ai.h2o.sparkling.backend.utils.RestCommunication.$anonfun$downloadBinaryURLContent$2$adapted(RestCommunication.scala:225)
	at ai.h2o.sparkling.utils.ScalaUtils$.withResource(ScalaUtils.scala:28)
	at ai.h2o.sparkling.backend.utils.RestCommunication.$anonfun$downloadBinaryURLContent$1(RestCommunication.scala:225)
	at ai.h2o.sparkling.backend.utils.RestCommunication.$anonfun$downloadBinaryURLContent$1$adapted(RestCommunication.scala:224)
	at ai.h2o.sparkling.utils.ScalaUtils$.withResource(ScalaUtils.scala:28)
	at ai.h2o.sparkling.backend.utils.RestCommunication.downloadBinaryURLContent(RestCommunication.scala:224)
	at ai.h2o.sparkling.backend.utils.RestCommunication.downloadBinaryURLContent$(RestCommunication.scala:223)
	at ai.h2o.sparkling.ml.internals.H2OModel.downloadBinaryURLContent(H2OModel.scala:31)
	at ai.h2o.sparkling.ml.internals.H2OModel.downloadMojo(H2OModel.scala:38)
	at ai.h2o.sparkling.ml.internals.H2OModel.toMOJOModel(H2OModel.scala:96)
	at ai.h2o.sparkling.ml.algos.H2OEstimator.trainH2OModel(H2OEstimator.scala:59)
	at ai.h2o.sparkling.ml.algos.H2OEstimator.fit(H2OEstimator.scala:36)
	at ai.h2o.sparkling.ml.algos.H2OAlgorithm.fit(H2OAlgorithm.scala:41)
	at ai.h2o.sparkling.ml.algos.H2OSupervisedAlgorithm.fit(H2OSupervisedAlgorithm.scala:57)
	at ai.h2o.sparkling.ml.algos.H2OGLM.fit(H2OGLM.scala:35)
	at ai.h2o.sparkling.ml.algos.H2OGLM.fit(H2OGLM.scala:27)
	at sun.reflect.GeneratedMethodAccessor274.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)

cliu-sift, Aug 23 '22 07:08

Hi @cliu-sift, could you share the full yarn logs?

mn-mikke, Aug 23 '22 14:08

@mn-mikke Here is the full yarn log: cliu_h2o_yarn_logs3.log. I also found java.lang.OutOfMemoryError: Requested array size exceeds VM limit in the log: [image]
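For readers who want to locate such lines in their own YARN logs, here is a minimal sketch that scans log text for `java.lang.OutOfMemoryError` entries. The function name `find_oom_lines` is an assumption made for illustration, and the file path in the usage part simply reuses the log name mentioned above as a hypothetical local file.

```python
import re
from pathlib import Path

# Matches JVM out-of-memory lines such as
# "java.lang.OutOfMemoryError: Requested array size exceeds VM limit".
OOM_PATTERN = re.compile(r"java\.lang\.OutOfMemoryError: (?P<reason>.*)")


def find_oom_lines(log_text):
    """Return (line_number, reason) for every OutOfMemoryError line in the text."""
    hits = []
    for number, line in enumerate(log_text.splitlines(), start=1):
        match = OOM_PATTERN.search(line)
        if match:
            hits.append((number, match.group("reason").strip()))
    return hits


if __name__ == "__main__":
    # Hypothetical local copy of the log; adjust the path as needed.
    log_path = Path("cliu_h2o_yarn_logs3.log")
    if log_path.exists():
        for number, reason in find_oom_lines(log_path.read_text(errors="replace")):
            print(f"line {number}: {reason}")
```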

cliu-sift, Aug 24 '22 19:08

It fails on creation of the MOJO model in H2O-3, when serializing its metadata. I can't understand why the core metadata of the model became so big. Would it be possible for you to share a dataset and the code snippet that trained the model so I could reproduce it on my side?

mn-mikke, Aug 25 '22 18:08

Sure, I will create a dataset for you to reproduce the error. Will give it to you today! Really appreciate your help!

cliu-sift, Aug 25 '22 18:08

Attached are the CSV files of the training and testing datasets. Since they are in CSV format, you need to read them into Spark DataFrames, either with something like

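# header=True keeps the column names (the code below refers to the "label" column by name)
df_train_customer_fake = spark.read.csv("{path_to_the_csv_file}", header=True, inferSchema=True)
df_test_customer_fake = spark.read.csv("{path_to_the_csv_file}", header=True, inferSchema=True)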

or something like

train_frame = h2o.import_file("{path_to_csv}")
df_train_customer_fake = hc.asSparkFrame(train_frame)
test_frame = h2o.import_file("{path_to_csv}")
df_test_customer_fake = hc.asSparkFrame(test_frame)

I read the data from Parquet, so I haven't tried either of the above ways, but I suppose they should work.

The training and testing data contain 776 features in total: 483 are categorical (they are actually random numbers I generated, but they should be read as strings) and the rest are numerical. The response column is named "label".
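To make the shape of such a dataset concrete, here is a minimal sketch that generates a synthetic table with the same layout: 483 categorical columns holding random numbers as strings, 293 numerical columns, and a binary "label". It is not the script used to produce the shared files. The function names, the cardinality, and the seed are assumptions made for illustration, and the column naming follows the categorical_col_xxx / numerical_col_xxx pattern mentioned later in this thread.

```python
import csv
import io
import random


def make_fake_rows(n_rows, n_categorical=483, n_numerical=293, cardinality=1000, seed=0):
    """Build a header and rows of random data: string categoricals, floats, binary label."""
    rng = random.Random(seed)
    header = (
        [f"categorical_col_{i}" for i in range(n_categorical)]
        + [f"numerical_col_{i}" for i in range(n_numerical)]
        + ["label"]
    )
    rows = []
    for _ in range(n_rows):
        # Categorical values are random integers rendered as strings.
        categoricals = [str(rng.randrange(cardinality)) for _ in range(n_categorical)]
        numericals = [rng.random() for _ in range(n_numerical)]
        rows.append(categoricals + numericals + [rng.randint(0, 1)])
    return header, rows


def write_csv(header, rows, handle):
    """Write the header and rows to an open text handle in CSV format."""
    writer = csv.writer(handle)
    writer.writerow(header)
    writer.writerows(rows)
```

Note that a CSV reader with schema inference would probably parse these categorical values as numbers, so they would still need to be cast to strings after loading.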

Once you have successfully loaded the data, you can try running the following code:

from pysparkling.ml import H2OGLM
from pyspark.ml import Pipeline
from pysparkling import *
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # needed for F.col below
import h2o
response = 'label'

df_train_customer_fake = df_train_customer_fake.withColumn("label", df_train_customer_fake.label.cast("string"))
df_test_customer_fake = df_test_customer_fake.withColumn("label", df_test_customer_fake.label.cast("string"))
predictors = list(df_train_customer_fake.columns)
predictors.remove('label')  # except for the label column, all remaining columns are predictors

estimator = H2OGLM(
    family="binomial",
    featuresCols=predictors,
    labelCol=response,
    convertUnknownCategoricalLevelsToNa=True,
    lambdaValue=[0]
)
pipeline = Pipeline(stages=[estimator])
# Fit the pipeline
model = pipeline.fit(df_train_customer_fake)
# Score the test set and keep the label plus the predicted probability of class "1.0"
df_prediction = model.transform(df_test_customer_fake).select(['label', F.col('detailed_prediction')['probabilities']['1.0'].alias('prob')])
pdf_prediction = df_prediction.toPandas()

That should reproduce the error.

cliu-sift, Aug 26 '22 05:08

Sorry, the files are too large to attach. This link should point you to the CSV files of the training and testing datasets, @mn-mikke.

cliu-sift, Aug 26 '22 07:08

Hi @cliu-sift, I'm struggling to replicate the problem on the dataset that you shared. Here is the script that I'm running:

from pysparkling.ml import H2OGLM
from pyspark.ml import Pipeline
from pysparkling import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import h2o
response = 'label'
spark=SparkSession.builder.master("local[*]").config("spark.driver.memory", "20g").getOrCreate()
hc=H2OContext.getOrCreate()

df_train_customer_fake = spark.read.option("header", "true").option("inferSchema", "true").csv("file:///Users/marek/pdf_train_customer_fake.csv")
df_test_customer_fake = spark.read.option("header", "true").option("inferSchema", "true").csv("file:///Users/marek/pdf_test_customer_fake.csv")

df_train_customer_fake = df_train_customer_fake.withColumn("label", df_train_customer_fake.label.cast("string"))
df_test_customer_fake = df_test_customer_fake.withColumn("label", df_test_customer_fake.label.cast("string"))
predictors = list(df_train_customer_fake.columns)
for predictor in predictors:
    if predictor.startswith("categorical"):
        df_train_customer_fake=df_train_customer_fake.withColumn(predictor, df_train_customer_fake[predictor].cast("string"))
        df_test_customer_fake=df_test_customer_fake.withColumn(predictor, df_test_customer_fake[predictor].cast("string"))
        
predictors.remove('label') # except for the label columns, the rest columns should all be predictors


estimator = H2OGLM(
family="binomial",
featuresCols=predictors,
labelCol=response,
convertUnknownCategoricalLevelsToNa=True,
lambdaValue=[0]
) 

model=estimator.fit(df_train_customer_fake)
model.transform(df_test_customer_fake).show()

mn-mikke, Sep 02 '22 13:09

Hi @cliu-sift, any hints on what I could be doing wrong in reproducing the problem?

mn-mikke, Sep 06 '22 16:09

Hi @mn-mikke, sorry for the late reply; I just got back from holiday. I've tested again, and I think I found out when the error is triggered. For the fake data I shared last time, I had replaced the original column names with new names like categorical_col_xxx and numerical_col_xxx. It runs fine in that scenario. But if I change the column names back to the original names (apart from adding a "fake" suffix: columns ending with "s_fake" are string columns and columns ending with "d_fake" are numerical columns), the error is triggered. So I have linked new files whose fake column names have the same length as my original column names, and they trigger the error. The file names for training and testing end with fake_rename. Please try them using the same method as before. Thank you!
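With the renamed files, the earlier reproduction script's `startswith("categorical")` check would no longer select the string columns. A suffix-based split, as described above, might look like the sketch below. The helper names and the example column names are assumptions made for illustration. The length summary is included only because the failure appears to depend on the column names, which this thread does not confirm as the root cause.

```python
def split_columns_by_suffix(columns, label="label"):
    """Split column names into string and numerical groups using the suffix convention."""
    string_cols = [c for c in columns if c != label and c.endswith("s_fake")]
    numeric_cols = [c for c in columns if c != label and c.endswith("d_fake")]
    return string_cols, numeric_cols


def name_length_summary(columns):
    """Summarize how long the column names are (count, total and maximum length)."""
    lengths = [len(c) for c in columns]
    return {"count": len(lengths), "total": sum(lengths), "max": max(lengths, default=0)}
```

The string group would then be cast to strings in the Spark DataFrame before fitting, in the same way the earlier script cast the columns prefixed with "categorical".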

cliu-sift, Sep 07 '22 04:09

@cliu-sift I've reproduced the problem and created the PUBDEV-8838 ticket for H2O-3 team.

mn-mikke, Sep 14 '22 12:09

Thank you!

cliu-sift, Sep 14 '22 20:09

The issue can be tracked under the mentioned ticket, so I'm closing this one.

krasinski, Dec 05 '22 17:12