
Threads hang on getting from object store when calling `to_spark`

Open · Deegue opened this issue 1 year ago · 0 comments

We initialize a Spark instance with `raydp.init_spark('benchmark', 1, 5, '30G', configs={"spark.default.parallelism": 50})`, then save the DataFrame to the object store:

    blocks, _ = raydp.spark.dataset._save_spark_df_to_object_store(df, False)
    ds = ray.data.from_arrow_refs(blocks)
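
For context, here is a minimal end-to-end sketch of the setup described above. The `ray.init` call and the example DataFrame (`spark.range`) are illustrative assumptions not shown in this report; the `init_spark` arguments and the save/convert calls are exactly as reported.

    import ray
    import raydp

    # Assumes a running Ray cluster to attach to (our exact ray.init
    # arguments are not shown in this report)
    ray.init(address="auto")

    spark = raydp.init_spark('benchmark', 1, 5, '30G',
                             configs={"spark.default.parallelism": 50})

    # Hypothetical DataFrame; any Spark DataFrame stands in for ours
    df = spark.range(0, 1000)

    # Save the Spark DataFrame as Arrow batches in Ray's object store,
    # then wrap the resulting object refs in a Ray Dataset
    blocks, _ = raydp.spark.dataset._save_spark_df_to_object_store(df, False)
    ds = ray.data.from_arrow_refs(blocks)

    # Converting back to Spark is where the executor threads hang
    ds.to_spark(spark)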

The problem occurs when we invoke `ds.to_spark(spark)`: the executor threads hang while getting objects from the object store:

"Executor task launch worker for task 27.0 in stage 6.0 (TID 179)" #60 daemon prio=5 os_prio=0 tid=0x00007fe38020b800 nid=0x5057 runnable [0x00007ff6563f8000]
   java.lang.Thread.State: RUNNABLE
        at io.ray.runtime.object.NativeObjectStore.nativeGet(Native Method)
        at io.ray.runtime.object.NativeObjectStore.getRaw(NativeObjectStore.java:53)
        at io.ray.runtime.object.ObjectStore.get(ObjectStore.java:131)
        at io.ray.runtime.AbstractRayRuntime.get(AbstractRayRuntime.java:144)
        at io.ray.runtime.AbstractRayRuntime.get(AbstractRayRuntime.java:125)
        at io.ray.runtime.AbstractRayRuntime.get(AbstractRayRuntime.java:120)
        at io.ray.api.Ray.get(Ray.java:98)
        at io.ray.runtime.object.ObjectRefImpl.get(ObjectRefImpl.java:77)
        - locked <0x0000000739b0c2d0> (a io.ray.runtime.object.ObjectRefImpl)
        at org.apache.spark.sql.raydp.ObjectStoreReader$.getBatchesFromStream(ObjectStoreReader.scala:108)
        at org.apache.spark.rdd.RayDatasetRDD.compute(RayDatasetRDD.scala:61)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.executor.Executor$TaskRunner$$Lambda$514/236884660.apply(Unknown Source)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

However, when we set `spark.default.parallelism` to a lower value such as 1, the problem goes away.
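
For reference, a sketch of the same setup with the lower parallelism value that avoids the hang for us (all other arguments unchanged from above):

    # Same init_spark call as above, but with spark.default.parallelism
    # lowered to 1; with this value the hang does not occur in our runs
    spark = raydp.init_spark('benchmark', 1, 5, '30G',
                             configs={"spark.default.parallelism": 1})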

Deegue · May 30 '23 05:05