Orca: spark_df_to_pd_sparkxshards() fails after joining two Spark DataFrames
When I call spark_df_to_pd_sparkxshards() to transform a Spark DataFrame into a SparkXShards after a join() operation, the function fails with "Must pass schema, or at least one RecordBatch". However, I can still print the schema of the Spark DataFrame successfully, so there may be something wrong inside the function.
Source Code
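The source-code screenshot is not reproduced here; based on the Python frames in the traceback below, the failing code looks roughly like the following sketch. The sample data, the local setup, and the use of to_spark_df() inside merge() are assumptions; the merge() helper, the column names, and the spark_df_to_pd_sparkxshards() calls come from the traceback.

```python
# A rough reconstruction of the failing test; not the original code.
from pyspark.sql import SparkSession
from bigdl.orca import init_orca_context
from bigdl.orca.data.utils import spark_df_to_pd_sparkxshards  # module path from the traceback

sc = init_orca_context(cluster_mode="local")
spark = SparkSession.builder.getOrCreate()
# Force a shuffle join so this small example behaves like the real, larger input.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")


def merge(left, right, how="inner", on=None, **kwargs):
    left_df = left.to_spark_df()     # assumed conversion of the input shards
    right_df = right.to_spark_df()
    merged = left_df.join(right_df, on=on, how=how)
    merged = spark_df_to_pd_sparkxshards(merged)  # <-- raises the error below
    return merged


def test_merge_shards():
    # Tiny made-up inputs; repartition(2) keeps their own partitions non-empty,
    # so the failure only shows up after the join.
    df1 = spark.createDataFrame([(1, "x"), (2, "y")], ["a", "b"]).repartition(2)
    df2 = spark.createDataFrame([(1, "u"), (2, "v")], ["a", "c"]).repartition(2)
    data_shard1 = spark_df_to_pd_sparkxshards(df1)
    data_shard2 = spark_df_to_pd_sparkxshards(df2)
    merged_shard = merge(data_shard1, data_shard2, on="a")
    merged_shard_df = merged_shard.to_spark_df()
    assert len(merged_shard) == 2 and merged_shard_df.columns == ["a", "b", "c"]


test_merge_shards()
```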
Error
The schema is printed successfully:
But when I call the Spark DataFrame limit() function first, it runs without error:
@dding3 Please take a look.
[Executor task launch worker for task 0.0 in stage 6.0 (TID 6)] WARN org.apache.spark.storage.BlockManager - Putting block rdd_49_0 failed due to exception org.apache.spark.api.python.PythonException:
Traceback (most recent call last):
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/bigdl/orca/data/utils.py", line 470, in farrow
    table = pa.Table.from_batches(batches)
  File "pyarrow/table.pxi", line 3771, in pyarrow.lib.Table.from_batches
ValueError: Must pass schema, or at least one RecordBatch.
[Executor task launch worker for task 0.0 in stage 6.0 (TID 6)] WARN org.apache.spark.storage.BlockManager - Block rdd_49_0 could not be removed as it was not found on disk or in memory
[Executor task launch worker for task 0.0 in stage 6.0 (TID 6)] ERROR org.apache.spark.executor.Executor - Exception in task 0.0 in stage 6.0 (TID 6)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/bigdl/orca/data/utils.py", line 470, in farrow
    table = pa.Table.from_batches(batches)
  File "pyarrow/table.pxi", line 3771, in pyarrow.lib.Table.from_batches
ValueError: Must pass schema, or at least one RecordBatch
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
[task-result-getter-2] WARN org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 6.0 (TID 6) (172.20.239.36 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main process() File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process serializer.dump_stream(out_iter, outfile) File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream vs = list(itertools.islice(iterator, batch)) File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/bigdl/orca/data/utils.py", line 470, in farrow table = pa.Table.from_batches(batches) File "pyarrow/table.pxi", line 3771, in pyarrow.lib.Table.from_batches ValueError: Must pass schema, or at least one RecordBatch
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
[task-result-getter-2] ERROR org.apache.spark.scheduler.TaskSetManager - Task 0 in stage 6.0 failed 1 times; aborting job
[Executor task launch worker for task 1.0 in stage 6.0 (TID 7)] WARN org.apache.spark.storage.BlockManager - Putting block rdd_49_1 failed due to exception org.apache.spark.TaskKilledException.
[Executor task launch worker for task 1.0 in stage 6.0 (TID 7)] WARN org.apache.spark.storage.BlockManager - Block rdd_49_1 could not be removed as it was not found on disk or in memory
[task-result-getter-3] WARN org.apache.spark.scheduler.TaskSetManager - Lost task 1.0 in stage 6.0 (TID 7) (172.20.239.36 executor driver): TaskKilled (Stage cancelled)
Py4JJavaError Traceback (most recent call last)
/tmp/ipykernel_63/1076036859.py in <module>
/tmp/ipykernel_63/1076036859.py in test_merge_shards() 15 data_shard1 = spark_df_to_pd_sparkxshards(df1) 16 data_shard2 = spark_df_to_pd_sparkxshards(df2) ---> 17 merged_shard = merge(data_shard1,data_shard2, on='a') 18 merged_shard_df = merged_shard.to_spark_df() 19 assert len(merged_shard)==2 and merged_shard_df.columns==['a','b','c']
/tmp/ipykernel_63/3913789739.py in merge(left, right, how, on, **kwargs) 24 #merged=merged.limit(merged.count()) 25 #print(merged.schema) ---> 26 merged = spark_df_to_pd_sparkxshards(merged) 27 return merged
~/miniconda3/envs/py372/lib/python3.7/site-packages/bigdl/orca/data/utils.py in spark_df_to_pd_sparkxshards(df, squeeze, index_col, dtype, index_map) 409 pd_rdd = spark_df_to_rdd_pd(df, squeeze, index_col, dtype, index_map) 410 from bigdl.orca.data import SparkXShards --> 411 spark_xshards = SparkXShards(pd_rdd) 412 return spark_xshards 413
~/miniconda3/envs/py372/lib/python3.7/site-packages/bigdl/orca/data/shard.py in __init__(self, rdd, transient, class_name) 147 self.rdd.cache() 148 if self.eager: --> 149 self.compute() 150 self.type = {} 151 if class_name:
~/miniconda3/envs/py372/lib/python3.7/site-packages/bigdl/orca/data/shard.py in compute(self) 213 214 def compute(self): --> 215 self.rdd.count() 216 return self 217
~/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/rdd.py in count(self) 1233 3 1234 """ -> 1235 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 1236 1237 def stats(self):
~/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/rdd.py in sum(self) 1222 6.0 1223 """ -> 1224 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add) 1225 1226 def count(self):
~/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/rdd.py in fold(self, zeroValue, op) 1076 # zeroValue provided to each partition is unique from the one provided 1077 # to the final reduce call -> 1078 vals = self.mapPartitions(func).collect() 1079 return reduce(op, vals, zeroValue) 1080
~/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/rdd.py in collect(self) 947 """ 948 with SCCallSiteSync(self.context) as css: --> 949 sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 950 return list(_load_from_socket(sock_info, self._jrdd_deserializer)) 951
~/miniconda3/envs/py372/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args) 1303 answer = self.gateway_client.send_command(command) 1304 return_value = get_return_value( -> 1305 answer, self.gateway_client, self.target_id, self.name) 1306 1307 for temp_arg in temp_args:
~/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw) 109 def deco(*a, **kw): 110 try: --> 111 return f(*a, **kw) 112 except py4j.protocol.Py4JJavaError as e: 113 converted = convert_exception(e.java_exception)
~/miniconda3/envs/py372/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 326 raise Py4JJavaError( 327 "An error occurred while calling {0}{1}{2}.\n". --> 328 format(target_id, ".", name), value) 329 else: 330 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 6) (172.20.239.36 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main process() File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process serializer.dump_stream(out_iter, outfile) File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream vs = list(itertools.islice(iterator, batch)) File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/bigdl/orca/data/utils.py", line 470, in farrow table = pa.Table.from_batches(batches) File "pyarrow/table.pxi", line 3771, in pyarrow.lib.Table.from_batches ValueError: Must pass schema, or at least one RecordBatch
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/root/miniconda3/envs/py372/lib/python3.7/site-packages/bigdl/orca/data/utils.py", line 470, in farrow
    table = pa.Table.from_batches(batches)
  File "pyarrow/table.pxi", line 3771, in pyarrow.lib.Table.from_batches
ValueError: Must pass schema, or at least one RecordBatch
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
The full error is here.
When I try to create the Spark DataFrame from an RDD instead, the error "Must pass schema, or at least one RecordBatch" still occurs in spark_df_to_pd_sparkxshards().
Additionally, I tried calling read_csv() directly to generate a SparkXShards, but the result depends on the size of the input dataset. It runs without error when I read a file containing more than about 310000 rows, but the error "Must pass schema, or at least one RecordBatch" occurs again if the file contains fewer than about 310000 rows.
- about 970000 rows, no error occurs:
- about 10000 rows, error occurs:
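As the next comment explains, the failure comes from empty partitions, and a small input makes empty partitions much more likely. A quick way to count the rows in each partition of a Spark DataFrame before it is converted (a sketch with made-up data standing in for the CSV input):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()


def partition_sizes(df):
    # glom() turns each partition into the list of its rows, so mapping len()
    # over it gives the row count of every partition.
    return df.rdd.glom().map(len).collect()


# Simulate the "small input" case: spreading 10 rows over 16 partitions
# guarantees that several partitions are empty (made-up numbers).
small = spark.range(10).repartition(16)
sizes = partition_sizes(small)
print("partitions:", len(sizes), "empty:", sum(s == 0 for s in sizes))
```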
The exception happened because XShards of DataFrames does not expect an empty DataFrame in any partition. In the code above, after the Spark DataFrame join operation, the number of partitions of the joined Spark DataFrame (merged in the code) increases to 200, which leaves some partitions empty and causes the exception.
From the XShards side, I think we can add logic to throw an exception if empty partitions are detected.
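A minimal sketch of what such a guard could look like, written over a plain RDD of pandas DataFrames rather than the actual SparkXShards internals (the function name and the error message are illustrative only):

```python
# Sketch of an empty-partition guard for an RDD of pandas DataFrames
# (the kind of RDD backing a SparkXShards). Not the actual BigDL code.
def fail_on_empty_partitions(pd_rdd):
    def check(index, iterator):
        dfs = list(iterator)
        # A partition holding no DataFrames (or only zero-row ones) is what
        # later breaks pa.Table.from_batches() with "Must pass schema, ...".
        if not dfs or all(len(df) == 0 for df in dfs):
            raise ValueError(
                "Partition %d of the SparkXShards is empty; coalesce or "
                "repartition the Spark DataFrame before converting it." % index)
        return dfs

    return pd_rdd.mapPartitionsWithIndex(check)
```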
For the code above, I think we need to coalesce the merged Spark DataFrame to ensure no partition is empty. I updated one line of code and it works:
merged = left_df.join(right_df, on=on, how=how).coalesce(left_df.rdd.getNumPartitions())
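For reference, the merge() helper from the reproduction above with this one-line change applied (still a sketch; obtaining left_df/right_df via to_spark_df() is an assumption):

```python
from bigdl.orca.data.utils import spark_df_to_pd_sparkxshards  # module path from the traceback


def merge(left, right, how="inner", on=None, **kwargs):
    left_df = left.to_spark_df()     # assumed conversion of the input shards
    right_df = right.to_spark_df()
    # Coalesce back to the left side's partition count so the joined DataFrame
    # does not keep the (mostly empty) shuffle partitions, which would break
    # the SparkXShards conversion.
    merged = left_df.join(right_df, on=on, how=how).coalesce(
        left_df.rdd.getNumPartitions())
    return spark_df_to_pd_sparkxshards(merged)
```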
BTW, we need to support the merge operation in Shards. Could you add the implementation of merge to https://github.com/intel-analytics/BigDL/blob/main/python/orca/src/bigdl/orca/data/shard.py after it's done?
> BTW, we need to support the merge operation in Shards. Could you add the implementation of merge to https://github.com/intel-analytics/BigDL/blob/main/python/orca/src/bigdl/orca/data/shard.py after it's done?
There is already a PR for this: https://github.com/intel-analytics/BigDL/pull/5820
> The exception happened because XShards of DataFrames does not expect an empty DataFrame in any partition. In the code above, after the Spark DataFrame join operation, the number of partitions of the joined Spark DataFrame (merged in the code) increases to 200, which leaves some partitions empty and causes the exception. From the XShards side, I think we can add logic to throw an exception if empty partitions are detected.
> For the code above, I think we need to coalesce the merged Spark DataFrame to ensure no partition is empty. I updated one line of code and it works:
> merged = left_df.join(right_df, on=on, how=how).coalesce(left_df.rdd.getNumPartitions())
I don't think coalesce will ensure every partition is non-empty?
I think with default spark settings, without coalesce, we cannot guarantee each partition is non-empty.
> I think with default spark settings, without coalesce, we cannot guarantee each partition is non-empty.
But even with coalesce, will it guarantee each partition is non-empty?
> I think with default spark settings, without coalesce, we cannot guarantee each partition is non-empty.
> But even with coalesce, will it guarantee each partition is non-empty?
I think if there is no empty partition in left_df, there should be no empty partition after coalesce? Also, we have added logic to check for empty partitions in the code.
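Whether coalescing to left_df's partition count actually leaves no empty partitions can be checked empirically; a small sketch with made-up data and a forced shuffle join:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()
# Force a shuffle join so the joined DataFrame really gets the default
# 200 shuffle partitions, as in the issue above.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

left = spark.createDataFrame([(i, "x") for i in range(100)], ["a", "b"]).repartition(2)
right = spark.createDataFrame([(i, "y") for i in range(100)], ["a", "c"]).repartition(2)

joined = left.join(right, on="a", how="inner")
coalesced = joined.coalesce(left.rdd.getNumPartitions())

for name, df in [("joined", joined), ("coalesced", coalesced)]:
    sizes = df.rdd.glom().map(len).collect()
    print(name, "-> partitions:", len(sizes),
          "empty:", sum(s == 0 for s in sizes))
# Here the coalesced DataFrame ends up with no empty partitions, but coalesce
# only merges existing partitions, so keeping an explicit check is still useful.
```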