
Help debugging an OOM issue when the search population increases

Open yeikel opened this issue 4 years ago • 3 comments

Describe the bug

I have a LuceneRDD that I construct using the following syntax:

val index = LuceneRDD(data).persist(StorageLevel.MEMORY_AND_DISK)

I run it against another dataset using the following code:

val results = index.link(searchBase.rdd, Linker.queryBuilderLinker, topK).filter(_._2.nonEmpty)

My data distribution is the following:

index: 90M records, 300 partitions
searchBase: 2M records, 10 partitions
topK: 1000
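For context, here is a back-of-envelope sketch of what this layout implies for the cartesian linker. The per-task numbers are my own rough arithmetic under the assumption that cartesian linkage pairs every index partition with every searchBase partition; they are not measured values.

```scala
// Assumption: the cartesian linker pairs every index partition with every
// searchBase partition, so each stage task processes one such pair.
val indexRecords     = 90000000L // 90M
val indexPartitions  = 300
val searchRecords    = 2000000L  // 2M
val searchPartitions = 10

val cartesianTasks = indexPartitions * searchPartitions // partition pairs
val docsPerTask    = indexRecords / indexPartitions     // docs held per task
val queriesPerTask = searchRecords / searchPartitions   // queries run per task

println(s"$cartesianTasks tasks, each scoring $queriesPerTask queries against $docsPerTask docs")
```

Even with everything evenly distributed, each task simultaneously holds one index partition and one full search partition, which is where per-executor memory pressure would come from.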

My cluster configuration is:

Executor-cores 2 
Executor-memory 19g
Driver-memory 25g 

And my linker method is

lucenerdd {
  linker.method = "cartesian"
  store.mode = "disk"
}

Code that is failing:

flatMap at LuceneRDD.scala:222

And the exception is:

Job aborted due to stage failure: Task 9525 in stage 11.0 failed 4 times, most recent failure: Lost task 9525.3 in stage 11.0 (TID 12771, executor 89): ExecutorLostFailure (executor 89 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 21.0 GB of 21 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
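As an aside on this message: with executor-memory 19g, YARN's default off-heap overhead (roughly max(384 MiB, 10% of executor memory), so about 1.9 GB here) would explain the 21 GB container limit in the log. One thing to try, as the error itself suggests, is raising the overhead explicitly; the 4096 MiB value below is only illustrative, not a tuned recommendation:

```
spark-submit \
  --conf spark.yarn.executor.memoryOverhead=4096 \
  ...
```

Lucene holds index structures and disk-backed segments in off-heap/mapped memory, so overhead beyond the default 10% is plausible for this workload.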

Failed task:

[screenshot: failed task details]

Increasing the number of partitions for searchBase or index did not make any difference. The only time the job succeeds is when I reduce my search base to 1M records. Reducing topK did not make any difference either.

My cluster is big enough to cache the index base (I am able to see that in the storage page), so I am having a hard time understanding the source of the OOM:

[screenshot: Spark UI storage page]

[screenshot: Spark UI executors page]

Thanks for your help!

yeikel • Feb 20 '20

@zouzias Do you have any idea? I keep getting this error and changing the partitioning does not help

yeikel • Feb 24 '20

Can you try with linker method linker.method = "collectbroadcast"?

I think you should be able to broadcast the search base to all executors since it is only 2M records.

See: https://github.com/zouzias/spark-lucenerdd/blob/master/src/main/resources/reference.conf#L19
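Concretely, mirroring the config block shown earlier in the issue, the change would be:

```
lucenerdd {
  linker.method = "collectbroadcast"
  store.mode = "disk"
}
```

With this method the small dataset is collected to the driver and broadcast once, instead of being shipped into every cartesian partition pair.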

Also, if you have any fields that you do not use for linkage, you should remove them from the DataFrame, i.e.

// select takes varargs, so the Array must be expanded explicitly
val fields = Array("usefulCol1", "usefulCol2", ...)
val projectedSearchBase = searchBase.select(fields.head, fields.tail: _*)

and then

val results = index.link(projectedSearchBase.rdd, Linker.queryBuilderLinker, topK).filter(_._2.nonEmpty)

zouzias • Feb 26 '20

Thank you for your response @zouzias

Both DataFrames contain only one field. It is a string field containing addresses, so the values are not that long either.

I can't use linker.method = "collectbroadcast" because I am using the custom build discussed in #162. I am using relatively complex queries and they are very hard to build (and hard to manage overall) using Strings. Query builders do a better job here.

Do you have any idea about what could be the source of the OOM error? It seems that the executors are allocating too much memory, but the executors are relatively large and the datasets are relatively small.
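One thing worth sanity-checking is the raw size of the link output. A rough upper bound, assuming every query returns a full topK hit list (the real output is smaller because of the `.filter(_._2.nonEmpty)` and partial hit lists):

```scala
// Upper bound on the number of (query, hit) pairs the link can emit.
// Assumes all 2M queries each return topK = 1000 hits.
val queries = 2000000L
val topK    = 1000L
val maxHits = queries * topK // up to 2 billion pairs before filtering

println(s"up to $maxHits hits")
```

If many queries do return close to 1000 hits, the result RDD itself could dwarf both input datasets, which would be consistent with the OOM being insensitive to input partitioning.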

I tried with even smaller datasets and it is still failing (sizes > 100K). It sometimes works with bigger datasets, but it is not reliable.

yeikel • Feb 26 '20