Serializing/Deserializing to/from Hadoop
Versions: MLeap 0.12.0, Scala 2.11.8, Spark 2.3.1
I'm having issues saving and loading models directly on Hadoop (HDFS). I have a class that looks like this:
```scala
package com.myProject.nlp.my.utils

import ml.combust.bundle.BundleFile
import ml.combust.bundle.serializer.SerializationFormat
import ml.combust.mleap.runtime.MleapSupport._
import ml.combust.mleap.runtime.transformer.Pipeline
import ml.combust.mleap.spark.SparkSupport._
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.bundle.SparkBundleContext
import org.apache.spark.sql.{Dataset, Row}
import resource.managed

object SparkToMLeapModelSerializer {

  private final val FILE_PREFIX = "jar:file:"

  /**
   * Serialize a Spark PipelineModel into a zip in MLeap format
   * @param model: Spark PipelineModel
   * @param dataset: Must provide a transformed Spark DataFrame to MLeap for serializing a pipeline.
   *                 Used to extract data types and other metadata
   * @param pathToZip: Path to write the MLeap zip file to
   */
  def serialize(model: PipelineModel, dataset: Dataset[Row], pathToZip: String): Unit = {
    val sbc: SparkBundleContext = SparkBundleContext().withDataset(dataset)
    for (bf <- managed(BundleFile(FILE_PREFIX + pathToZip))) {
      model.writeBundle.format(SerializationFormat.Protobuf).save(bf)(sbc).get
    }
  }

  /**
   * Deserialize an MLeap Pipeline model from a zip
   * @param pathToZip: path to the zip file containing the MLeap model
   * @return MLeap Pipeline
   */
  def deserializeToMLeap(pathToZip: String): Pipeline = {
    (for (bf <- managed(BundleFile(FILE_PREFIX + pathToZip))) yield {
      bf.loadMleapBundle().get.root
    }).tried.get.asInstanceOf[Pipeline]
  }

  /**
   * Deserialize a Spark PipelineModel from an MLeap Transformer zip
   * @param pathToZip: path to the zip file containing the MLeap model
   * @return Spark PipelineModel
   */
  def deserializeToSpark(pathToZip: String): PipelineModel = {
    (for (bf <- managed(BundleFile(FILE_PREFIX + pathToZip))) yield {
      bf.loadSparkBundle().get.root
    }).tried.get.asInstanceOf[PipelineModel]
  }
}
```
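For reference, here is roughly how I call it with a local path (the pipeline and DataFrame names below are placeholders):

```scala
// Placeholder usage: assumes `pipeline: org.apache.spark.ml.Pipeline` and a
// DataFrame `df` are already in scope.
val model: PipelineModel = pipeline.fit(df)
val transformed = model.transform(df) // MLeap pulls schema/metadata from this
SparkToMLeapModelSerializer.serialize(model, transformed, "/tmp/model.zip")
val reloaded = SparkToMLeapModelSerializer.deserializeToSpark("/tmp/model.zip")
```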
(Note: I'm very new to Scala, so please forgive any egregious bad practices.)
When I use, say, `deserializeToSpark` and pass in `hdfs://<hdfs_ip>:9000/hdfs/output/path/to/model.zip`, I get the following stack trace:
```
Exception in thread "main" com.myproject.nlp.nta.myApplication$myExecutorException: com.myproject.nlp.nta.exceptions.myModelLoadException: Failure to load model hdfs://<hdfs_ip>/hdfs/output/path/to/model folder from hdfs://<hdfs_ip>/hdfs/output/path/to/model/model.zip :
    at com.myproject.nlp.nta.myApplication.main(myApplication.java:31)
Caused by: com.myproject.nlp.nta.exceptions.myModelLoadException: Failure to load model hdfs://<hdfs_ip>/hdfs/output/path/to/model folder from hdfs://<hdfs_ip>/hdfs/output/path/to/model/model.zip :
    at com.myproject.nlp.nta.utils.LMRModelUtils.loadPipelineModelFromModelFolder(LMRModelUtils.java:65)
    at com.myproject.nlp.nta.utils.LMRModelUtils.loadPipelineModelFromPath(LMRModelUtils.java:49)
    at com.myproject.nlp.nta.executor.runner.myEvaluationRunner.evaluateModel(myEvaluationRunner.java:169)
    at com.myproject.nlp.nta.executor.myEvaluationExecutor.run(myEvaluationExecutor.java:24)
    at com.myproject.nlp.nta.myApplication.main(myApplication.java:25)
Caused by: java.lang.IllegalArgumentException: URI is not hierarchical
    at sun.nio.fs.UnixUriUtils.fromUri(UnixUriUtils.java:48)
    at sun.nio.fs.UnixFileSystemProvider.getPath(UnixFileSystemProvider.java:98)
    at java.nio.file.Paths.get(Paths.java:138)
    at com.sun.nio.zipfs.ZipFileSystemProvider.uriToPath(ZipFileSystemProvider.java:85)
    at com.sun.nio.zipfs.ZipFileSystemProvider.newFileSystem(ZipFileSystemProvider.java:107)
    at java.nio.file.FileSystems.newFileSystem(FileSystems.java:326)
    at java.nio.file.FileSystems.newFileSystem(FileSystems.java:276)
    at ml.combust.bundle.BundleFile$.apply(BundleFile.scala:70)
    at ml.combust.bundle.BundleFile$.apply(BundleFile.scala:40)
    at com.myproject.nlp.nta.utils.SparkToMLeapModelSerializer$$anonfun$deserializeToSpark$1.apply(SparkToMLeapModelSerializer.scala:48)
    at com.myproject.nlp.nta.utils.SparkToMLeapModelSerializer$$anonfun$deserializeToSpark$1.apply(SparkToMLeapModelSerializer.scala:48)
    at resource.DefaultManagedResource.open(AbstractManagedResource.scala:110)
    at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:87)
    at resource.ManagedResourceOperations$class.apply(ManagedResourceOperations.scala:26)
    at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
    at resource.DeferredExtractableManagedResource$$anonfun$tried$1.apply(AbstractManagedResource.scala:33)
    at scala.util.Try$.apply(Try.scala:192)
    at resource.DeferredExtractableManagedResource.tried(AbstractManagedResource.scala:33)
    at com.myproject.nlp.nta.utils.SparkToMLeapModelSerializer$.deserializeToSpark(SparkToMLeapModelSerializer.scala:50)
    at com.myproject.nlp.nta.utils.SparkToMLeapModelSerializer.deserializeToSpark(SparkToMLeapModelSerializer.scala)
    at com.myproject.nlp.nta.utils.LMRModelUtils.loadPipelineModelFromModelFolder(LMRModelUtils.java:57)
    ... 4 more
```
If I remove `FILE_PREFIX`, I get a `scala.MatchError` on `hdfs` in `ml.combust.bundle.BundleFile.apply()` at line 53. If I change `FILE_PREFIX` to `file:`, I get a `NullPointerException` in the same method at line 59, when `uriSafe.getPath` is called. And when I change `FILE_PREFIX` to `jar:`, I get:
```
...
Caused by: java.nio.file.FileSystemNotFoundException: Provider "hdfs" not installed
    at java.nio.file.Paths.get(Paths.java:147)
    at com.sun.nio.zipfs.ZipFileSystemProvider.uriToPath(ZipFileSystemProvider.java:85)
    at com.sun.nio.zipfs.ZipFileSystemProvider.newFileSystem(ZipFileSystemProvider.java:107)
    at java.nio.file.FileSystems.newFileSystem(FileSystems.java:326)
    at java.nio.file.FileSystems.newFileSystem(FileSystems.java:276)
    at ml.combust.bundle.BundleFile$.apply(BundleFile.scala:70)
    at ml.combust.bundle.BundleFile$.apply(BundleFile.scala:40)
...
```
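From that last stack trace, it looks like the JDK's NIO layer only registers `FileSystemProvider`s for the `file` and `jar` schemes, so nothing can resolve an `hdfs:` URI. A standalone snippet (no MLeap involved; the host is a placeholder) reproduces the same exception:

```scala
import java.net.URI
import java.nio.file.Paths

object HdfsNioRepro extends App {
  // The JDK ships NIO FileSystemProviders only for the "file" and "jar" schemes,
  // so this throws:
  //   java.nio.file.FileSystemNotFoundException: Provider "hdfs" not installed
  Paths.get(URI.create("hdfs://namenode:9000/some/model.zip"))
}
```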
What can I do to fix these errors and successfully serialize/deserialize MLeap models directly on Hadoop?
You need to write the bundle to the local filesystem and then copy it to Hadoop; likewise, for loading, copy the zip from HDFS to a local temp file and deserialize from there. `BundleFile` opens the zip through `java.nio.file.FileSystems`, and since the JDK has no NIO provider for the `hdfs` scheme (hence the `Provider "hdfs" not installed` error), it cannot read or write an `hdfs:` URI directly.
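Here's a rough sketch of that approach (untested; `HdfsModelSerializer`, `serializeToHdfs`, and `deserializeFromHdfs` are made-up names, and it assumes you can pass in a Hadoop `Configuration`, e.g. `spark.sparkContext.hadoopConfiguration`):

```scala
import java.nio.file.Files

import ml.combust.bundle.BundleFile
import ml.combust.bundle.serializer.SerializationFormat
import ml.combust.mleap.spark.SparkSupport._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.bundle.SparkBundleContext
import org.apache.spark.sql.{Dataset, Row}
import resource.managed

object HdfsModelSerializer {

  /** Serialize to a local temp zip, then upload the zip to HDFS. */
  def serializeToHdfs(model: PipelineModel,
                      dataset: Dataset[Row],
                      hdfsPath: String,
                      hadoopConf: Configuration): Unit = {
    val localZip = Files.createTempFile("mleap-model", ".zip")
    // BundleFile wants to create the zip itself; an existing empty file
    // won't open as a zip filesystem, so delete the temp file first.
    Files.delete(localZip)
    val sbc = SparkBundleContext().withDataset(dataset)
    for (bf <- managed(BundleFile(s"jar:file:$localZip"))) {
      model.writeBundle.format(SerializationFormat.Protobuf).save(bf)(sbc).get
    }
    val dst = new Path(hdfsPath)
    val fs = dst.getFileSystem(hadoopConf)
    fs.copyFromLocalFile(new Path(localZip.toString), dst)
  }

  /** Download the zip from HDFS to a local temp file, then deserialize it. */
  def deserializeFromHdfs(hdfsPath: String,
                          hadoopConf: Configuration): PipelineModel = {
    val localZip = Files.createTempFile("mleap-model", ".zip")
    Files.delete(localZip)
    val src = new Path(hdfsPath)
    val fs = src.getFileSystem(hadoopConf)
    fs.copyToLocalFile(src, new Path(localZip.toString))
    (for (bf <- managed(BundleFile(s"jar:file:$localZip"))) yield {
      bf.loadSparkBundle().get.root
    }).tried.get.asInstanceOf[PipelineModel]
  }
}
```

The point is that MLeap's zip handling stays on the local filesystem, where the JDK's `jar:`/zipfs provider works, while all HDFS I/O goes through Hadoop's own `FileSystem` API.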