
Serializing/Deserializing to/from Hadoop

Open · will-m-buchanan opened this issue 4 years ago · 1 comment

Versions: MLeap 0.12.0, Scala 2.11.8, Spark 2.3.1

I'm having issues saving and loading models directly in Hadoop. I have a class that looks like this:

package com.myProject.nlp.my.utils

import ml.combust.bundle.BundleFile
import ml.combust.bundle.serializer.SerializationFormat
import ml.combust.mleap.runtime.MleapSupport._
import ml.combust.mleap.runtime.transformer.Pipeline
import ml.combust.mleap.spark.SparkSupport._
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.bundle.SparkBundleContext
import org.apache.spark.sql.{Dataset, Row}
import resource.managed

object SparkToMLeapModelSerializer {

  private final val FILE_PREFIX = "jar:file:"

  /**
    * Serialize a Spark PipelineModel into a zip in MLeap format
    * @param model: Spark PipelineModel
    * @param dataset: Must provide a transformed Spark dataframe to MLeap for serializing a pipeline.
    *                 Used to extract data types and other metadata
    * @param pathToZip: Path to write MLeap zip file to
    */
  def serialize(model: PipelineModel, dataset: Dataset[Row], pathToZip: String): Unit = {
    val sbc: SparkBundleContext = SparkBundleContext().withDataset(dataset)
    for (bf <- managed(BundleFile(FILE_PREFIX + pathToZip))) {
      model.writeBundle.format(SerializationFormat.Protobuf).save(bf)(sbc).get
    }
  }

  /**
    * Deserialize an MLeap Pipeline model from a zip
    * @param pathToZip: path to zip file containing MLeap model
    * @return MLeap Pipeline
    */
  def deserializeToMLeap(pathToZip: String): Pipeline = {
    (for(bf <- managed(BundleFile(FILE_PREFIX + pathToZip))) yield {
      bf.loadMleapBundle().get.root
    }).tried.get.asInstanceOf[Pipeline]
  }

  /**
    * Deserialize a Spark PipelineModel from an MLeap Transformer zip
    * @param pathToZip: path to zip file containing MLeap model
    * @return Spark PipelineModel
    */
  def deserializeToSpark(pathToZip: String): PipelineModel = {
    (for(bf <- managed(BundleFile(FILE_PREFIX + pathToZip))) yield {
      bf.loadSparkBundle().get.root
    }).tried.get.asInstanceOf[PipelineModel]
  }
}

(Note: I'm very new to Scala, so please forgive any egregious bad practices.)

When I use, say, deserializeToSpark and pass in hdfs://<hdfs_ip>:9000/hdfs/output/path/to/model.zip, I get the following stack trace:

Exception in thread "main" com.myproject.nlp.nta.myApplication$myExecutorException: com.myproject.nlp.nta.exceptions.myModelLoadException: Failure to load model hdfs://<hdfs_ip>/hdfs/output/path/to/model folder from hdfs://<hdfs_ip>/hdfs/output/path/to/model/model.zip :
    at com.myproject.nlp.nta.myApplication.main(myApplication.java:31)
Caused by: com.myproject.nlp.nta.exceptions.myModelLoadException: Failure to load model hdfs://<hdfs_ip>/hdfs/output/path/to/model folder from hdfs://<hdfs_ip>/hdfs/output/path/to/model/model.zip :
    at com.myproject.nlp.nta.utils.LMRModelUtils.loadPipelineModelFromModelFolder(LMRModelUtils.java:65)
    at com.myproject.nlp.nta.utils.LMRModelUtils.loadPipelineModelFromPath(LMRModelUtils.java:49)
    at com.myproject.nlp.nta.executor.runner.myEvaluationRunner.evaluateModel(myEvaluationRunner.java:169)
    at com.myproject.nlp.nta.executor.myEvaluationExecutor.run(myEvaluationExecutor.java:24)
    at com.myproject.nlp.nta.myApplication.main(myApplication.java:25)
Caused by: java.lang.IllegalArgumentException: URI is not hierarchical
    at sun.nio.fs.UnixUriUtils.fromUri(UnixUriUtils.java:48)
    at sun.nio.fs.UnixFileSystemProvider.getPath(UnixFileSystemProvider.java:98)
    at java.nio.file.Paths.get(Paths.java:138)
    at com.sun.nio.zipfs.ZipFileSystemProvider.uriToPath(ZipFileSystemProvider.java:85)
    at com.sun.nio.zipfs.ZipFileSystemProvider.newFileSystem(ZipFileSystemProvider.java:107)
    at java.nio.file.FileSystems.newFileSystem(FileSystems.java:326)
    at java.nio.file.FileSystems.newFileSystem(FileSystems.java:276)
    at ml.combust.bundle.BundleFile$.apply(BundleFile.scala:70)
    at ml.combust.bundle.BundleFile$.apply(BundleFile.scala:40)
    at com.myproject.nlp.nta.utils.SparkToMLeapModelSerializer$$anonfun$deserializeToSpark$1.apply(SparkToMLeapModelSerializer.scala:48)
    at com.myproject.nlp.nta.utils.SparkToMLeapModelSerializer$$anonfun$deserializeToSpark$1.apply(SparkToMLeapModelSerializer.scala:48)
    at resource.DefaultManagedResource.open(AbstractManagedResource.scala:110)
    at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:87)
    at resource.ManagedResourceOperations$class.apply(ManagedResourceOperations.scala:26)
    at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
    at resource.DeferredExtractableManagedResource$$anonfun$tried$1.apply(AbstractManagedResource.scala:33)
    at scala.util.Try$.apply(Try.scala:192)
    at resource.DeferredExtractableManagedResource.tried(AbstractManagedResource.scala:33)
    at com.myproject.nlp.nta.utils.SparkToMLeapModelSerializer$.deserializeToSpark(SparkToMLeapModelSerializer.scala:50)
    at com.myproject.nlp.nta.utils.SparkToMLeapModelSerializer.deserializeToSpark(SparkToMLeapModelSerializer.scala)
    at com.myproject.nlp.nta.utils.LMRModelUtils.loadPipelineModelFromModelFolder(LMRModelUtils.java:57)
    ... 4 more

If I remove the FILE_PREFIX I get a scala.MatchError on hdfs at ml.combust.bundle.BundleFile.apply(), at line 53 of BundleFile.scala. If I change FILE_PREFIX to file: I get a NullPointerException in the same method, on line 59, when calling uriSafe.getPath. And when I change FILE_PREFIX to jar: I get:

...
Caused by: java.nio.file.FileSystemNotFoundException: Provider "hdfs" not installed
	at java.nio.file.Paths.get(Paths.java:147)
	at com.sun.nio.zipfs.ZipFileSystemProvider.uriToPath(ZipFileSystemProvider.java:85)
	at com.sun.nio.zipfs.ZipFileSystemProvider.newFileSystem(ZipFileSystemProvider.java:107)
	at java.nio.file.FileSystems.newFileSystem(FileSystems.java:326)
	at java.nio.file.FileSystems.newFileSystem(FileSystems.java:276)
	at ml.combust.bundle.BundleFile$.apply(BundleFile.scala:70)
	at ml.combust.bundle.BundleFile$.apply(BundleFile.scala:40)
...
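
(For reference, the usage shown in the MLeap docs is a plain local jar:file URI, e.g.

for (bf <- managed(BundleFile("jar:file:/tmp/simple-spark-pipeline.zip"))) {
  model.writeBundle.save(bf)(sbc).get
}

so my guess is that BundleFile can only open zip files on the local filesystem.)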

What can I do to fix these errors and successfully serialize/deserialize MLeap models directly on Hadoop?

will-m-buchanan · Jun 11 '20 13:06

You need to write the bundle locally and then copy it to Hadoop.

leahmcguire · Oct 09 '20 17:10
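
A minimal sketch of that workaround, for anyone landing here: serialize to a local temp zip, then move it with the Hadoop FileSystem API (and the reverse for loading). It reuses the SparkToMLeapModelSerializer from the issue above; the HdfsModelIO name and the temp-file handling are illustrative, not part of MLeap or the original thread:

import java.nio.file.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.{Dataset, Row}

object HdfsModelIO {

  /** Serialize to a local temp zip, then upload it to HDFS. */
  def saveToHdfs(model: PipelineModel, dataset: Dataset[Row], hdfsPath: String): Unit = {
    val localZip = Files.createTempFile("mleap-model", ".zip")
    Files.delete(localZip) // BundleFile must create the zip itself
    SparkToMLeapModelSerializer.serialize(model, dataset, localZip.toString)
    val fs = FileSystem.get(new java.net.URI(hdfsPath), new Configuration())
    fs.copyFromLocalFile(new Path(localZip.toString), new Path(hdfsPath))
  }

  /** Download the zip from HDFS to a local temp file, then deserialize. */
  def loadSparkFromHdfs(hdfsPath: String): PipelineModel = {
    val localZip = Files.createTempFile("mleap-model", ".zip")
    Files.delete(localZip) // let Hadoop write a fresh local copy
    val fs = FileSystem.get(new java.net.URI(hdfsPath), new Configuration())
    fs.copyToLocalFile(new Path(hdfsPath), new Path(localZip.toString))
    SparkToMLeapModelSerializer.deserializeToSpark(localZip.toString)
  }
}

The copy step means BundleFile only ever sees local jar:file URIs, which appears to be all its scheme matching handles.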