
Priority Queue Serialization Error

Open LaurelStan opened this issue 6 years ago • 31 comments

ModelTest.scala contains the comment: "// This is unable to be run currently due to some PriorityQueue serialization issues". This is the error I am running into. Are there any workarounds for it? I am currently unable to print any results without a solution to this problem.

LaurelStan avatar May 31 '18 14:05 LaurelStan

Hi, I also have this problem.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 946.0 in stage 16.0 (TID 1971) had a not serializable result: scala.collection.mutable.PriorityQueue$$anon$3
Serialization stack:
	- object not serializable (class: scala.collection.mutable.PriorityQueue$$anon$3, value: non-empty iterator)

at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:921)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:919)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:919)
at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:351)
at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)

howie avatar Jun 07 '18 10:06 howie

Apologies for the delay in responding. Could you please provide a minimal snippet of code that can reproduce the problem?

namitk avatar Jun 08 '18 19:06 namitk

It occurred when I tried to use your ModelTest.scala code.

LaurelStan avatar Jun 08 '18 19:06 LaurelStan

I'm getting the same issue using the Scala 2.10 version. @LaurelStan, are you using Scala 2.10? I think it may be related to https://issues.scala-lang.org/browse/SI-7568
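
For reference, a minimal repro sketch of the underlying problem (illustrative code, not from scanns): Java-serializing the iterator of a scala.collection.mutable.PriorityQueue fails, since the anonymous iterator class (the PriorityQueue$$anon$3 in the stack traces above) is not Serializable on affected Scala versions:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.mutable

val pq = mutable.PriorityQueue(3, 1, 2)

// Spark's JavaSerializer does essentially this when shipping task results;
// the queue's iterator (PriorityQueue$$anon$3) is not Serializable, so this
// throws java.io.NotSerializableException on affected Scala versions
new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(pq.iterator)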

joerenner avatar Jun 12 '18 07:06 joerenner

@joerenner I use this library on Scala 2.11 (Spark 2.3.0) and still get this problem.

howie avatar Jun 13 '18 01:06 howie

Hi @namitk

I use your library in a Java environment; the following is sample code:

// Note: w2v (a Dataset<Row> with columns rid/index/vectors) and vectorSize
// are defined elsewhere in my application.
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.rdd.RDD;
import scala.Tuple2;
import scala.Tuple3;

// package paths assumed from the scanns repo layout
import com.linkedin.nn.algorithm.CosineSignRandomProjectionNNS;
import com.linkedin.nn.model.CosineSignRandomProjectionModel;
import com.linkedin.nn.model.LSHNearestNeighborSearchModel;

w2v.printSchema();
RDD<Tuple2<Object, Vector>> map = w2v.javaRDD()
        .map(row -> new Tuple2<>(row.getAs("index"),
                                 (Vector) row.getAs("vectors")))
        .rdd();
System.out.println("toRDD:" + map.count());

LSHNearestNeighborSearchModel<CosineSignRandomProjectionModel> model =
        new CosineSignRandomProjectionNNS("signrp")
                .setNumHashes(300)
                .setSignatureLength(15)
                .setJoinParallelism(5000)
                .setBucketLimit(1000)
                .setShouldSampleBuckets(true)
                .setNumOutputPartitions(100)
                .createModel(vectorSize);

// get the 6 nearest neighbors for each item in `map` from within itself
// RDD[(Long, Long, Double)]
RDD<Tuple3<Object, Object, Object>> selfAllNearestNeighbors =
        model.getSelfAllNearestNeighbors(map, 6);
selfAllNearestNeighbors.toJavaRDD().foreach(tu ->
        System.out.println(tu._1() + " " + tu._2() + " " + tu._3()));
+--------------------------------+-----+----------------------------------------------------------------------------------------------------+
|                             rid|index|                                                                                             vectors|
+--------------------------------+-----+----------------------------------------------------------------------------------------------------+
|78c75b8dd86f96e084143b51ffe9c0ee|    0|[0.0267901596636572,-0.05762127241791859,-0.023966949549868367,-0.04586133027169126,-0.0069206741...|
|d16432bde1375a92667013ce6b36d0b2|    1|[0.06872010593640229,-0.07356948241944554,-0.04449785742076077,-0.004567217407902295,-0.036333482...|
|b15726a4ab9c742689ce956f126cdadb|    2|[0.04203528417482075,-0.10665688237069007,-0.06613768664555757,0.008198245994699949,0.00161943160...|
+--------------------------------+-----+----------------------------------------------------------------------------------------------------+

root
 |-- rid: string (nullable = false)
 |-- index: long (nullable = false)
 |-- vectors: vector (nullable = false)

toRDD:3

[2018-06-13 09:53:33,873][ERROR] Executor : Exception in task 2325.0 in stage 16.0 (TID 3350)
java.io.NotSerializableException: scala.collection.mutable.PriorityQueue$$anon$3
Serialization stack:
	- object not serializable (class: scala.collection.mutable.PriorityQueue$$anon$3, value: non-empty iterator)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:134)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:241)
	at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
	at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:699)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
[2018-06-13 09:53:33,912][ERROR] TaskSetManager : Task 2325.0 in stage 16.0 (TID 3350) had a not serializable result: scala.collection.mutable.PriorityQueue$$anon$3
Serialization stack:
	- object not serializable (class: scala.collection.mutable.PriorityQueue$$anon$3, value: non-empty iterator); not retrying

howie avatar Jun 13 '18 01:06 howie

The Scala ticket that @joerenner pointed to indicates this issue was fixed in Scala 2.11.0-M7. That said, I came across BoundedPriorityQueue in the Spark code base, which I hadn't used due to it being private to org.apache.spark. However, I noticed that it uses the Java priority queue, probably to avoid this same issue. I'll look into whether I can do something similar.
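
For illustration, a minimal sketch of that approach (class and method names here are assumptions modeled on Spark's private BoundedPriorityQueue, not the scanns API): a Serializable top-N wrapper around java.util.PriorityQueue:

import java.util.{PriorityQueue => JPriorityQueue}
import scala.collection.JavaConverters._

// Keeps the N largest elements seen so far; java.util.PriorityQueue is
// java.io.Serializable, unlike the Scala PriorityQueue's iterators.
class BoundedTopN[T](maxSize: Int)(implicit ord: Ordering[T]) extends Serializable {
  private val underlying = new JPriorityQueue[T](maxSize, ord) // min-heap on ord

  def offer(elem: T): Unit = {
    if (underlying.size < maxSize) {
      underlying.offer(elem)
    } else if (ord.gt(elem, underlying.peek())) {
      underlying.poll() // evict the current minimum
      underlying.offer(elem)
    }
  }

  def toSeq: Seq[T] = underlying.iterator().asScala.toSeq // unordered
}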

That said, at least on my side, I only get this issue when running in Spark's local mode. The library seems to run fine for me in Spark cluster mode. I don't fully understand why; I'll try to dig further.

namitk avatar Jun 13 '18 04:06 namitk

Hi @namitk, is there any update on this issue? I tried to reimplement TopNQueue using java.util.PriorityQueue but still got serialization issues. You can see the gist: https://gist.github.com/nicola007b/454bc77c435cff65e5cdd73ced316e1c

18/10/04 07:48:59 INFO DAGScheduler: ShuffleMapStage 3 (zipPartitions at LSHNearestNeighborSearchModel.scala:265) failed in 0.768 s due to Job aborted due to stage failure: Task 0.0 in stage 3.0 (TID 4) had a not serializable result: scala.collection.LinearSeqLike$$anon$1
Serialization stack:
	- object not serializable (class: scala.collection.LinearSeqLike$$anon$1, value: non-empty iterator)

nicola007b avatar Oct 03 '18 14:10 nicola007b

Hi @nicola007b, I tried the same thing as you and swapped out the Scala PriorityQueue for java.util.PriorityQueue, but I then ran into the same serialization error.

Could this new issue be caused by the mutable.Map or mutable.ArrayBuffer in LSHNearestNeighborSearchModel.scala, considering that the Scala PriorityQueue is in fact a mutable.PriorityQueue and part of the same mutable collections library?

oscaroboto avatar Nov 15 '18 19:11 oscaroboto

@oscaroboto thanks for following up on this. I believe the issue is in https://github.com/linkedin/scanns/blob/master/scanns/src/main/scala/com/linkedin/nn/model/LSHNearestNeighborSearchModel.scala#L55, which takes as input an Iterator[Array[mutable.ArrayBuffer[ItemId]]], which is not serializable. @namitk what are your thoughts on this?

nicola007b avatar Nov 29 '18 10:11 nicola007b

Also got this issue. After replacing all the Iterators in the NearestNeighborIterator class (except the outer Iterator of the return type) with Seq/List, the exception is gone.
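
A quick way to see why that swap helps (an illustrative check, not from the scanns code): Java-serializing a List's iterator fails with exactly the LinearSeqLike$$anon$1 class from the log above, while a materialized IndexedSeq round-trips fine:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

def canSerialize(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
    true
  } catch {
    case _: java.io.NotSerializableException => false
  }

println(canSerialize(IndexedSeq(1, 2, 3)))    // true: Vector is Serializable
println(canSerialize(List(1, 2, 3).iterator)) // false: LinearSeqLike$$anon$1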

Itfly avatar Dec 03 '18 09:12 Itfly

@Itfly would you mind posting exactly what you changed please?

MarkTickner avatar Dec 05 '18 14:12 MarkTickner

@Itfly I used IndexedSeq instead of Seq/List. Seq is a special case of Iterable and gave me the same error. List worked too, but I thought IndexedSeq might give better performance.

oscaroboto avatar Dec 05 '18 17:12 oscaroboto

There is no need to modify TopNQueue.

oscaroboto avatar Dec 05 '18 17:12 oscaroboto

@oscaroboto @Itfly it would be great if you could open a PR with the fix

nicola007b avatar Dec 05 '18 17:12 nicola007b

@nicola007b I'm doing some tests and will open a PR soon

oscaroboto avatar Dec 05 '18 17:12 oscaroboto

@MarkTickner I changed NearestNeighborIterator to:

private[model] class NearestNeighborIterator(buckets: Seq[Seq[Seq[ItemId]]],
                                             itemVectors: mutable.Map[ItemId, Vector],
                                             numNearestNeighbors: Int)
  extends Iterator[(ItemId, List[ItemIdDistancePair])]

and added an index to support the original bucket iterator's hasNext and next methods.

My guess is that this happens because Iterators do not support serialization, so none of the members of NearestNeighborIterator should be Iterator objects, including the object returned by the overridden next method, which may be involved in the serialization/deserialization process.

Itfly avatar Dec 06 '18 03:12 Itfly

@oscaroboto thanks for your advice. I'm a Scala/Spark beginner.

Itfly avatar Dec 06 '18 03:12 Itfly

@namitk @oscaroboto @Itfly did you solve this issue?

zhihuawon avatar Dec 08 '18 09:12 zhihuawon

@zhihuawon Here's my modification; hope this helps you too.

private[model] class NearestNeighborIterator(buckets: IndexedSeq[Array[mutable.ArrayBuffer[ItemId]]],
                                itemVectors: mutable.Map[ItemId, Vector],
                                numNearestNeighbors: Int) extends Iterator[(ItemId, IndexedSeq[ItemIdDistancePair])]
    with Serializable {

    private val bucketsSize = buckets.size
    private var bucketsIndex = 0

    // this will be the next element that the iterator returns on a call to next()
    private var nextResult: Option[(ItemId, IndexedSeq[ItemIdDistancePair])] = None

    // this is the current tuple in the bucketsIt iterator that is being scanned
    private var currentTuple = if (hasNextBucket()) Some(nextBucket()) else None

    // this is the index in the first array of currentTuple which is being scanned
    private var currentIndex = 0

    private def hasNextBucket(): Boolean = {
      bucketsIndex < bucketsSize
    }

    private def nextBucket(): Array[mutable.ArrayBuffer[ItemId]] = {
      val result = buckets(bucketsIndex)
      bucketsIndex += 1
      result
    }

    private def populateNext(): Unit = {
      var done = false
      while (currentTuple.isDefined && !done) {
        currentTuple match {
          case Some(x) =>
            while (currentIndex < x(0).size && !done) {
              val queue = new TopNQueue(numNearestNeighbors)
              val currentId = x(0)(currentIndex)
              x(1).filter(_ != currentId)
                .map(c => (c, distance.compute(itemVectors(c), itemVectors(currentId))))
                .foreach(queue.enqueue(_))
              if (queue.nonEmpty()) {
                nextResult = Some((currentId, queue.values()))
                //nextResult = Some((x(0)(currentIndex), queue.iterator()))
                done = true
              }
              currentIndex += 1
            }
            if (currentIndex == x(0).size) {
              currentIndex = 0
              currentTuple = if (hasNextBucket()) Some(nextBucket()) else None
            }
          case _ =>
        }
      }
      if (currentTuple.isEmpty && !done) {
        nextResult = None
      }
    }

    populateNext()

    override def hasNext: Boolean = nextResult.isDefined

    override def next(): (ItemId, IndexedSeq[ItemIdDistancePair]) = {
      if (hasNext) {
        val ret = nextResult.get
        populateNext()
        ret
      } else {
        null
      }
    }
  }

Then use hashBuckets.values.toIndexedSeq to instantiate NearestNeighborIterator. You also need to add a values method to TopNQueue:

  def values(): IndexedSeq[ItemIdDistancePair] = priorityQ.dequeueAll.reverse
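
For reference, instantiation then looks something like this (a hypothetical sketch; hashBuckets, itemVectors, and numNearestNeighbors are assumed from the surrounding model code):

// hashBuckets maps each hash bucket to an Array[mutable.ArrayBuffer[ItemId]];
// .values.toIndexedSeq materializes the buckets so the class holds a
// serializable IndexedSeq instead of a non-serializable Iterator
val neighbors = new NearestNeighborIterator(
  hashBuckets.values.toIndexedSeq,
  itemVectors,
  numNearestNeighbors
)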

Itfly avatar Dec 10 '18 03:12 Itfly

@Itfly thanks a lot

zhihuawon avatar Dec 10 '18 04:12 zhihuawon

Sorry for the delay everyone. I finally got a chance to make the PR.

oscaroboto avatar Dec 18 '18 05:12 oscaroboto

Any updates? It's not working for me.

tatsuya-takahashi avatar Dec 29 '18 10:12 tatsuya-takahashi

I am also facing the same issue.

danielmelemed avatar Jan 08 '19 00:01 danielmelemed

I am also facing the same issue. Has the issue been fixed?

pmaditya avatar Mar 10 '19 17:03 pmaditya

If this is a dead project, could you mark it as such instead of wasting people's time?

LanceNorskog avatar Mar 28 '19 22:03 LanceNorskog

This appears to be a dead project 😔

oscaroboto avatar Mar 28 '19 22:03 oscaroboto

Hey folks, I'm really sorry I haven't been on top of things and didn't notice @oscaroboto's PR #6. I switched jobs in Oct 2018 and it seems this isn't being maintained by anyone at LinkedIn anymore. Since leaving, I no longer work in an environment where I use or have access to a Spark cluster, so I haven't been keeping up with the project. I have commented on the PR, however, and will be happy to review if one of you can help fix this issue.

Apologies :(

namitk avatar Mar 29 '19 06:03 namitk

Hi @namitk! Thank you for your reply! I just made a PR.

oscaroboto avatar Mar 29 '19 13:03 oscaroboto

I wanted to ask you all where you get this error. As I mentioned in the thread above, I do not see this error anywhere except in the local test. The same code executed in a spark-shell in local mode as well as cluster mode runs successfully. Is that not what you observe? If not, could you please tell me what versions of Spark and scanns, and what environment (Java/Scala version, etc.), you are running where you see this error outside a local-test environment?

I used it as follows (this also works with the Scala 2.10 versions of Spark and scanns):

../spark-2.1.0-bin-hadoop2.7/bin/spark-shell --jars build/scanns_2.11/libs/scanns_2.11-1.0.1.jar

namitk avatar Mar 31 '19 22:03 namitk