Fix duplicate repartition in tf2 estimator spark backend
Description
In fit, if the input is a DataFrame, we repartition it when the number of partitions is less than the number of workers while converting it to SparkXShards. After the conversion, the data should therefore have num_partitions >= num_workers for training (assuming the original user data has no empty partitions). However, since each SparkXShards partition is combined into a single dict, calling repartition once more on the SparkXShards can produce empty partitions again:
>>> rdd = sc.parallelize([1, 2, 3, 4], numSlices=4)
>>> rdd.glom().collect()
[[1], [2], [3], [4]]
>>> rdd2 = rdd.repartition(4)
>>> rdd2.glom().collect()
[[4, 3], [2], [], [1]]
# 1, 2, 3, 4 correspond to the entire dict of data in each SparkXShards partition
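For contrast, a minimal sketch (assuming a local SparkSession; the 400-row DataFrame below is purely illustrative, not the estimator's actual input): repartitioning a DataFrame shuffles individual rows, so no partition ends up empty, whereas repartitioning the converted SparkXShards only moves whole single-dict elements, as the output above shows.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[4]").getOrCreate()

# Repartitioning the DataFrame redistributes individual rows, so all 4
# target partitions receive data (roughly 100 rows each).
df = spark.createDataFrame([(i,) for i in range(400)], ["x"])
print(df.repartition(4).rdd.glom().map(len).collect())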
Why did we do the repartition previously? @jenniew
The previous repartition is in maybe_dataframe_to_xshards(),
which was originally created for the ray estimator. It avoids empty partitions for the workers when partitions < num of workers. TF2Estimator on PySpark reused this function to convert the DataFrame to xshards.
I think we may not be able to remove the repartition for training/evaluation.
We do the repartition so that training/evaluation runs only on the number of workers.
If there are thousands of partitions, we don't want to train on thousands of TF workers.
So I think the fix for the case where partitions < num workers may look like this:
if data.rdd.getNumPartitions() > self.num_workers:
    # Shrink to exactly num_workers partitions before entering the barrier stage.
    res = data.rdd.repartition(self.num_workers).barrier() \
        .mapPartitions(
            lambda iter: transform_func(iter, init_params, params)).collect()
else:
    # Partition count is already <= num_workers; run the barrier stage directly.
    res = data.rdd.barrier() \
        .mapPartitions(
            lambda iter: transform_func(iter, init_params, params)).collect()
Why must the number of partitions equal the number of workers? Repartition is expensive; if the number of partitions is already larger than the number of workers, why can't each worker just take more than one partition, like the ray estimator, where in mapPartitions a worker finishes one partition and then takes another?
I think we can do repartitioning without shuffling?
After converting to XShards, the number of partitions must be >= the number of workers. So we can directly call coalesce when num partitions > num workers? If the original data is relatively evenly partitioned, each worker will get around the same amount of data?
Using coalesce is a better way.
Will coalesce result in unbalanced partitions? E.g. if node1 has 9 partitions and node2 has 1 partition, after coalescing to 2 partitions, will each new partition get 5 of the smaller partitions, or will one get 9 and the other 1? @jason-dai @jenniew
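One way to inspect what coalesce does with single-element partitions, as a sketch (this runs in local mode, so it cannot settle the multi-node locality question above):
from pyspark.sql import SparkSession

sc = SparkSession.builder.master("local[4]").getOrCreate().sparkContext

# 10 single-element partitions, mimicking SparkXShards after conversion.
rdd = sc.parallelize(range(10), numSlices=10)

# coalesce(2) merges existing partitions without a shuffle; how evenly the 10
# inputs are grouped into the 2 outputs depends on where the input partitions
# live, which is exactly the balance concern raised above.
print(rdd.coalesce(2).glom().collect())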
The issue seems to be caused by a duplicate repartition when the input data has partitions < num_workers; shall we also add a test case for that?
Can the issue be detected by a test case similar to the one in ray: test_partition_num_less_than_workers with workers_per_node=4?
After changing to coalesce, it throws the following error:
> format(target_id, ".", name), value)
E py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
E : org.apache.spark.scheduler.BarrierJobUnsupportedRDDChainException: [SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the following pattern of RDD chain within a barrier stage:
E 1. Ancestor RDDs that have different number of partitions from the resulting RDD (eg. union()/coalesce()/first()/take()/PartitionPruningRDD). A workaround for first()/take() can be barrierRdd.collect().head (scala) or barrierRdd.collect()[0] (python).
E 2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2)).
E at org.apache.spark.scheduler.DAGScheduler.checkBarrierStageWithRDDChainPattern(DAGScheduler.scala:372)
E at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:448)
E at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:963)
E at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2069)
E at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
E at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
E at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
E at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
E at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
E at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
E at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
E at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
E at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
E at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
E at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
E at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
E at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
E at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
E at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
E at sun.reflect.GeneratedMethodAccessor80.invoke(Unknown Source)
E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E at java.lang.reflect.Method.invoke(Method.java:498)
E at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
E at py4j.Gateway.invoke(Gateway.java:282)
E at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E at py4j.commands.CallCommand.execute(CallCommand.java:79)
E at py4j.GatewayConnection.run(GatewayConnection.java:238)
E at java.lang.Thread.run(Thread.java:748)
@jason-dai @jenniew barrier can't be performed on an RDD that comes from coalesce... So shall we still keep the repartition if rdd.getNumPartitions() > num_workers?
Can we do repartitioning without shuffle?
It seems not: to reduce the number of partitions without a shuffle we need coalesce, which can't be combined with barrier, and increasing the number of partitions without a shuffle is not supported: https://stackoverflow.com/questions/71070709/increase-the-number-of-partitions-without-repartition-on-hadoop
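A minimal repro of that incompatibility, as a sketch (assumes a local SparkSession with 4 cores so the 4 barrier tasks can be scheduled together; the error matches the trace above):
from pyspark.sql import SparkSession

sc = SparkSession.builder.master("local[4]").getOrCreate().sparkContext
rdd = sc.parallelize(range(8), numSlices=8)

# repartition() inserts a shuffle, so the barrier stage only contains RDDs
# with the same partition count and the job runs.
print(rdd.repartition(4).barrier().mapPartitions(lambda it: it).collect())

# coalesce() narrows without a shuffle, so the barrier stage sees an ancestor
# RDD with a different partition count and Spark rejects the job with
# BarrierJobUnsupportedRDDChainException (surfaced through py4j as above).
try:
    rdd.coalesce(4).barrier().mapPartitions(lambda it: it).collect()
except Exception as e:
    print(type(e).__name__)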
Also, even if coalesce could work with barrier, it would result in unbalanced partitions, which would cause issues in training.
I suppose that under the current implementation we still need to do the repartition.
Issue conclusion:
- The implementation of the Spark backend requires the number of data partitions to equal the number of workers (a limitation of the original design, but not possible to change now).
- After converting to SparkXShards, each partition has only one element (if shard_size is not specified), so repartitioning the SparkXShards will very likely result in empty partitions.
- Moving the repartition to the Spark DataFrame before converting to SparkXShards is the best solution at the moment (a rough sketch follows below).
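A rough sketch of what the conclusion implies; the helper name below is hypothetical, not the actual function in the code base:
from pyspark.sql import DataFrame

def adjust_partitions_before_xshards(df: DataFrame, num_workers: int) -> DataFrame:
    # Hypothetical helper: fix the partition count on the Spark DataFrame,
    # where individual rows can still be redistributed evenly, instead of
    # repartitioning the SparkXShards afterwards, where each partition is
    # already a single dict.
    if df.rdd.getNumPartitions() != num_workers:
        # Row-level repartition keeps every resulting partition non-empty and
        # satisfies the backend's num_partitions == num_workers requirement.
        df = df.repartition(num_workers)
    # The DataFrame-to-SparkXShards conversion would happen after this point.
    return df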
http://10.112.231.51:18888/job/ZOO-PR-BigDL-Python-Spark-3.1-py37-Horovod/1330/ : all related unit tests passed.
http://10.112.231.51:18888/job/BigDL-Orca-PR-Validation/933/ : some failures, all unrelated to this change.