
Orca: call spark_df_to_pd_sparkxshards() failed after joining two spark dataframes

Open zpeng1898 opened this issue 1 year ago • 8 comments

When I call spark_df_to_pd_sparkxshards() to convert a Spark DataFrame to a SparkXShards after a join() operation, the function fails with the error Must pass schema, or at least one RecordBatch. However, I can print the schema of the joined Spark DataFrame successfully, so there may be something wrong inside the function.

Source code (attached as screenshots):
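Since the source is only attached as screenshots, here is a sketch of the failing path reconstructed from the traceback posted in the comments below; the function and variable names come from the traceback, while the session setup and example data are assumptions:

```python
from pyspark.sql import SparkSession
from bigdl.orca import init_orca_context
from bigdl.orca.data.utils import spark_df_to_pd_sparkxshards

sc = init_orca_context(cores=2)
spark = SparkSession.builder.getOrCreate()

def merge(left, right, how="inner", on=None, **kwargs):
    # Join the underlying Spark DataFrames of two SparkXShards and
    # convert the joined DataFrame back to a SparkXShards.
    left_df = left.to_spark_df()
    right_df = right.to_spark_df()
    merged = left_df.join(right_df, on=on, how=how)
    # Fails here with "ValueError: Must pass schema, or at least one RecordBatch"
    return spark_df_to_pd_sparkxshards(merged)

def test_merge_shards():
    # Hypothetical example data; only the column names a/b/c appear in the traceback.
    df1 = spark.createDataFrame([(1, "x"), (2, "y")], ["a", "b"])
    df2 = spark.createDataFrame([(1, 1.0), (2, 2.0)], ["a", "c"])
    data_shard1 = spark_df_to_pd_sparkxshards(df1)
    data_shard2 = spark_df_to_pd_sparkxshards(df2)
    merged_shard = merge(data_shard1, data_shard2, on="a")
    merged_shard_df = merged_shard.to_spark_df()
    assert len(merged_shard) == 2 and merged_shard_df.columns == ["a", "b", "c"]

test_merge_shards()
```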

Error (screenshots; the full log is posted in a comment below):

The schema is printed successfully (screenshot).

But if I first call DataFrame.limit() on the joined DataFrame, the conversion runs without error (screenshot); a sketch of that workaround follows.
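Based on the commented-out lines visible in the traceback, the workaround is a variation of the merge() sketch above, roughly like this (why limit() helps is an assumption; it appears to produce a plan without empty partitions):

```python
def merge(left, right, how="inner", on=None, **kwargs):
    left_df = left.to_spark_df()
    right_df = right.to_spark_df()
    merged = left_df.join(right_df, on=on, how=how)
    # Workaround reconstructed from the commented-out lines in the traceback:
    # re-materializing through limit() keeps all rows but avoids the error.
    merged = merged.limit(merged.count())
    return spark_df_to_pd_sparkxshards(merged)
```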

zpeng1898 avatar Sep 21 '22 02:09 zpeng1898

@dding3 Please take a look.

hkvision avatar Sep 21 '22 02:09 hkvision

[Executor task launch worker for task 0.0 in stage 6.0 (TID 6)] WARN org.apache.spark.storage.BlockManager - Putting block rdd_49_0 failed due to exception org.apache.spark.api.python.PythonException:
Traceback (most recent call last):
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/bigdl/orca/data/utils.py", line 470, in farrow
    table = pa.Table.from_batches(batches)
  File "pyarrow/table.pxi", line 3771, in pyarrow.lib.Table.from_batches
ValueError: Must pass schema, or at least one RecordBatch
[Executor task launch worker for task 0.0 in stage 6.0 (TID 6)] WARN org.apache.spark.storage.BlockManager - Block rdd_49_0 could not be removed as it was not found on disk or in memory
[Executor task launch worker for task 0.0 in stage 6.0 (TID 6)] ERROR org.apache.spark.executor.Executor - Exception in task 0.0 in stage 6.0 (TID 6)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/bigdl/orca/data/utils.py", line 470, in farrow
    table = pa.Table.from_batches(batches)
  File "pyarrow/table.pxi", line 3771, in pyarrow.lib.Table.from_batches
ValueError: Must pass schema, or at least one RecordBatch

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)

[task-result-getter-2] WARN org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 6.0 (TID 6) (172.20.239.36 executor driver): org.apache.spark.api.python.PythonException:
Traceback (most recent call last):
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/bigdl/orca/data/utils.py", line 470, in farrow
    table = pa.Table.from_batches(batches)
  File "pyarrow/table.pxi", line 3771, in pyarrow.lib.Table.from_batches
ValueError: Must pass schema, or at least one RecordBatch

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)

[task-result-getter-2] ERROR org.apache.spark.scheduler.TaskSetManager - Task 0 in stage 6.0 failed 1 times; aborting job
[Executor task launch worker for task 1.0 in stage 6.0 (TID 7)] WARN org.apache.spark.storage.BlockManager - Putting block rdd_49_1 failed due to exception org.apache.spark.TaskKilledException.
[Executor task launch worker for task 1.0 in stage 6.0 (TID 7)] WARN org.apache.spark.storage.BlockManager - Block rdd_49_1 could not be removed as it was not found on disk or in memory
[task-result-getter-3] WARN org.apache.spark.scheduler.TaskSetManager - Lost task 1.0 in stage 6.0 (TID 7) (172.20.239.36 executor driver): TaskKilled (Stage cancelled)

Py4JJavaError                             Traceback (most recent call last)
/tmp/ipykernel_63/1076036859.py in <module>
     18     merged_shard_df = merged_shard.to_spark_df()
     19     assert len(merged_shard)==2 and merged_shard_df.columns==['a','b','c']
---> 20 test_merge_shards()

/tmp/ipykernel_63/1076036859.py in test_merge_shards()
     15     data_shard1 = spark_df_to_pd_sparkxshards(df1)
     16     data_shard2 = spark_df_to_pd_sparkxshards(df2)
---> 17     merged_shard = merge(data_shard1,data_shard2, on='a')
     18     merged_shard_df = merged_shard.to_spark_df()
     19     assert len(merged_shard)==2 and merged_shard_df.columns==['a','b','c']

/tmp/ipykernel_63/3913789739.py in merge(left, right, how, on, **kwargs)
     24     #merged=merged.limit(merged.count())
     25     #print(merged.schema)
---> 26     merged = spark_df_to_pd_sparkxshards(merged)
     27     return merged

~/miniconda3/envs/py372/lib/python3.7/site-packages/bigdl/orca/data/utils.py in spark_df_to_pd_sparkxshards(df, squeeze, index_col, dtype, index_map)
    409     pd_rdd = spark_df_to_rdd_pd(df, squeeze, index_col, dtype, index_map)
    410     from bigdl.orca.data import SparkXShards
--> 411     spark_xshards = SparkXShards(pd_rdd)
    412     return spark_xshards
    413

~/miniconda3/envs/py372/lib/python3.7/site-packages/bigdl/orca/data/shard.py in __init__(self, rdd, transient, class_name)
    147         self.rdd.cache()
    148         if self.eager:
--> 149             self.compute()
    150         self.type = {}
    151         if class_name:

~/miniconda3/envs/py372/lib/python3.7/site-packages/bigdl/orca/data/shard.py in compute(self)
    213
    214     def compute(self):
--> 215         self.rdd.count()
    216         return self
    217

~/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/rdd.py in count(self)
   1233         3
   1234         """
-> 1235         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   1236
   1237     def stats(self):

~/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/rdd.py in sum(self)
   1222         6.0
   1223         """
-> 1224         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
   1225
   1226     def count(self):

~/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/rdd.py in fold(self, zeroValue, op)
   1076         # zeroValue provided to each partition is unique from the one provided
   1077         # to the final reduce call
-> 1078         vals = self.mapPartitions(func).collect()
   1079         return reduce(op, vals, zeroValue)
   1080

~/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/rdd.py in collect(self)
    947         """
    948         with SCCallSiteSync(self.context) as css:
--> 949             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    950         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
    951

~/miniconda3/envs/py372/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306
   1307         for temp_arg in temp_args:

~/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    109     def deco(*a, **kw):
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
    113             converted = convert_exception(e.java_exception)

~/miniconda3/envs/py372/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 6) (172.20.239.36 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/bigdl/orca/data/utils.py", line 470, in farrow
    table = pa.Table.from_batches(batches)
  File "pyarrow/table.pxi", line 3771, in pyarrow.lib.Table.from_batches
ValueError: Must pass schema, or at least one RecordBatch

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/bigdl/orca/data/utils.py", line 470, in farrow
    table = pa.Table.from_batches(batches)
  File "pyarrow/table.pxi", line 3771, in pyarrow.lib.Table.from_batches
ValueError: Must pass schema, or at least one RecordBatch

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more

zpeng1898 avatar Sep 21 '22 02:09 zpeng1898

The full error log is in the comment above.

zpeng1898 avatar Sep 21 '22 02:09 zpeng1898

When I instead create the Spark DataFrame from an RDD, the same Must pass schema, or at least one RecordBatch error still occurs in spark_df_to_pd_sparkxshards().

(Screenshot of the notebook cell and the resulting error.)
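The cell itself is only available as a screenshot; the attempt was roughly along these lines (the example data, column names, and partitioning are assumptions):

```python
from pyspark.sql import SparkSession
from bigdl.orca import init_orca_context
from bigdl.orca.data.utils import spark_df_to_pd_sparkxshards

sc = init_orca_context(cores=2)
spark = SparkSession.builder.getOrCreate()

# More slices than rows, so some partitions of the RDD are empty
# (an assumption about the original cell, which is only a screenshot).
rdd = sc.parallelize([(1, "x"), (2, "y")], numSlices=8)
df = spark.createDataFrame(rdd, ["a", "b"])

# Still fails with "ValueError: Must pass schema, or at least one RecordBatch"
shards = spark_df_to_pd_sparkxshards(df)
```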

zpeng1898 avatar Sep 21 '22 03:09 zpeng1898

Additionally, I tried calling read_csv() directly to generate a SparkXShards, and the result depends on the size of the input dataset. It runs without error when the file contains more than about 310,000 rows, but the error Must pass schema, or at least one RecordBatch occurs again when the file contains fewer than about 310,000 rows; a sketch of the call follows the two cases below.

  1. About 970,000 rows: no error occurs (screenshots).

  2. About 10,000 rows: the error occurs (screenshots).
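For reference, the direct read_csv() path looks roughly like this, assuming Orca's bigdl.orca.data.pandas.read_csv API; the file path, reader options, and context settings are placeholders:

```python
from bigdl.orca import init_orca_context
from bigdl.orca.data.pandas import read_csv

sc = init_orca_context(cores=2)

# Succeeds for a large CSV (~970,000 rows); a small CSV (~10,000 rows) fails with
# "Must pass schema, or at least one RecordBatch". Path is hypothetical.
shards = read_csv("/path/to/data.csv")
```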

zpeng1898 avatar Sep 22 '22 03:09 zpeng1898

The exception happened because XShards of DataFrames does not expect an empty pandas DataFrame in any partition. In the code above, after the Spark DataFrame join operation, the number of partitions of the joined Spark DataFrame (merged in the code) increases to 200, which leaves some partitions empty and causes the exception.

On the XShards side, I think we can add logic to throw an exception if empty partitions are detected.

For the code above, I think we need to coalesce the merged Spark DataFrame to ensure no partition is empty. I updated one line of code and it works: merged = left_df.join(right_df, on=on, how=how).coalesce(left_df.rdd.getNumPartitions()). A sketch of the fix in context is below.
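Applied to the merge() helper from the traceback above, the fix would look roughly like this (only the coalesce line comes from this comment; the rest of the function body is an assumption):

```python
from bigdl.orca.data.utils import spark_df_to_pd_sparkxshards

def merge(left, right, how="inner", on=None, **kwargs):
    left_df = left.to_spark_df()
    right_df = right.to_spark_df()
    # Coalesce back to the left side's partition count so the 200 default
    # shuffle partitions created by the join don't leave empty partitions.
    merged = (left_df.join(right_df, on=on, how=how)
                     .coalesce(left_df.rdd.getNumPartitions()))
    return spark_df_to_pd_sparkxshards(merged)
```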

dding3 avatar Sep 22 '22 17:09 dding3

BTW, we need to support the merge operation on Shards. Could you add the implementation of merge to https://github.com/intel-analytics/BigDL/blob/main/python/orca/src/bigdl/orca/data/shard.py after it's done?

dding3 avatar Sep 22 '22 17:09 dding3

BTW, we need to support the merge operation on Shards. Could you add the implementation of merge to https://github.com/intel-analytics/BigDL/blob/main/python/orca/src/bigdl/orca/data/shard.py after it's done?

There is already a PR for this: https://github.com/intel-analytics/BigDL/pull/5820

hkvision avatar Sep 23 '22 03:09 hkvision

The exception happened because XShards of DataFrames does not expect an empty pandas DataFrame in any partition. In the code above, after the Spark DataFrame join operation, the number of partitions of the joined Spark DataFrame (merged in the code) increases to 200, which leaves some partitions empty and causes the exception.

On the XShards side, I think we can add logic to throw an exception if empty partitions are detected.

For the code above, I think we need to coalesce the merged Spark DataFrame to ensure no partition is empty. I updated one line of code and it works: merged = left_df.join(right_df, on=on, how=how).coalesce(left_df.rdd.getNumPartitions())

I don't think coalesce will ensure every partition is non-empty?

hkvision avatar Sep 23 '22 12:09 hkvision

I think with default spark settings, without coalesce, we cannot guarantee each partition is non-empty.

dding3 avatar Sep 23 '22 21:09 dding3

I think with default spark settings, without coalesce, we cannot guarantee each partition is non-empty.

But even with coalesce, will it guarantee each partition is non-empty?

hkvision avatar Sep 26 '22 01:09 hkvision

I think with default spark settings, without coalesce, we cannot guarantee each partition is non-empty.

But even with coalesce, will it guarantee each partition is non-empty?

I think if there is no empty partition in left_df, there will be no empty partition after coalesce. Also, we have added logic in the code to check whether there is an empty partition; a rough sketch of such a check is below.
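A minimal sketch of what such a check could look like (this is not the actual BigDL implementation; the helper name and error message are hypothetical):

```python
from pyspark import RDD

def check_no_empty_partitions(pd_rdd: RDD) -> None:
    """Raise if any partition of an RDD of pandas DataFrames is empty."""
    def partition_is_empty(iterator):
        dfs = list(iterator)
        # A partition counts as empty if it yields nothing, or only empty DataFrames.
        yield len(dfs) == 0 or all(len(df) == 0 for df in dfs)

    if any(pd_rdd.mapPartitions(partition_is_empty).collect()):
        raise ValueError("SparkXShards does not support empty partitions; "
                         "repartition or coalesce the DataFrame before converting.")
```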

dding3 avatar Sep 27 '22 17:09 dding3