spark-lucenerdd
Help debugging an OOM issue when the search population increases
Describe the bug
I have a LuceneRDD that I construct using the following syntax:
val index = LuceneRDD(data).persist(StorageLevel.MEMORY_AND_DISK)
I run it against another dataset using the following code:
val results = index.link(searchBase.rdd, Linker.queryBuilderLinker, topK).filter(_._2.nonEmpty)
My data distribution is the following:
index: 90M records, 300 partitions
searchBase: 2M records, 10 partitions
topK: 1000
My cluster configuration is :
Executor-cores 2
Executor-memory 19g
Driver-memory 25g
And my linker configuration is:
lucenerdd {
linker.method = "cartesian"
store.mode = "disk"
}
Code that is failing :
flatMap at LuceneRDD.scala:222
And the exception is :
Job aborted due to stage failure: Task 9525 in stage 11.0 failed 4 times, most recent failure: Lost task 9525.3 in stage 11.0 (TID 12771, executor 89): ExecutorLostFailure (executor 89 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 21.0 GB of 21 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
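For completeness, boosting the overhead as the message suggests would look roughly like the sketch below. With store.mode = "disk", Lucene memory-maps the index segments, and those pages count toward YARN's physical-memory check even though they live outside the JVM heap, so the overhead region is where headroom would go. The 4096 MB value is purely illustrative; I have not confirmed that it fixes the issue:

```shell
# Illustrative spark-submit flags only; the overhead value is a guess,
# not a recommendation. spark.yarn.executor.memoryOverhead is in MB.
spark-submit \
  --executor-cores 2 \
  --executor-memory 16g \
  --driver-memory 25g \
  --conf spark.yarn.executor.memoryOverhead=4096 \
  my-linkage-job.jar
```

Note that executor-memory plus memoryOverhead must stay under the YARN container limit, so raising the overhead usually means lowering the heap.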
Increasing the number of partitions for searchBase or index did not make any difference. The only time the job succeeds is when I reduce my search base to 1M records. Reducing topK did not make any difference either.
My cluster is big enough to cache the index (I can see that in the Storage page), so I am having a hard time understanding the source of the OOM.
Thanks for your help!
@zouzias Do you have any idea? I keep getting this error and changing the partitioning does not help
Can you try with the linker method linker.method = "collectbroadcast"? I think you should be able to broadcast the search base to all executors since it is only 2M records.
See: https://github.com/zouzias/spark-lucenerdd/blob/master/src/main/resources/reference.conf#L19
Also, if you have any fields that you do not use for linkage, you should remove them from the DataFrame, i.e.
val fields = Seq("usefulCol1", "usefulCol2", ...)
val projectedSearchBase = searchBase.select(fields.head, fields.tail: _*)
and then
val results = index.link(searchBase.rdd, Linker.queryBuilderLinker, topK).filter(_._2.nonEmpty)
Thank you for your response @zouzias
Both DataFrames contain only one field: a string field containing addresses, so the values are not that long either.
I can't use linker.method = "collectbroadcast"
because I am using the custom build discussed in #162. I am using relatively complex queries, and they are very hard to build (and to manage overall) as strings. Query builders do a better job here.
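For context, the kind of per-record query I build looks roughly like this. The field name "address", the tokenization, and the fuzzy edit distance are all specific to my data, and the linker wiring depends on the #162 custom build, so this is only a sketch of the Lucene BooleanQuery.Builder API rather than my exact code:

```scala
import org.apache.lucene.index.Term
import org.apache.lucene.search.{BooleanClause, BooleanQuery, FuzzyQuery, Query}

// Hypothetical query builder: a fuzzy OR-query over address tokens.
// "address" is an assumed field name for illustration only.
def buildAddressQuery(address: String): Query = {
  val builder = new BooleanQuery.Builder()
  address.toLowerCase.split("\\s+").take(10).foreach { token =>
    // maxEdits = 1 tolerates small typos in each token
    builder.add(new FuzzyQuery(new Term("address", token), 1),
                BooleanClause.Occur.SHOULD)
  }
  builder.build()
}
```

Expressing this as a string query with escaping and fuzziness operators gets unwieldy quickly, which is why I prefer the builder form.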
Do you have any idea what the source of the OOM error could be? It seems the executors are allocating too much memory, but the executors are relatively big and the dataset sizes are relatively small.
I tried with even smaller datasets and it still fails (anything above 100K records). It sometimes works with bigger datasets, but it is not reliable.