
XGBoostSageMakerEstimator.fit() throws a libsvm exception when reading a csv file.

Open haowang-ms89 opened this issue 6 years ago • 32 comments

I am writing my Python code with Zeppelin 0.7.3 and Spark 2.3.0 on an EMR (emr-5.13.0) cluster to use SageMaker's XGBoost algorithm. The input data is a csv file. The first 3 lines of the file are (the first column is 0 or 1 for the target class, and there is no header line):

0,9.6071,2,1,1,2,1,1,1,1,3,1,0,0,0,0,3,0,0,3,0,0,3,0,2,1,1,1
0,2.7296,3,1,1,1,1,1,0,0,8,1,0,0,0,0,3,0,0,3,0,0,3,0,1,1,1,1
0,10.3326,1,0,1,2,1,1,0,0,4,1,1,0,1,0,3,0,0,3,0,0,3,0,0,3,0,0

I imported as the example does:

%pyspark
from pyspark import SparkContext, SparkConf
from sagemaker_pyspark import IAMRole, classpath_jars
from sagemaker_pyspark.algorithms import XGBoostSageMakerEstimator

I initialize the estimator:

%pyspark
xgboost_estimator = XGBoostSageMakerEstimator(
    trainingInstanceType="ml.m3.xlarge",
    trainingInstanceCount=1,
    endpointInstanceType="ml.m3.xlarge",
    endpointInitialInstanceCount=1)
xgboost_estimator.setObjective('multi:softprob')
xgboost_estimator.setNumRound(25)
xgboost_estimator.setNumClasses(2)

I read the csv file with:

training_data = spark.read.csv("s3://poc.sagemaker.myfile/myfile.csv", sep=",", header="false", inferSchema="true")

training_data.show() gives:

+---+-------+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|_c0| _c1|_c2|_c3|_c4|_c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|_c25|_c26|_c27|
+---+-------+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
| 0| 7.1732| 1| 0| 1| 2| 2| 2| 0| 0| 5| 1| 1| 0| 1| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0|
| 0| 1.3087| 1| 0| 1| 2| 1| 1| 0| 0| 2| 1| 1| 0| 2| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0|
| 0| 3.3539| 1| 0| 1| 2| 2| 1| 0| 0| 6| 1| 1| 0| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0|
| 0| 1.9767| 1| 0| 1| 1| 1| 1| 1| 1| 73| 1| 0| 0| 1| 0| 3| 0| 0| 3| 0| 1| 0| 1| 1| 0| 1| 1|
| 0| 5.7194| 1| 0| 1| 2| 1| 1| 0| 0| 3| 1| 0| 0| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0|
| 0| 9.8398| 3| 1| 1| 2| 1| 1| 0| 0| 2| 1| 1| 0| 1| 0| 3| 0| 0| 3| 0| 2| 1| 1| 2| 1| 1| 1|
| 0| 2.4942| 1| 0| 1| 2| 1| 1| 0| 0| 377| 1| 1| 0| 2| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0|
| 0| 7.9179| 4| 1| 1| 2| 1| 1| 0| 0| 4| 1| 1| 0| 2| 0| 3| 0| 0| 3| 0| 2| 0| 1| 2| 1| 1| 1|

When I try to fit the xgboost model with:

xgboost_model = xgboost_estimator.fit(training_data)

The following exception returns:
Traceback (most recent call last): File "/tmp/zeppelin_pyspark-8068283221541252178.py", line 367, in <module> raise Exception(traceback.format_exc()) Exception: Traceback (most recent call last): File "/tmp/zeppelin_pyspark-8068283221541252178.py", line 360, in <module> exec(code, _zcUserQueryNameSpace) File "<stdin>", line 1, in <module> File "/usr/local/lib/python2.7/site-packages/sagemaker_pyspark/SageMakerEstimator.py", line 253, in fit return self._call_java("fit", dataset) File "/usr/local/lib/python2.7/site-packages/sagemaker_pyspark/wrapper.py", line 76, in _call_java java_value = super(SageMakerJavaWrapper, self)._call_java(name, *java_args) File "/usr/lib/spark/python/pyspark/ml/wrapper.py", line 51, in _call_java return _java2py(sc, m(*java_args)) File "/usr/lib/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco return f(*a, **kw) File "/usr/lib/spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value format(target_id, ".", name), value) Py4JJavaError: An error occurred while calling o130.fit. : java.io.IOException: Illegal schema for libsvm data, schema=StructType(StructField(_c0,IntegerType,true), StructField(_c1,DoubleType,true), StructField(_c2,IntegerType,true), StructField(_c3,IntegerType,true), StructField(_c4,IntegerType,true), StructField(_c5,IntegerType,true), StructField(_c6,IntegerType,true), StructField(_c7,IntegerType,true), StructField(_c8,IntegerType,true), StructField(_c9,IntegerType,true), StructField(_c10,IntegerType,true), StructField(_c11,IntegerType,true), StructField(_c12,IntegerType,true), StructField(_c13,IntegerType,true), StructField(_c14,IntegerType,true), StructField(_c15,IntegerType,true), StructField(_c16,IntegerType,true), StructField(_c17,IntegerType,true), StructField(_c18,IntegerType,true), StructField(_c19,IntegerType,true), StructField(_c20,IntegerType,true), StructField(_c21,IntegerType,true), StructField(_c22,IntegerType,true), StructField(_c23,IntegerType,true), StructField(_c24,IntegerType,true), StructField(_c25,IntegerType,true), StructField(_c26,IntegerType,true), StructField(_c27,IntegerType,true)) at org.apache.spark.ml.source.libsvm.LibSVMFileFormat.verifySchema(LibSVMRelation.scala:86) at org.apache.spark.ml.source.libsvm.LibSVMFileFormat.prepareWrite(LibSVMRelation.scala:122) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:140) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102) at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225) at com.amazonaws.services.sagemaker.sparksdk.internal.DataUploader.writeData(DataUploader.scala:111) at com.amazonaws.services.sagemaker.sparksdk.internal.DataUploader.uploadData(DataUploader.scala:90) at com.amazonaws.services.sagemaker.sparksdk.SageMakerEstimator.fit(SageMakerEstimator.scala:299) at com.amazonaws.services.sagemaker.sparksdk.SageMakerEstimator.fit(SageMakerEstimator.scala:175) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:748)

Did I miss some step that causes the estimator to use the libsvm libraries to process the csv input?

haowang-ms89 avatar May 02 '18 07:05 haowang-ms89

Hi, thanks for using SageMaker Spark!

XGBoostSageMakerEstimator uses Spark's LibSVMOutputWriter, which is rather restrictive in its schema validation: https://github.com/apache/spark/blob/930b90a84871e2504b57ed50efa7b8bb52d3ba44/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala#L79
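
For context, a minimal PySpark sketch (toy values, hypothetical output path) of roughly the shape that writer expects - a numeric label column plus a single Vector column of features, rather than 28 scalar columns:

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.getOrCreate()
# two columns: a double label and a Vector of features
libsvm_ready = spark.createDataFrame(
    [(0.0, Vectors.dense([9.6071, 2.0, 1.0])),
     (0.0, Vectors.dense([2.7296, 3.0, 1.0]))],
    ["label", "features"])
libsvm_ready.write.format("libsvm").save("s3://your-bucket/libsvm-out")  # hypothetical path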

I think the issue stems from the number of columns in your training data. There was some discussion of extra columns in #12 - I'm not sure if anything in that issue is relevant here.

laurenyu avatar May 02 '18 23:05 laurenyu

Thanks for the reply, @laurenyu. I wonder whether XGBoostSageMakerEstimator uses the verifySchema() provided here https://github.com/apache/spark/blob/930b90a84871e2504b57ed50efa7b8bb52d3ba44/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala#L79 even if the input is in csv format. The official guide https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html says that the input can be libsvm or csv, and

For CSV training, the algorithm assumes that the target variable is in the first column and that the CSV does not have a header record.

So I think the data read in the csv way dose not need to fit the two column schema. In this example https://github.com/awslabs/amazon-sagemaker-examples/blob/master/introduction_to_applying_machine_learning/xgboost_customer_churn/xgboost_customer_churn.ipynb, the author also said that

Amazon SageMaker XGBoost can train on data in either a CSV or LibSVM format. For this example, we'll stick with CSV. It should:
- Have the predictor variable in the first column
- Not have a header row

I followed the example's steps to create the csv file it uses; the first 3 lines are:

0,106,0,274.4,120,198.6,82,160.8,62,6.0,3,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0,1,0
0,28,0,187.8,94,248.6,86,208.8,124,10.6,5,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1,0,1,0,1,0
1,148,0,279.3,104,201.6,87,280.8,99,7.9,2,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1,0,1,0

These look quite like mine, and they also have more than 2 columns. However, this example uses sagemaker.estimator.Estimator (which is not available in my EMR+Zeppelin environment) instead of sagemaker_pyspark.algorithms.XGBoostSageMakerEstimator. Doesn't XGBoostSageMakerEstimator support multi-column csv input yet? Or does the sagemaker.s3_input() function used in the example somehow convert the csv into a 2-column structure?

haowang-ms89 avatar May 03 '18 04:05 haowang-ms89

@haowang-ms89

All SageMakerEstimators rely on Spark's DataFrame writers. The XGBoostSageMakerEstimator defaults to writing data in "libsvm" format. Can you try passing "csv" to "trainingSparkDataFormat" (or "com.databricks.spark.csv" if you're using spark-csv)?

https://github.com/aws/sagemaker-spark/blob/dabd13650df3c06b61f43692fe04f787f7b86a1c/sagemaker-spark-sdk/src/main/scala/com/amazonaws/services/sagemaker/sparksdk/algorithms/XGBoostSageMakerEstimator.scala#L491
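
In PySpark that would look roughly like this (a minimal sketch mirroring your estimator setup above, with only the new parameter added):

from sagemaker_pyspark.algorithms import XGBoostSageMakerEstimator

xgboost_estimator = XGBoostSageMakerEstimator(
    trainingInstanceType="ml.m3.xlarge",
    trainingInstanceCount=1,
    endpointInstanceType="ml.m3.xlarge",
    endpointInitialInstanceCount=1,
    trainingSparkDataFormat="csv")  # upload the training DataFrame to S3 as CSV instead of libsvm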

andremoeller avatar May 03 '18 05:05 andremoeller

@andremoeller Thanks a lot! Passing trainingSparkDataFormat="csv" when initializing XGBoostSageMakerEstimator makes the exception go away. But now I face the same issue reported here (which you added a bug tag to): https://github.com/aws/sagemaker-spark/issues/27. I am using an EMR+Zeppelin environment instead of SageMaker's native notebook. I will ask in that thread and will close this one after I confirm there is no related issue. Thanks again!

haowang-ms89 avatar May 03 '18 08:05 haowang-ms89

@haowang-ms89

Sure! I just commented on that issue.

You will also have to pass in Some("csv") for the trainingContentType, or XGBoost will think you're trying to give it LibSVM data.

https://github.com/aws/sagemaker-spark/blob/dabd13650df3c06b61f43692fe04f787f7b86a1c/sagemaker-spark-sdk/src/main/scala/com/amazonaws/services/sagemaker/sparksdk/algorithms/XGBoostSageMakerEstimator.scala#L488

https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html

For Training ContentType, valid inputs are libsvm or csv.

andremoeller avatar May 03 '18 10:05 andremoeller

@andremoeller After passing trainingSparkDataFormat="csv" and trainingContentType="csv", the model can be trained with the csv file. However, for inference, I removed the first column (the target value) from the csv file and served it as the test data. I assume this is supposed to work, since this document https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html says:

For CSV training, the algorithm assumes that the target variable is in the first column and that the CSV does not have a header record. For CSV inference, the algorithm assumes that CSV input does not have the label column.

So I think that the only difference between training and inferencing csv files is the first column. The question is that when I call: transformed_data = xgboost_model.transform(test_data) The following stacktrace appears: Traceback (most recent call last): File "/tmp/zeppelin_pyspark-8068283221541252178.py", line 367, in <module> raise Exception(traceback.format_exc()) Exception: Traceback (most recent call last): File "/tmp/zeppelin_pyspark-8068283221541252178.py", line 360, in <module> exec(code, _zcUserQueryNameSpace) File "<stdin>", line 1, in <module> File "/usr/local/lib/python2.7/site-packages/sagemaker_pyspark/SageMakerModel.py", line 408, in transform return self._call_java("transform", dataset) File "/usr/local/lib/python2.7/site-packages/sagemaker_pyspark/wrapper.py", line 76, in _call_java java_value = super(SageMakerJavaWrapper, self)._call_java(name, *java_args) File "/usr/lib/spark/python/pyspark/ml/wrapper.py", line 51, in _call_java return _java2py(sc, m(*java_args)) File "/usr/lib/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/usr/lib/spark/python/pyspark/sql/utils.py", line 79, in deco raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace) IllegalArgumentException: u'Expecting schema with DoubleType column with name label and Vector column with name features. Got StructType(StructField(_c0,DoubleType,true), StructField(_c1,IntegerType,true), StructField(_c2,IntegerType,true), StructField(_c3,IntegerType,true), StructField(_c4,IntegerType,true), StructField(_c5,IntegerType,true), StructField(_c6,IntegerType,true), StructField(_c7,IntegerType,true), StructField(_c8,IntegerType,true), StructField(_c9,IntegerType,true), StructField(_c10,IntegerType,true), StructField(_c11,IntegerType,true), StructField(_c12,IntegerType,true), StructField(_c13,IntegerType,true), StructField(_c14,IntegerType,true), StructField(_c15,IntegerType,true), StructField(_c16,IntegerType,true), StructField(_c17,IntegerType,true), StructField(_c18,IntegerType,true), StructField(_c19,IntegerType,true), StructField(_c20,IntegerType,true), StructField(_c21,IntegerType,true), StructField(_c22,IntegerType,true), StructField(_c23,IntegerType,true), StructField(_c24,IntegerType,true), StructField(_c25,IntegerType,true), StructField(_c26,IntegerType,true))'

It seems that the transform() function wants the two-column "label" and "features" format. I tried to find a parameter to pass in the initializer, like before, but I can't find one on the page you provided. Thank you!

haowang-ms89 avatar May 04 '18 03:05 haowang-ms89

transform() is trying to convert your DataFrame to LibSVM for inference because the requestRowSerializer is set to be LibSVMRequestRowSerializer:

https://github.com/aws/sagemaker-spark/blob/81ac05625e86db577124d7c49d4cea7ec25d181f/sagemaker-spark-sdk/src/main/scala/com/amazonaws/services/sagemaker/sparksdk/algorithms/XGBoostSageMakerEstimator.scala#L479-L480

If you want to send CSV, you should use this UnlabeledCSVRequestRowSerializer and pass it in as the requestRowSerializer to XGBoostSageMakerEstimator:

https://github.com/aws/sagemaker-spark/blob/master/sagemaker-spark-sdk/src/main/scala/com/amazonaws/services/sagemaker/sparksdk/transformation/serializers/UnlabeledCSVRequestRowSerializer.scala

The UnlabeledCSVRequestRowSerializer serializes a column of type Vector to CSV. You can choose which column name UnlabeledCSVRequestRowSerializer serializes; it defaults to "features":

https://github.com/aws/sagemaker-spark/blob/81ac05625e86db577124d7c49d4cea7ec25d181f/sagemaker-spark-sdk/src/main/scala/com/amazonaws/services/sagemaker/sparksdk/transformation/serializers/UnlabeledCSVRequestRowSerializer.scala#L27-L28

Right now, your DataFrame doesn't have such a column for a features vector, but you can make one with a VectorAssembler. After making an XGBoost estimator with UnlabeledCSVRequestRowSerializer:

import org.apache.spark.ml.feature.VectorAssembler

// assembles all columns in df into an appended Vector column named "features"
val assembler = new VectorAssembler()
assembler.setInputCols(df.columns).setOutputCol("features")
val dfWithFeatures = assembler.transform(df)

// `UnlabeledCSVRequestRowSerializer` serializes the "features" column;
// transform() is called on the model returned by fit(), not on the estimator
myXGBoostModel.transform(dfWithFeatures).show()

Feel free to reach out if you run into trouble or if this was unclear.

andremoeller avatar May 04 '18 05:05 andremoeller

@andremoeller Many thanks! I have a very basic question now. I initialize the estimator with:

xgboost_estimator = XGBoostSageMakerEstimator(
    trainingInstanceType="ml.m4.xlarge",
    trainingInstanceCount=1,
    endpointInstanceType="ml.m4.xlarge",
    endpointInitialInstanceCount=1,
    trainingSparkDataFormat="csv",
    trainingContentType="csv",
    requestRowSerializer=UnlabeledCSVRequestRowSerializer(),
    sagemakerRole=IAMRole(iamRole))

And I got:

NameError: name 'UnlabeledCSVRequestRowSerializer' is not defined

Which python library should I import to have UnlabeledCSVRequestRowSerializer()?

haowang-ms89 avatar May 04 '18 06:05 haowang-ms89

@haowang-ms89 For PySpark, it's here: https://github.com/aws/sagemaker-spark/blob/81ac05625e86db577124d7c49d4cea7ec25d181f/sagemaker-pyspark-sdk/src/sagemaker_pyspark/transformation/serializers/serializers.py#L31-L40
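
For example, a minimal import sketch following the module path in the linked file:

from sagemaker_pyspark.transformation.serializers import UnlabeledCSVRequestRowSerializer

# then pass requestRowSerializer=UnlabeledCSVRequestRowSerializer()
# when constructing the XGBoostSageMakerEstimator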

andremoeller avatar May 04 '18 06:05 andremoeller

@andremoeller Thanks a lot! My current question is that the assembled "features" column looks strange. Here are 2 records of the assembled data frame:

| 3.3539| 1| 0| 1| 2| 2| 1| 0| 0| 6| 1| 1| 0| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0|(27,[0,1,3,4,5,6,...|

| 1.9767| 1| 0| 1| 1| 1| 1| 1| 1| 73| 1| 0| 0| 1| 0| 3| 0| 0| 3| 0| 1| 0| 1| 1| 0| 1| 1|[1.9767,1.0,0.0,1...|

The "features" column in the first record contains 27, the number of features, but the feature vector looks wrong (the feature vectors of all records that start with 27 are [0,1,3,4,5,6,...]). The second record seems fine, but it doesn't show the number of features. I also found that the value in the column right before the "features" column (the 27th column, named _c26) seems to relate to which pattern appears in "features". In the first 50 records I checked, if the value in _c26 is 0, the "features" column looks like the first record; if the value is 1, it looks like the second.

I use the Python version of the assembler I found here https://spark.apache.org/docs/2.3.0/ml-features.html#vectorassembler:

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=test_data.columns,
    outputCol="features")
test_data_features = assembler.transform(test_data)

And test_data_features.show() gives:

+-------+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+--------------------+
| _c0|_c1|_c2|_c3|_c4|_c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|_c25|_c26| features|
+-------+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+--------------------+
| 7.1732| 1| 0| 1| 2| 2| 2| 0| 0| 5| 1| 1| 0| 1| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0|(27,[0,1,3,4,5,6,...|
| 1.3087| 1| 0| 1| 2| 1| 1| 0| 0| 2| 1| 1| 0| 2| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0|(27,[0,1,3,4,5,6,...|
| 3.3539| 1| 0| 1| 2| 2| 1| 0| 0| 6| 1| 1| 0| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0|(27,[0,1,3,4,5,6,...|
| 1.9767| 1| 0| 1| 1| 1| 1| 1| 1| 73| 1| 0| 0| 1| 0| 3| 0| 0| 3| 0| 1| 0| 1| 1| 0| 1| 1|[1.9767,1.0,0.0,1...|

haowang-ms89 avatar May 04 '18 08:05 haowang-ms89

@haowang-ms89

That's normal. The VectorAssembler encodes vectors sparsely when there are lots of zeros in the data, to save memory. The rows that start with 27 are SparseVectors: the 27 is the size of the vector, followed by an array of the indices of the nonzero entries, followed by an array of their values. The densely encoded rows just have more nonzero values.

I believe the UnlabeledCSVRequestRowSerializer handles Sparse vectors correctly (that is, fills in the zeros when serializing to CSV).
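
For illustration, a small sketch (toy values, not your data) showing that the two encodings carry the same information:

from pyspark.ml.linalg import Vectors

# SparseVector: (size, [indices of nonzero entries], [their values])
sv = Vectors.sparse(5, [0, 3], [7.1, 3.0])
dv = Vectors.dense([7.1, 0.0, 0.0, 3.0, 0.0])
print(sv.toArray())                              # zeros filled back in: [7.1, 0.0, 0.0, 3.0, 0.0]
print(list(sv.toArray()) == list(dv.toArray()))  # True: same vector, different encoding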

andremoeller avatar May 04 '18 09:05 andremoeller

@andremoeller Thanks for your patience. When I call:

xgboost_model.transform(test_data_features.select('features')).show()

The following exception occurs: Traceback (most recent call last): File "/tmp/zeppelin_pyspark-8068283221541252178.py", line 367, in <module> raise Exception(traceback.format_exc()) Exception: Traceback (most recent call last): File "/tmp/zeppelin_pyspark-8068283221541252178.py", line 360, in <module> exec(code, _zcUserQueryNameSpace) File "<stdin>", line 1, in <module> File "/usr/local/lib/python2.7/site-packages/sagemaker_pyspark/SageMakerModel.py", line 408, in transform return self._call_java("transform", dataset) File "/usr/local/lib/python2.7/site-packages/sagemaker_pyspark/wrapper.py", line 76, in _call_java java_value = super(SageMakerJavaWrapper, self)._call_java(name, *java_args) File "/usr/lib/spark/python/pyspark/ml/wrapper.py", line 51, in _call_java return _java2py(sc, m(*java_args)) File "/usr/lib/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/usr/lib/spark/python/pyspark/sql/utils.py", line 79, in deco raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace) IllegalArgumentException: u'Expecting schema with DoubleType column with name label and Vector column with name features. Got StructType(StructField(features,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true))'

Does this mean that the transform() function requires a "label" column? But a label should not be required when doing prediction, right?

haowang-ms89 avatar May 04 '18 09:05 haowang-ms89

That looks like it's still using the LibSVM serializer, not the UnlabeledCSVRequestRowSerializer. The LibSVM serializer validates the schema like this:

https://github.com/aws/sagemaker-spark/blob/81ac05625e86db577124d7c49d4cea7ec25d181f/sagemaker-spark-sdk/src/main/scala/com/amazonaws/services/sagemaker/sparksdk/transformation/serializers/SchemaValidators.scala#L28-L30

Did you set xgboost_model.requestRowSerializer = UnlabeledCSVRequestRowSerializer() before transforming?

andremoeller avatar May 04 '18 10:05 andremoeller

@andremoeller Thanks a lot! I forgot to retrain xgboost_model after I added UnlabeledCSVRequestRowSerializer() to the initializer. I now get this exception when calling xgboost_model.transform(test_data_features.select('features')).show():

Traceback (most recent call last): File "/tmp/zeppelin_pyspark-8068283221541252178.py", line 367, in <module> raise Exception(traceback.format_exc()) Exception: Traceback (most recent call last): File "/tmp/zeppelin_pyspark-8068283221541252178.py", line 360, in <module> exec(code, _zcUserQueryNameSpace) File "<stdin>", line 1, in <module> File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 350, in show print(self._jdf.showString(n, 20, vertical)) File "/usr/lib/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco return f(*a, **kw) File "/usr/lib/spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value format(target_id, ".", name), value) Py4JJavaError: An error occurred while calling o1330.showString. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 43.0 failed 4 times, most recent failure: Lost task 0.3 in stage 43.0 (TID 46, ip-10-104-118-9.us-west-2.compute.internal, executor 27): java.lang.NumberFormatException: For input string: "[0.8876568078994751" at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043) at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110) at java.lang.Double.parseDouble(Double.java:538) at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:284) at scala.collection.immutable.StringOps.toDouble(StringOps.scala:29) at com.amazonaws.services.sagemaker.sparksdk.transformation.deserializers.XGBoostCSVRowDeserializer$$anonfun$deserializeResponse$1.apply(XGBoostCSVRowDeserializer.scala:41) at com.amazonaws.services.sagemaker.sparksdk.transformation.deserializers.XGBoostCSVRowDeserializer$$anonfun$deserializeResponse$1.apply(XGBoostCSVRowDeserializer.scala:41) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at com.amazonaws.services.sagemaker.sparksdk.transformation.deserializers.XGBoostCSVRowDeserializer.deserializeResponse(XGBoostCSVRowDeserializer.scala:41) at com.amazonaws.services.sagemaker.sparksdk.transformation.util.RequestBatchIterator.hasNext(RequestBatchIterator.scala:132) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1750) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1738) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1737) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1737) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:871) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:871) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:871) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1971) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1920) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1909) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:682) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363) at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272) at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484) at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484) at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252) at org.apache.spark.sql.Dataset.head(Dataset.scala:2484) at org.apache.spark.sql.Dataset.take(Dataset.scala:2698) at org.apache.spark.sql.Dataset.showString(Dataset.scala:254) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at 
py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.NumberFormatException: For input string: "[0.8876568078994751" at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043) at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110) at java.lang.Double.parseDouble(Double.java:538) at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:284) at scala.collection.immutable.StringOps.toDouble(StringOps.scala:29) at com.amazonaws.services.sagemaker.sparksdk.transformation.deserializers.XGBoostCSVRowDeserializer$$anonfun$deserializeResponse$1.apply(XGBoostCSVRowDeserializer.scala:41) at com.amazonaws.services.sagemaker.sparksdk.transformation.deserializers.XGBoostCSVRowDeserializer$$anonfun$deserializeResponse$1.apply(XGBoostCSVRowDeserializer.scala:41) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at com.amazonaws.services.sagemaker.sparksdk.transformation.deserializers.XGBoostCSVRowDeserializer.deserializeResponse(XGBoostCSVRowDeserializer.scala:41) at com.amazonaws.services.sagemaker.sparksdk.transformation.util.RequestBatchIterator.hasNext(RequestBatchIterator.scala:132) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more

It looks like there is a parsing error - the bracket '[' should not be attached to the number: java.lang.NumberFormatException: For input string: "[0.8876568078994751". By the way, this number 0.8876... is not part of my data. Could it be a number produced during calculation, or a parameter of the model?

haowang-ms89 avatar May 05 '18 03:05 haowang-ms89

Hi @haowang-ms89 ,

com.amazonaws.services.sagemaker.sparksdk.transformation.deserializers.XGBoostCSVRowDeserializer$$anonfun$deserializeResponse$1.apply(XGBoostCSVRowDeserializer.scala:41)

This line indicates that the XGBoost CSV deserializer is failing to deserialize the response from the XGBoost model. That number (0.88765...) is apparently one of the predictions from the model, but the response doesn't seem to be formatted correctly as CSV, which the XGBoost endpoint should respond with. My current suspicion is that the model, in some cases, returns a multidimensional array that isn't being flattened before being joined with commas. I don't think using the CSV serializer has anything to do with it -- I modified the XGBoost notebook on Notebook Instances to use the CSV serializer, and still got back the same predictions, as expected.
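
As a rough illustration of what seems to be going wrong (a hypothetical response string, sketched in Python for brevity rather than the Scala deserializer itself), the response appears to be split on commas with each token parsed as a double, so a bracketed array leaves "[" attached to the first token:

response_body = "[0.8876568078994751, 0.1123431921005249]"  # hypothetical multi:softprob reply
for token in response_body.split(","):
    value = float(token)  # fails on "[0.8876568078994751", mirroring the NumberFormatException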

Would it be possible to send the request body you're sending, to help us reproduce? I believe you can find it in the endpoint logs for your endpoint in CloudWatch. In the meantime, I'll reach out to the developers of the XGBoost SageMaker algorithm.

Thanks!

andremoeller avatar May 07 '18 19:05 andremoeller

Hi @andremoeller Our DevOps team is helping me, and we can't find the request body you mentioned. We found the SageMaker endpoint log here: CloudWatch > Log Groups > /aws/sagemaker/Endpoints/endpoint-6b8e172a3395-2018-05-05T02-46-01-833 > AllRequests/i-0eb94294080366ebe. The log only contains:

/opt/amazon/lib/python2.7/site-packages/sage_xgboost/serve.py:117: ParserWarning: Falling back to the 'python' engine because the 'c' engine does not support sep=None with delim_whitespace=False; you can avoid this warning by specifying engine='python'.
test = pd.read_csv(f.name, sep=None, header=None)

Thank you!

haowang-ms89 avatar May 08 '18 02:05 haowang-ms89

Hi @haowang-ms89 ,

Huh, it's possible that they don't log failed requests. Thank you for that warning, though. I'll update this issue when I hear back from them.

andremoeller avatar May 08 '18 23:05 andremoeller

Hi @andremoeller Would it be helpful if I provided my code and the training/testing csv files?

haowang-ms89 avatar May 10 '18 10:05 haowang-ms89

@haowang-ms89 ,

Yes, it sure would. If you can post it, I'll try to reproduce the issue.

andremoeller avatar May 10 '18 18:05 andremoeller

@andremoeller Thanks for the help! Here are the 2 csv files I use for training and testing (they are exactly the same, except that the label column is absent from the testing file). I changed the file extension to .txt due to the upload constraint; simply changing it back to .csv should be fine.

Here is the code I wrote:

from pyspark import SparkContext, SparkConf
import sagemaker_pyspark
from pyspark.sql import SparkSession
from sagemaker_pyspark import IAMRole, classpath_jars
from sagemaker_pyspark.algorithms import XGBoostSageMakerEstimator
from sagemaker_pyspark.transformation import serializers
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

classpath = ":".join(sagemaker_pyspark.classpath_jars())
spark = SparkSession.builder.config("spark.driver.extraClassPath", classpath).getOrCreate()

iamRole = "arn:aws:iam::000000000000:role/service-role/AmazonSageMaker-ExecutionRole-20180427T160487"
xgboost_estimator = XGBoostSageMakerEstimator(
    trainingInstanceType="ml.m4.xlarge",
    trainingInstanceCount=1,
    endpointInstanceType="ml.m4.xlarge",
    endpointInitialInstanceCount=1,
    trainingSparkDataFormat="csv",
    trainingContentType="csv",
    requestRowSerializer=serializers.UnlabeledCSVRequestRowSerializer(),
    sagemakerRole=IAMRole(iamRole))
xgboost_estimator.setObjective('multi:softprob')
xgboost_estimator.setNumRound(25)
xgboost_estimator.setNumClasses(2)

training_data = spark.read.csv("s3a://poc.sagemaker.hao0427/productionData0207_noheader_allnumber.csv", sep=",", header="false", inferSchema="true")
xgboost_model = xgboost_estimator.fit(training_data)

test_data = spark.read.csv("s3a://poc.sagemaker.hao0427/productionData0207_noheader_allnumber_notarget.csv", sep=",", header="false", inferSchema="true")
assembler = VectorAssembler(
    inputCols=test_data.columns,
    outputCol="features")
test_data_features = assembler.transform(test_data)
xgboost_model.transform(test_data_features.select('features')).show()

productionData0207_noheader_allnumber.txt productionData0207_noheader_allnumber_notarget.txt

haowang-ms89 avatar May 11 '18 02:05 haowang-ms89

Hi @haowang-ms89 ,

Thanks! I could reproduce this. I've contacted the XGBoost developers and asked them to take a look at what's going wrong.

andremoeller avatar May 14 '18 19:05 andremoeller

Hi @haowang-ms89,

Thanks for sharing the details.

Is the issue here that you cannot get prediction results from hosting?

yijiezh avatar May 14 '18 20:05 yijiezh

Hi @EvanzzzZ Yes, the exception appears after I call xgboost_model.transform(test_data_features.select('features')).show(). Thanks for your help.

haowang-ms89 avatar May 15 '18 01:05 haowang-ms89

@haowang-ms89 ,

There's a bug in the XGBoost container with the multi:softprob objective. Multi-dimensional arrays aren't being serialized from the hosting container back to the client in the expected format. The XGBoost team is working on a fix, but I can't give you an ETA.

Other objectives (those that return a scalar per record rather than a vector) still work as expected. You could also call InvokeEndpoint directly using the AWS Java SDK or boto3 client (or another AWS client) for SageMaker Runtime.
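
If you go the boto3 route, a hedged sketch (the endpoint name, region, and payload row are placeholders for your own):

import boto3

runtime = boto3.client("sagemaker-runtime", region_name="us-west-2")
response = runtime.invoke_endpoint(
    EndpointName="my-xgboost-endpoint",  # hypothetical endpoint name
    ContentType="text/csv",
    Body="7.1732,1,0,1,2,2,2,0,0,5,1,1,0,1,0,3,0,0,3,0,0,3,0,0,3,0,0")
print(response["Body"].read().decode("utf-8"))  # raw prediction payload from the container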

Please let us know if you have any other questions.

Thanks!

andremoeller avatar May 17 '18 18:05 andremoeller

Hi @andremoeller Thanks! Since the target to be predicted is either 1 or 0, I guess I can try an objective that returns a single value, i.e., the probability of being 1. I found the following choices in the source code ("reg:linear", "reg:logistic", "binary:logistic", "binary:logistraw", "count:poisson", "multi:softmax", "multi:softprob", "rank:pairwise", "reg:gamma", "reg:tweedie"). Is binary:logistic the one I should use for now?

haowang-ms89 avatar May 17 '18 19:05 haowang-ms89

@haowang-ms89 ,

Yeah, I think that's right. Hyperparameters are passed in to XGBoost just as documented on the XGBoost GitHub page: https://github.com/dmlc/xgboost/blob/master/doc/parameter.md
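
For reference, a minimal sketch of the switch (the rest of the estimator setup unchanged):

xgboost_estimator.setObjective('binary:logistic')  # returns one probability per record, P(label == 1)
xgboost_estimator.setNumRound(25)
# num_class is only relevant for the multi:* objectives, so setNumClasses can be dropped here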

andremoeller avatar May 17 '18 19:05 andremoeller

@andremoeller Thanks for the information! I will try it later and report the results.

haowang-ms89 avatar May 17 '18 19:05 haowang-ms89

@andremoeller It works fine with the binary:logistic setting, thank you. I have a general question: is there a way to have model.fit() output the training error/loss during training?

haowang-ms89 avatar May 18 '18 07:05 haowang-ms89

@haowang-ms89 ,

Do they show up in your CloudWatch logs for your XGBoost training job? If not, we won't be able to get them, but if so: streaming logs from CloudWatch to Spark is possible, but just not implemented.
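
In the meantime, a hedged boto3 sketch for pulling those training logs out of CloudWatch yourself (the log group follows SageMaker's naming convention; the training job name is a placeholder):

import boto3

logs = boto3.client("logs", region_name="us-west-2")
group = "/aws/sagemaker/TrainingJobs"
streams = logs.describe_log_streams(
    logGroupName=group,
    logStreamNamePrefix="my-xgboost-training-job")  # hypothetical training job name
for stream in streams["logStreams"]:
    events = logs.get_log_events(logGroupName=group, logStreamName=stream["logStreamName"])
    for event in events["events"]:
        print(event["message"])  # XGBoost's train-error / metric lines show up here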

andremoeller avatar May 18 '18 18:05 andremoeller

Labeling this as a bug and keeping this open to track the new output format for XGBoost for multi-dimensional arrays.

andremoeller avatar Jun 17 '18 00:06 andremoeller