
OneHotEncoder serialization failed

inardini opened this issue on Oct 03 '21 · 3 comments

To whom it may concern,

I'm using mleap-pyspark to serialize the following pipeline with pyspark 3.0.2 and mleap 0.18.1.

imputer --> string_indexer --> imputer --> string_indexer --> one_hot_encoder --> vector_assembler --> scaler --> vector_assembler --> random_classifier
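
For context, the categorical branch looks roughly like this (a simplified sketch; variable and column names are illustrative rather than my exact code):

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, OneHotEncoder, StringIndexer

# Simplified sketch of the categorical branch: index the raw strings,
# impute the resulting indices, then one-hot encode the imputed indices.
string_indexer = StringIndexer(
    inputCols=["loan_term"], outputCols=["loan_term_idx"], handleInvalid="keep")
categorical_imputer = Imputer(
    inputCols=["loan_term_idx"], outputCols=["loan_term_idx_imputed"])
one_hot_encoder = OneHotEncoder(
    dropLast=False,
    inputCols=categorical_imputer.getOutputCols(),
    outputCols=["loan_term_ohe"],
    handleInvalid="keep")

pipeline = Pipeline(stages=[string_indexer, categorical_imputer, one_hot_encoder])
```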

But I get this error:

    Py4JJavaError: An error occurred while calling o99817.serializeToBundle.
    : java.lang.RuntimeException: unsupported attribute for field loan_term_idx_imputed
      at org.apache.spark.ml.bundle.ops.feature.OneHotEncoderOp$.sizeForField(OneHotEncoderOp.scala:31)
      at org.apache.spark.ml.bundle.ops.feature.OneHotEncoderOp$$anon$1.$anonfun$store$2(OneHotEncoderOp.scala:47)
      at org.apache.spark.ml.bundle.ops.feature.OneHotEncoderOp$$anon$1.$anonfun$store$2$adapted(OneHotEncoderOp.scala:47)
      at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
      at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
      at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
      at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
      at scala.collection.TraversableLike.map(TraversableLike.scala:238)
      at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
      at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
      at org.apache.spark.ml.bundle.ops.feature.OneHotEncoderOp$$anon$1.store(OneHotEncoderOp.scala:47)
      at org.apache.spark.ml.bundle.ops.feature.OneHotEncoderOp$$anon$1.store(OneHotEncoderOp.scala:37)
      at ml.combust.bundle.serializer.ModelSerializer.$anonfun$write$1(ModelSerializer.scala:87)
      at scala.util.Try$.apply(Try.scala:213)
      at ml.combust.bundle.serializer.ModelSerializer.write(ModelSerializer.scala:83)
      at ml.combust.bundle.serializer.NodeSerializer.$anonfun$write$1(NodeSerializer.scala:85)
      at scala.util.Try$.apply(Try.scala:213)
      at ml.combust.bundle.serializer.NodeSerializer.write(NodeSerializer.scala:81)
      at ml.combust.bundle.serializer.GraphSerializer.$anonfun$writeNode$1(GraphSerializer.scala:34)
      at scala.util.Try$.apply(Try.scala:213)
      at ml.combust.bundle.serializer.GraphSerializer.writeNode(GraphSerializer.scala:30)
      at ml.combust.bundle.serializer.GraphSerializer.$anonfun$write$2(GraphSerializer.scala:21)
      at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
      at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
      at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
      at ml.combust.bundle.serializer.GraphSerializer.write(GraphSerializer.scala:21)
      at org.apache.spark.ml.bundle.ops.PipelineOp$$anon$1.store(PipelineOp.scala:21)
      at org.apache.spark.ml.bundle.ops.PipelineOp$$anon$1.store(PipelineOp.scala:14)
      at ml.combust.bundle.serializer.ModelSerializer.$anonfun$write$1(ModelSerializer.scala:87)
      at scala.util.Try$.apply(Try.scala:213)
      at ml.combust.bundle.serializer.ModelSerializer.write(ModelSerializer.scala:83)
      at ml.combust.bundle.serializer.NodeSerializer.$anonfun$write$1(NodeSerializer.scala:85)
      at scala.util.Try$.apply(Try.scala:213)
      at ml.combust.bundle.serializer.NodeSerializer.write(NodeSerializer.scala:81)
      at ml.combust.bundle.serializer.BundleSerializer.$anonfun$write$1(BundleSerializer.scala:34)
      at scala.util.Try$.apply(Try.scala:213)
      at ml.combust.bundle.serializer.BundleSerializer.write(BundleSerializer.scala:29)
      at ml.combust.bundle.BundleWriter.save(BundleWriter.scala:34)
      at ml.combust.mleap.spark.SimpleSparkSerializer.$anonfun$serializeToBundleWithFormat$4(SimpleSparkSerializer.scala:26)
      at resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
      at scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252)
      at scala.util.control.Exception$Catch.apply(Exception.scala:228)
      at scala.util.control.Exception$Catch.either(Exception.scala:252)
      at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
      at resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26)
      at resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26)
      at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
      at resource.DeferredExtractableManagedResource.$anonfun$tried$1(AbstractManagedResource.scala:33)
      at scala.util.Try$.apply(Try.scala:213)
      at resource.DeferredExtractableManagedResource.tried(AbstractManagedResource.scala:33)
      at ml.combust.mleap.spark.SimpleSparkSerializer.serializeToBundleWithFormat(SimpleSparkSerializer.scala:25)
      at ml.combust.mleap.spark.SimpleSparkSerializer.serializeToBundle(SimpleSparkSerializer.scala:17)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
      at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
      at py4j.Gateway.invoke(Gateway.java:282)
      at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      at py4j.commands.CallCommand.execute(CallCommand.java:79)
      at py4j.GatewayConnection.run(GatewayConnection.java:238)
      at java.lang.Thread.run(Thread.java:748)

Any insights?

inardini · Oct 03 '21 17:10

From looking at the source code, this error is caused by the one hot encoder op not being able to resolve the categorical size of the loan_term_idx_imputed field. It expects the field to be either nominal or binary and I think it is getting an "unresolved" type right now.

A few follow-up questions:

  • Are you setting inputCol or inputCols for the one hot encoder? MLeap seems to only support inputCols at the moment, which I'm guessing you are using already, but just to confirm.
  • It looks like the one hot encoder has the loan_term_idx_imputed column as part of its inputCols? Is that what you expected? I.e., you didn't want the string indexer output to be the one hot encoder input?
  • Can you let us know what the schema metadata is for this column? I.e., first transform the dataframe with your pipeline, then do df.select(df.loan_term_idx_imputed).schema[0].metadata (see the snippet below). Also report back with this metadata for the string indexer output column, if you don't mind. This information is what gets used to determine whether the field is nominal/binary/numeric/unresolved.
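
Concretely, something along these lines (assuming `train` is your input dataframe and `fitted_pipeline` is your fitted PipelineModel; both names are placeholders):

```python
# Print the ml_attr metadata that determines whether each field is
# treated as nominal/binary/numeric/unresolved during serialization.
transformed = fitted_pipeline.transform(train)
print(transformed.select("loan_term_idx").schema[0].metadata)          # string indexer output
print(transformed.select("loan_term_idx_imputed").schema[0].metadata)  # imputer output
```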

jsleight · Oct 04 '21 15:10

Thanks for the feedback.

Point by point:

  • I'm setting inputCols; the code is below:

    one_hot_encoder = OneHotEncoder(
        dropLast=False,
        inputCols=categorical_imputer.getOutputCols(),
        outputCols=ONE_HOT_ENCODED_FEATURES,
        handleInvalid='keep')

  • Yep, it is expected. But basically, are you suggesting imputing the categorical variables first, then string-indexing them and applying the one hot encoder?

  • About the test, below is the transformation journey of the loan_term variable:

     1. train.select(train.loan_term).schema[0].metadata --> {}
     2. train.select(train.loan_term_idx).schema[0].metadata --> {'ml_attr': {'name': 'loan_term_idx', 'type': 'nominal', 'vals': ['360', '180', 'nan', '480', '120', '300', '240', '60', '__unknown']}}
     3. train.select(train.loan_term_idx_imputed).schema[0].metadata --> {}
    

Hope it helps.

inardini · Oct 05 '21 07:10

Thanks for the requested info.

So what is happening is that the loan_term_idx field is considered a "nominal" attribute, but loan_term_idx_imputed has had that attribute status reset, so it is now "unresolved". MLeap's one hot encoder op requires that the transformer have a fixed size of state, which it infers from the vals.size in the metadata of each of the inputCols.
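
In rough Python terms, the inference works like this (a loose paraphrase of the Scala logic in OneHotEncoderOp, for illustration only):

```python
# Loose Python paraphrase of MLeap's size inference; not the actual API.
def size_for_field(metadata: dict) -> int:
    attr = metadata.get("ml_attr", {})
    if attr.get("type") == "nominal":
        return len(attr["vals"])  # nominal: one slot per category value
    if attr.get("type") == "binary":
        return 2                  # binary: exactly two values
    # empty metadata, like loan_term_idx_imputed's {} above, lands here:
    raise RuntimeError("unsupported attribute for field ...")
```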

Since MLeap is on Spark v3 now, we could look at the new OneHotEncoderModel.categorySizes property instead of inferring sizes from metadata. We'd need to change the sizeForField logic in OneHotEncoderOp.scala (the line the stack trace points at).
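
For reference, the fitted model already knows those sizes even when the column metadata is empty, e.g. (reusing the encoder from the snippet above; the dataframe name is a placeholder):

```python
# categorySizes on a fitted OneHotEncoderModel (Spark 3.x) does not depend on
# ml_attr metadata; fit() falls back to scanning the data when metadata is absent.
ohe_model = one_hot_encoder.fit(indexed_and_imputed_df)
print(ohe_model.categorySizes)  # e.g. [9] for the nine loan_term index values
```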

So there are two paths forward for you:

  1. Wait for this fix to go through. If you're up for it, I'm happy to review and merge a PR 😄; then you'd need to wait for the next release or use a snapshot.
  2. Alter your pipeline. I think you can either put the imputer before the string indexer, or maybe even remove the imputer altogether and let the one hot encoder's handleInvalid='keep' handle the values which would otherwise be imputed (a sketch of that second variant is below).
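
If it helps, here's a minimal sketch of that imputer-free variant (illustrative names; it relies on handleInvalid="keep" adding an extra category slot):

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Option 2 sketch: drop the post-indexing imputer and let handleInvalid="keep"
# route missing/unseen values into the encoder's extra category slot.
string_indexer = StringIndexer(
    inputCols=["loan_term"], outputCols=["loan_term_idx"], handleInvalid="keep")
one_hot_encoder = OneHotEncoder(
    inputCols=string_indexer.getOutputCols(),  # indexer output feeds the encoder directly
    outputCols=["loan_term_ohe"],
    dropLast=False,
    handleInvalid="keep")

pipeline = Pipeline(stages=[string_indexer, one_hot_encoder])
```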

Cheers!

jsleight · Oct 05 '21 15:10