Threads hang on getting from object store when calling `to_spark`
We initialize a Spark instance with:

```python
spark = raydp.init_spark('benchmark', 1, 5, '30G', configs={"spark.default.parallelism": 50})
```

Then we save the DataFrame to the object store:

```python
blocks, _ = raydp.spark.dataset._save_spark_df_to_object_store(df, False)
ds = ray.data.from_arrow_refs(blocks)
```

The problem occurs when we invoke `ds.to_spark(spark)`: it hangs while getting objects from the object store:
"Executor task launch worker for task 27.0 in stage 6.0 (TID 179)" #60 daemon prio=5 os_prio=0 tid=0x00007fe38020b800 nid=0x5057 runnable [0x00007ff6563f8000]
java.lang.Thread.State: RUNNABLE
at io.ray.runtime.object.NativeObjectStore.nativeGet(Native Method)
at io.ray.runtime.object.NativeObjectStore.getRaw(NativeObjectStore.java:53)
at io.ray.runtime.object.ObjectStore.get(ObjectStore.java:131)
at io.ray.runtime.AbstractRayRuntime.get(AbstractRayRuntime.java:144)
at io.ray.runtime.AbstractRayRuntime.get(AbstractRayRuntime.java:125)
at io.ray.runtime.AbstractRayRuntime.get(AbstractRayRuntime.java:120)
at io.ray.api.Ray.get(Ray.java:98)
at io.ray.runtime.object.ObjectRefImpl.get(ObjectRefImpl.java:77)
- locked <0x0000000739b0c2d0> (a io.ray.runtime.object.ObjectRefImpl)
at org.apache.spark.sql.raydp.ObjectStoreReader$.getBatchesFromStream(ObjectStoreReader.scala:108)
at org.apache.spark.rdd.RayDatasetRDD.compute(RayDatasetRDD.scala:61)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.executor.Executor$TaskRunner$$Lambda$514/236884660.apply(Unknown Source)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```

However, when we set `spark.default.parallelism` to a lower value such as 1, the problem goes away.
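For completeness, here is a minimal end-to-end sketch of the working configuration, assuming a local Ray cluster and a toy DataFrame (the `spark.range` data is illustrative, not our actual benchmark table); it differs from the failing setup only in the `spark.default.parallelism` value:

```python
import ray
import raydp

ray.init()

# Same setup as above, but with spark.default.parallelism lowered to 1;
# with this value ds.to_spark() returns instead of hanging.
spark = raydp.init_spark('benchmark', 1, 5, '30G',
                         configs={"spark.default.parallelism": 1})

df = spark.range(0, 1000)  # illustrative DataFrame

blocks, _ = raydp.spark.dataset._save_spark_df_to_object_store(df, False)
ds = ray.data.from_arrow_refs(blocks)

result = ds.to_spark(spark)  # hangs with parallelism=50, completes with 1
result.show()

raydp.stop_spark()
ray.shutdown()
```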