incubator-uniffle
[Improvement] Support Empty assignment to Shuffle Server
While testing Hudi, I got an error. This is the Spark driver log; the error is `Empty assignment to Shuffle Server`:
52278 [dag-scheduler-event-loop] INFO org.apache.spark.shuffle.RssShuffleManager - Generate application id used in rss: spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844
52281 [dag-scheduler-event-loop] ERROR com.tencent.rss.client.impl.ShuffleWriteClientImpl - Empty assignment to Shuffle Server
52282 [dag-scheduler-event-loop] ERROR com.tencent.rss.client.impl.ShuffleWriteClientImpl - Error happened when getShuffleAssignments with appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], numMaps[0], partitionNumPerRange[1] to coordinator
52283 [dag-scheduler-event-loop] WARN org.apache.spark.scheduler.DAGScheduler - Creating new stage failed due to exception - job: 5
com.tencent.rss.common.exception.RssException: Error happened when getShuffleAssignments with appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], numMaps[0], partitionNumPerRange[1] to coordinator
at com.tencent.rss.client.impl.ShuffleWriteClientImpl.throwExceptionIfNecessary(ShuffleWriteClientImpl.java:440)
at com.tencent.rss.client.impl.ShuffleWriteClientImpl.getShuffleAssignments(ShuffleWriteClientImpl.java:291)
at org.apache.spark.shuffle.RssShuffleManager.registerShuffle(RssShuffleManager.java:247)
at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:97)
at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:87)
at org.apache.spark.rdd.RDD.$anonfun$dependencies$2(RDD.scala:264)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.dependencies(RDD.scala:260)
at org.apache.spark.scheduler.DAGScheduler.getShuffleDependenciesAndResourceProfiles(DAGScheduler.scala:634)
at org.apache.spark.scheduler.DAGScheduler.getMissingAncestorShuffleDependencies(DAGScheduler.scala:597)
at org.apache.spark.scheduler.DAGScheduler.getOrCreateShuffleMapStage(DAGScheduler.scala:394)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$getOrCreateParentStages$1(DAGScheduler.scala:580)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:48)
at scala.collection.SetLike.map(SetLike.scala:104)
at scala.collection.SetLike.map$(SetLike.scala:104)
at scala.collection.mutable.AbstractSet.map(Set.scala:48)
at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:579)
at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:564)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1115)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2396)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2388)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2377)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
52287 [main] INFO org.apache.spark.scheduler.DAGScheduler - Job 5 failed: countByKey at BaseSparkCommitActionExecutor.java:191, took 0.076660 s
This is the coordinator log; the requested partitionNum is 0:
[INFO] 2022-08-11 11:29:49,335 Grpc-301 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], partitionNum[0], partitionNumPerRange[1], replica[1]
Full log:
[INFO] 2022-08-11 11:29:26,946 Grpc-267 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[0], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:26,946 Grpc-267 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:26,946 Grpc-267 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[0] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:27,033 Grpc-270 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[1], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:27,033 Grpc-270 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:27,034 Grpc-270 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[1] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:37,957 ApplicationManager-0 ApplicationManager statusCheck - Start to check status for 2 applications
[INFO] 2022-08-11 11:29:43,047 Grpc-283 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[2], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:43,048 Grpc-283 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:43,048 Grpc-283 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[2] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:49,165 Grpc-293 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[3], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:49,166 Grpc-293 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:49,166 Grpc-293 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[3] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:49,247 Grpc-298 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[4], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:49,247 Grpc-298 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:49,247 Grpc-298 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[4] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:49,267 Grpc-297 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[5], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:49,267 Grpc-297 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:49,268 Grpc-297 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[5] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:49,335 Grpc-301 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], partitionNum[0], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:49,335 Grpc-301 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:30:07,957 ApplicationManager-0 ApplicationManager statusCheck - Start to check status for 2 applications
[INFO] 2022-08-11 11:30:07,957 ApplicationManager-0 ApplicationManager statusCheck - Remove expired application:spark-d7f3e51ca713472e88568db90c91bdea1660187027133
Environment:
uniffle: firestorm 0.4.1
spark: 3.1.2
hudi: 0.10.0
k8s: v1.21.3
When a shuffle has 0 reduce partitions, is it a meaningful shuffle? In what situation does this happen? Is it a map-only application?
Looking at the RSS client code, I see that no matter what partitionNum is requested, an exception is thrown whenever the coordinator returns an empty assignment. https://github.com/apache/incubator-uniffle/blob/master/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
```java
// transform [startPartition, endPartition] -> [server1, server2] to
// {partition1 -> [server1, server2], partition2 -> [server1, server2]}
@VisibleForTesting
public Map<Integer, List<ShuffleServerInfo>> getPartitionToServers(
    GetShuffleAssignmentsResponse response) {
  Map<Integer, List<ShuffleServerInfo>> partitionToServers = Maps.newHashMap();
  List<PartitionRangeAssignment> assigns = response.getAssignmentsList();
  for (PartitionRangeAssignment partitionRangeAssignment : assigns) {
    final int startPartition = partitionRangeAssignment.getStartPartition();
    final int endPartition = partitionRangeAssignment.getEndPartition();
    final List<ShuffleServerInfo> shuffleServerInfos = partitionRangeAssignment
        .getServerList()
        .stream()
        .map(ss -> new ShuffleServerInfo(ss.getId(), ss.getIp(), ss.getPort()))
        .collect(Collectors.toList());
    for (int i = startPartition; i <= endPartition; i++) {
      partitionToServers.put(i, shuffleServerInfos);
    }
  }
  if (partitionToServers.isEmpty()) {
    throw new RssException("Empty assignment to Shuffle Server");
  }
  return partitionToServers;
}
```
Would it be possible to compare the partitionNum of the request against the partitionNum of the response, or handle this in some other way? For example:
```java
// transform [startPartition, endPartition] -> [server1, server2] to
// {partition1 -> [server1, server2], partition2 -> [server1, server2]}
@VisibleForTesting
public Map<Integer, List<ShuffleServerInfo>> getPartitionToServers(
    RssGetShuffleAssignmentsRequest request,
    GetShuffleAssignmentsResponse response) {
  Map<Integer, List<ShuffleServerInfo>> partitionToServers = Maps.newHashMap();
  List<PartitionRangeAssignment> assigns = response.getAssignmentsList();
  for (PartitionRangeAssignment partitionRangeAssignment : assigns) {
    final int startPartition = partitionRangeAssignment.getStartPartition();
    final int endPartition = partitionRangeAssignment.getEndPartition();
    final List<ShuffleServerInfo> shuffleServerInfos = partitionRangeAssignment
        .getServerList()
        .stream()
        .map(ss -> new ShuffleServerInfo(ss.getId(), ss.getIp(), ss.getPort()))
        .collect(Collectors.toList());
    for (int i = startPartition; i <= endPartition; i++) {
      partitionToServers.put(i, shuffleServerInfos);
    }
  }
  if (request.getPartitionNum() == 0 && partitionToServers.isEmpty()) {
    LOG.warn("dependency partitionNum is 0");
  } else if (partitionToServers.isEmpty()) {
    throw new RssException("Empty assignment to Shuffle Server");
  }
  return partitionToServers;
}
```
> When a shuffle has 0 reduce partitions, is it a meaningful shuffle? In what situation does this happen? Is it a map-only application?
Hmm, I don't know exactly what Hudi does internally yet. This is my Hudi test code and UI; the error occurred during the INSERT phase:
```scala
val conf = new SparkConf()
conf.set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager")
conf.set("spark.rss.coordinator.quorum", "xx.xx.xx.xx:19999")
conf.set("spark.rss.storage.type", "MEMORY_LOCALFILE")

val session: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().appName("di_hudi").getOrCreate()

session.sql(
  """
    |create table if not exists hudi_tb0808_1 (
    |  id int,
    |  name string,
    |  work string
    |) using hudi
    |tblproperties (
    |  type = 'cow',
    |  primaryKey = 'id',
    |  preCombineField = 'work'
    |)
    |location '/hudi/tmp/xxx1'
    |""".stripMargin)

session.sql(
  s"""
     |insert into hudi_tb0808_1 values(${random},'tmp1','hudi_tb0508_1')
     |""".stripMargin)

session.stop()
```
Could you click your job 4 and give me the details of job 4?
@rhh777 From the log Error happened when getShuffleAssignments with appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], numMaps[0], partitionNumPerRange[1] to coordinator, it's strange that numMaps = 0.
The unexpected zero comes from dependency.partitioner().numPartitions(); is it possible to check why zero is returned?
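For context, the partition count the client asks the coordinator for is read straight off the shuffle dependency when the shuffle is registered. Below is a minimal sketch of that path, simplified from RssShuffleManager; the elided body and the helper shown are illustrative, not the exact source:

```java
// Simplified sketch of where the zero originates. The partition count sent
// to the coordinator comes directly from Spark's partitioner, so a zero here
// means Spark built a zero-partition partitioner before RSS was involved.
public <K, V, C> ShuffleHandle registerShuffle(
    int shuffleId, ShuffleDependency<K, V, C> dependency) {
  int partitionNum = dependency.partitioner().numPartitions();
  // partitionNum is forwarded to shuffleWriteClient.getShuffleAssignments(...),
  // which throws "Empty assignment to Shuffle Server" when the coordinator
  // returns no assignment ranges for it.
  return requestAssignmentsAndBuildHandle(shuffleId, partitionNum, dependency); // hypothetical helper standing in for the real body
}
```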
Is your config option spark.default.parallelism set to zero?
It is not set.
ok, I'll try to debug it
It seems to be in the SparkHoodieBloomIndexHelper#findMatchingFilesForRecordKeys method; I'll continue debugging.
The Hudi version I gave earlier was wrong; it should be 0.10.0.
Could we ask the Hudi community why this partitioner has no partitions? It's odd that a partitioner has zero partitions.
Sorry, I'm busy with work right now; I will look into Hudi further when I'm free.
When the RDD is empty, a shuffle is still triggered, and the Empty assignment to Shuffle Server error occurs. Presumably, since spark.default.parallelism is not set, Spark's default partitioner falls back to the parent RDD's partition count, which is 0 for session.emptyDataFrame.rdd. A minimal reproduction:
```scala
val conf = new SparkConf()
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager")
conf.set("spark.rss.coordinator.quorum", "localhost:19999")
conf.set("spark.rss.storage.type", "MEMORY_LOCALFILE")

val session: SparkSession = SparkSession.builder().config(conf)
  .master("local").appName("rss_empty_assignment").getOrCreate()

val rdd: RDD[Row] = session.emptyDataFrame.rdd
val inputRDD: RDD[(String, Int)] = rdd.map((row: Row) => {
  ("rss_empty_assignment", 1)
})
inputRDD.countByKey().foreach(println)

session.stop()
```
I got it. As I understand it, this program should print nothing. Actually, we shouldn't request the coordinator when we don't have any partitions; we could return the RssShuffleHandle directly. See https://github.com/apache/incubator-uniffle/blob/4a3d2be36795df2eaf2edbe75ecc5816bf7eb87a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java#L204 for more details. Would you like to raise a PR to fix this problem?
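For reference, a minimal sketch of that early return against the spark2 client linked above; the RssShuffleHandle constructor arguments and the fall-through helper are illustrative, so match them to the real signatures in the linked file:

```java
// Sketch: when the dependency has no reduce partitions, skip the coordinator
// round trip and return a handle with an empty assignment map, since no task
// will ever write to or read from this shuffle.
public <K, V, C> ShuffleHandle registerShuffle(
    int shuffleId, int numMaps, ShuffleDependency<K, V, C> dependency) {
  int partitionNum = dependency.partitioner().numPartitions();
  if (partitionNum == 0) {
    // An empty partition-to-server map is a valid assignment for a shuffle
    // that has nothing to assign. appId is a field of the manager.
    return new RssShuffleHandle(shuffleId, appId, numMaps, dependency,
        Maps.newHashMap()); // constructor args are illustrative
  }
  // Otherwise fall through to the existing coordinator request.
  return requestAssignmentsAndBuildHandle(shuffleId, numMaps, dependency); // hypothetical helper standing in for the unchanged logic
}
```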
ok