scanns
Priority Queue Serialization Error
In ModelTest.scala it is documented that "// This is unable to be run currently due to some PriorityQueue serialization issues". This is the error I am running into. Are there any workarounds for this? I am currently unable to print any results without a solution to this problem.
Hi, I also have this problem.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 946.0 in stage 16.0 (TID 1971) had a not serializable result: scala.collection.mutable.PriorityQueue$$anon$3
Serialization stack:
- object not serializable (class: scala.collection.mutable.PriorityQueue$$anon$3, value: non-empty iterator)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:921)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:919)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:919)
at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:351)
at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
Apologies for the delay in responding. Could you please provide a minimal snippet of code that can reproduce the problem?
It occurred when I tried to use your ModelTest.scala code.
I'm getting the same issue using the Scala 2.10 version. @LaurelStan are you using Scala 2.10? I think it may be related to https://issues.scala-lang.org/browse/SI-7568
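For what it's worth, a minimal stand-alone check (just a sketch, not scanns code) reproduces the same failure: the queue's iterator is an anonymous inner class that does not implement java.io.Serializable, which is exactly what Spark trips over when it Java-serializes the task result.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.mutable

// Stand-alone sketch, not scanns code: serializing the PriorityQueue's iterator
// (an instance of PriorityQueue$$anon$...) fails the same way Spark does when a
// task result holds on to such an iterator.
object PriorityQueueIteratorRepro {
  def main(args: Array[String]): Unit = {
    val pq = mutable.PriorityQueue(3, 1, 2)
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(pq.iterator) // throws java.io.NotSerializableException
  }
}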
@joerenner I use this library with Scala 2.11 (Spark 2.3.0) and still get this problem.
Hi @namitk,
I use your library in a Java environment; the following is sample code:
w2v.printSchema();
RDD<Tuple2<Object, Vector>> map = w2v.javaRDD()
        .map(row -> new Tuple2<>(row.getAs("index"),
                (org.apache.spark.ml.linalg.Vector) row.getAs("vectors")))
        .rdd();
System.out.println("toRDD:" + map.count());
LSHNearestNeighborSearchModel<CosineSignRandomProjectionModel> model = new CosineSignRandomProjectionNNS("signrp")
        .setNumHashes(300)
        .setSignatureLength(15)
        .setJoinParallelism(5000)
        .setBucketLimit(1000)
        .setShouldSampleBuckets(true)
        .setNumOutputPartitions(100)
        .createModel(vectorSize);
// get the nearest neighbors for each item in items from within itself (k = 6 here)
// RDD[(Long, Long, Double)]
RDD<Tuple3<Object, Object, Object>> selfAllNearestNeighbors = model.getSelfAllNearestNeighbors(map, 6);
selfAllNearestNeighbors.toJavaRDD().foreach(tu -> {
    System.out.println(tu._1() + " " + tu._2() + " " + tu._3());
});
+--------------------------------+-----+----------------------------------------------------------------------------------------------------+
|                             rid|index|                                                                                             vectors|
+--------------------------------+-----+----------------------------------------------------------------------------------------------------+
|78c75b8dd86f96e084143b51ffe9c0ee|    0|[0.0267901596636572,-0.05762127241791859,-0.023966949549868367,-0.04586133027169126,-0.0069206741...|
|d16432bde1375a92667013ce6b36d0b2|    1|[0.06872010593640229,-0.07356948241944554,-0.04449785742076077,-0.004567217407902295,-0.036333482...|
|b15726a4ab9c742689ce956f126cdadb|    2|[0.04203528417482075,-0.10665688237069007,-0.06613768664555757,0.008198245994699949,0.00161943160...|
+--------------------------------+-----+----------------------------------------------------------------------------------------------------+
root
 |-- rid: string (nullable = false)
 |-- index: long (nullable = false)
 |-- vectors: vector (nullable = false)
toRDD:3
[2018-06-13 09:53:33,873][ERROR] Executor : Exception in task 2325.0 in stage 16.0 (TID 3350)
java.io.NotSerializableException: scala.collection.mutable.PriorityQueue$$anon$3
Serialization stack:
- object not serializable (class: scala.collection.mutable.PriorityQueue$$anon$3, value: non-empty iterator)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:134)
at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:241)
at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:699)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2018-06-13 09:53:33,912][ERROR] TaskSetManager : Task 2325.0 in stage 16.0 (TID 3350) had a not serializable result: scala.collection.mutable.PriorityQueue$$anon$3
Serialization stack:
- object not serializable (class: scala.collection.mutable.PriorityQueue$$anon$3, value: non-empty iterator); not retrying
The Scala ticket that @joerenner pointed to indicates this issue was fixed in Scala 2.11.0-M7. That said, I came across BoundedPriorityQueue in the Spark code base, which I hadn't used due to it being private to org.apache.spark. However, I noticed that it uses the Java priority queue, probably to avoid the same issue. I'll look into whether I can do something similar.
That said, at least on my side, I only get this issue while running in the local mode of Spark. The library seems to run fine for me in Spark cluster mode. I don't fully understand why; I'll try to dig further.
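Roughly what I have in mind is something like the following sketch (my own illustration, not the Spark BoundedPriorityQueue and not committed anywhere): a bounded top-N collector backed by java.util.PriorityQueue, which is java.io.Serializable, and which only exposes an eagerly materialized collection rather than an iterator.

import scala.collection.JavaConverters._

// Sketch only: keep the N largest elements seen so far in a java.util.PriorityQueue
// (a min-heap under `ord`), and expose the contents as an eager IndexedSeq so that
// nothing lazy or non-serializable ends up inside a Spark task result.
class BoundedTopN[T](maxSize: Int)(implicit ord: Ordering[T]) extends Serializable {
  private val queue = new java.util.PriorityQueue[T](maxSize, ord)

  def +=(elem: T): this.type = {
    if (queue.size < maxSize) {
      queue.offer(elem)
    } else if (ord.compare(elem, queue.peek()) > 0) {
      queue.poll() // drop the current smallest to make room
      queue.offer(elem)
    }
    this
  }

  // Materialize eagerly; the returned Vector is serializable.
  def values: IndexedSeq[T] = queue.iterator().asScala.toIndexedSeq.sorted(ord.reverse)
}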
Hi @namitk, is there any update on this issue?
I tried to reimplement TopNQueue using java.util.PriorityQueue but still got serialization issues. You can see the gist: https://gist.github.com/nicola007b/454bc77c435cff65e5cdd73ced316e1c
18/10/04 07:48:59 INFO DAGScheduler: ShuffleMapStage 3 (zipPartitions at LSHNearestNeighborSearchModel.scala:265) failed in 0.768 s due to Job aborted due to stage failure: Task 0.0 in stage 3.0 (TID 4) had a not serializable result: scala.collection.LinearSeqLike$$anon$1
Serialization stack:
- object not serializable (class: scala.collection.LinearSeqLike$$anon$1, value: non-empty iterator)
Hi @nicola007b, I tried the same thing as you and swapped out the Scala PriorityQueue for java.util.PriorityQueue, but then ran into the same serialization error.
Could this new issue be caused by the mutable.Map or mutable.ArrayBuffer in LSHNearestNeighborSearchModel.scala, considering that the Scala PriorityQueue is in fact a mutable.PriorityQueue and part of the same mutable package?
@oscaroboto thanks for following up on this. I believe the issue is in https://github.com/linkedin/scanns/blob/master/scanns/src/main/scala/com/linkedin/nn/model/LSHNearestNeighborSearchModel.scala#L55, which takes as input an Iterator[Array[mutable.ArrayBuffer[ItemId]]], which is not serializable.
@namitk what are your thoughts on this?
Also got this issue. After replacing all the Iterators in the NearestNeighborIterator class (except the outer Iterator of the return type) with Seq/List, the exception is gone.
@Itfly would you mind posting exactly what you changed please?
@Itfly I used IndexedSeq instead of Seq/List. Seq is a special case of Iterable and I got the same error. List worked too, but I thought IndexedSeq may give better performance. There is no need to modify TopNQueue.
@oscaroboto @Itfly would be great if you could open a PR with the fix
@nicola007b doing some tests and will PR soon
@MarkTickner I changed the NearestNeighborIterator declaration to

private[model] class NearestNeighborIterator(buckets: Seq[Seq[Seq[ItemId]]],
                                             itemVectors: mutable.Map[ItemId, Vector],
                                             numNearestNeighbors: Int) extends Iterator[(ItemId, List[ItemIdDistancePair])]

and added an index to support the original buckets iterator's hasNext and next methods.
Guessing it's because Iterator does not support serialization, so none of the members of NearestNeighborIterator should be Iterator objects, including the return value of the overridden next method, which may be called in the serialization/deserialization process.
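A quick stand-alone check of that guess (my own snippet, not scanns code): Java serialization rejects a bare iterator, but the same data materialized into an eager collection goes through fine.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Illustration only: iterators (such as the LinearSeqLike anonymous iterator class
// reported earlier in this thread) are not Serializable, while the eager Vector
// returned by toIndexedSeq is.
object IteratorVsIndexedSeq {
  def main(args: Array[String]): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(List(1, 2, 3).iterator.toIndexedSeq) // fine: Vector is Serializable
    out.writeObject(List(1, 2, 3).iterator)              // throws java.io.NotSerializableException
  }
}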
@oscaroboto thanks for your advice. I'm a scala/spark beginner.
@namitk @oscaroboto @Itfly did you solve this issue?
@zhihuawon Here's my modification, hope this can help you too.
private[model] class NearestNeighborIterator(buckets: IndexedSeq[Array[mutable.ArrayBuffer[ItemId]]],
                                             itemVectors: mutable.Map[ItemId, Vector],
                                             numNearestNeighbors: Int)
  extends Iterator[(ItemId, IndexedSeq[ItemIdDistancePair])] with Serializable {

  private val bucketsSize = buckets.size
  private var bucketsIndex = 0

  // this will be the next element that the iterator returns on a call to next()
  private var nextResult: Option[(ItemId, IndexedSeq[ItemIdDistancePair])] = None

  // this is the current tuple in the bucketsIt iterator that is being scanned
  private var currentTuple = if (hasNextBucket()) Some(nextBucket()) else None

  // this is the index in the first array of currentTuple which is being scanned
  private var currentIndex = 0

  private def hasNextBucket(): Boolean = {
    bucketsIndex < bucketsSize
  }

  private def nextBucket(): Array[mutable.ArrayBuffer[ItemId]] = {
    val result = buckets(bucketsIndex)
    bucketsIndex += 1
    result
  }

  private def populateNext(): Unit = {
    var done = false
    while (currentTuple.isDefined && !done) {
      currentTuple match {
        case Some(x) =>
          while (currentIndex < x(0).size && !done) {
            val queue = new TopNQueue(numNearestNeighbors)
            val currentId = x(0)(currentIndex)
            x(1).filter(_ != currentId)
              .map(c => (c, distance.compute(itemVectors(c), itemVectors(currentId))))
              .foreach(queue.enqueue(_))
            if (queue.nonEmpty()) {
              nextResult = Some((currentId, queue.values()))
              // nextResult = Some((x(0)(currentIndex), queue.iterator()))
              done = true
            }
            currentIndex += 1
          }
          if (currentIndex == x(0).size) {
            currentIndex = 0
            currentTuple = if (hasNextBucket()) Some(nextBucket()) else None
          }
        case _ =>
      }
    }
    if (currentTuple.isEmpty && !done) {
      nextResult = None
    }
  }

  populateNext()

  override def hasNext: Boolean = nextResult.isDefined

  override def next(): (ItemId, IndexedSeq[ItemIdDistancePair]) = {
    if (hasNext) {
      val ret = nextResult.get
      populateNext()
      return (ret._1, ret._2)
    }
    null
  }
}
Then use hashBuckets.values.toIndexedSeq to instantiate NearestNeighborIterator. You also need to add a values method in TopNQueue:
def values(): IndexedSeq[ItemIdDistancePair] = priorityQ.dequeueAll.reverse
@Itfly thanks a lot
Sorry for the delay everyone. I finally got a chance to make the PR.
Any updates? It's not working for me.
I am also facing the same issue.
I am also facing the same issue. Has the issue been fixed?
If this is a dead project, could you mark it as such instead of wasting people's time?
This appears to be a dead project 😔
Hey folks, I'm really sorry I haven't been on top of things and didn't notice @oscaroboto's PR #6. I switched jobs in Oct 2018 and it seems this isn't being maintained by anyone at LinkedIn anymore. Since leaving, I am no longer working in an environment where I use or have access to a Spark cluster, so I haven't been keeping up with the project. I have commented on the PR, however, and will be happy to review if one of you can help fix this issue.
Apologies :(
Hi @namitk! Thank you for your reply! I just made a PR.
I wanted to ask you guys where you get this error. As I have mentioned in the thread above, I do not see this error anywhere except in the local test. The same code being executed in a spark shell in local mode as well as cluster mode runs successfully. Is that not what you guys observe? If not, could you please tell me what version of spark, scanns and your environment (java / scala version etc) you are running where you see this error in a non-local-test environment?
I used it as follows (also works with scala 2.10 versions of spark and scanns)
../spark-2.1.0-bin-hadoop2.7/bin/spark-shell --jars build/scanns_2.11/libs/scanns_2.11-1.0.1.jar
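For reference, the Scala equivalent of the Java snippet posted earlier looks roughly like this inside the shell. Here items is an RDD[(Long, org.apache.spark.ml.linalg.Vector)] and numFeatures is the vector dimension; both are placeholders for your own data, and the import path may differ across scanns versions.

// Rough sketch mirroring the Java snippet above; items and numFeatures are placeholders.
import com.linkedin.nn.algorithm.CosineSignRandomProjectionNNS // adjust to your scanns version's package layout

val model = new CosineSignRandomProjectionNNS("signrp")
  .setNumHashes(300)
  .setSignatureLength(15)
  .setJoinParallelism(5000)
  .setBucketLimit(1000)
  .setShouldSampleBuckets(true)
  .setNumOutputPartitions(100)
  .createModel(numFeatures)

// RDD[(Long, Long, Double)]: (item, neighbor, distance)
val neighbors = model.getSelfAllNearestNeighbors(items, 6)
neighbors.take(10).foreach { case (src, dst, dist) => println(s"$src $dst $dist") }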