[Improvement] Support Empty assignment to Shuffle Server

Open rhh777 opened this issue 3 years ago • 13 comments

When I tested Hudi, I got an error. This is the Spark driver log; the error is "Empty assignment to Shuffle Server":

52278 [dag-scheduler-event-loop] INFO  org.apache.spark.shuffle.RssShuffleManager  - Generate application id used in rss: spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844
52281 [dag-scheduler-event-loop] ERROR com.tencent.rss.client.impl.ShuffleWriteClientImpl  - Empty assignment to Shuffle Server
52282 [dag-scheduler-event-loop] ERROR com.tencent.rss.client.impl.ShuffleWriteClientImpl  - Error happened when getShuffleAssignments with appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], numMaps[0], partitionNumPerRange[1] to coordinator
52283 [dag-scheduler-event-loop] WARN  org.apache.spark.scheduler.DAGScheduler  - Creating new stage failed due to exception - job: 5
com.tencent.rss.common.exception.RssException: Error happened when getShuffleAssignments with appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], numMaps[0], partitionNumPerRange[1] to coordinator
        at com.tencent.rss.client.impl.ShuffleWriteClientImpl.throwExceptionIfNecessary(ShuffleWriteClientImpl.java:440)
        at com.tencent.rss.client.impl.ShuffleWriteClientImpl.getShuffleAssignments(ShuffleWriteClientImpl.java:291)
        at org.apache.spark.shuffle.RssShuffleManager.registerShuffle(RssShuffleManager.java:247)
        at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:97)
        at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:87)
        at org.apache.spark.rdd.RDD.$anonfun$dependencies$2(RDD.scala:264)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.dependencies(RDD.scala:260)
        at org.apache.spark.scheduler.DAGScheduler.getShuffleDependenciesAndResourceProfiles(DAGScheduler.scala:634)
        at org.apache.spark.scheduler.DAGScheduler.getMissingAncestorShuffleDependencies(DAGScheduler.scala:597)
        at org.apache.spark.scheduler.DAGScheduler.getOrCreateShuffleMapStage(DAGScheduler.scala:394)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$getOrCreateParentStages$1(DAGScheduler.scala:580)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:48)
        at scala.collection.SetLike.map(SetLike.scala:104)
        at scala.collection.SetLike.map$(SetLike.scala:104)
        at scala.collection.mutable.AbstractSet.map(Set.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:579)
        at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:564)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1115)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2396)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2388)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2377)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
52287 [main] INFO  org.apache.spark.scheduler.DAGScheduler  - Job 5 failed: countByKey at BaseSparkCommitActionExecutor.java:191, took 0.076660 s

This is the coordinator log; the requested partitionNum is 0:

[INFO] 2022-08-11 11:29:49,335 Grpc-301 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], partitionNum[0], partitionNumPerRange[1], replica[1]
Full log:
[INFO] 2022-08-11 11:29:26,946 Grpc-267 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[0], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:26,946 Grpc-267 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:26,946 Grpc-267 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[0] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:27,033 Grpc-270 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[1], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:27,033 Grpc-270 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:27,034 Grpc-270 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[1] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:37,957 ApplicationManager-0 ApplicationManager statusCheck - Start to check status for 2 applications
[INFO] 2022-08-11 11:29:43,047 Grpc-283 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[2], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:43,048 Grpc-283 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:43,048 Grpc-283 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[2] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:49,165 Grpc-293 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[3], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:49,166 Grpc-293 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:49,166 Grpc-293 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[3] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:49,247 Grpc-298 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[4], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:49,247 Grpc-298 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:49,247 Grpc-298 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[4] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:49,267 Grpc-297 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[5], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:49,267 Grpc-297 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:49,268 Grpc-297 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[5] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:49,335 Grpc-301 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], partitionNum[0], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:49,335 Grpc-301 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:30:07,957 ApplicationManager-0 ApplicationManager statusCheck - Start to check status for 2 applications
[INFO] 2022-08-11 11:30:07,957 ApplicationManager-0 ApplicationManager statusCheck - Remove expired application:spark-d7f3e51ca713472e88568db90c91bdea1660187027133

Environment:

uniffle: firestorm 0.4.1
spark: 3.1.2
hudi: 0.10.0
k8s: v1.21.3

rhh777 avatar Aug 11 '22 03:08 rhh777

When a shuffle has 0 reduce partitions, is it a meaningful shuffle? What is the situation? Is it a map-only application?

jerqi avatar Aug 11 '22 03:08 jerqi

Looking at the RSS client code, I see that an exception is thrown whenever the coordinator returns an empty assignment, regardless of how many partitions were requested: https://github.com/apache/incubator-uniffle/blob/master/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java

  // transform [startPartition, endPartition] -> [server1, server2] to
  // {partition1 -> [server1, server2], partition2 -> [server1, server2]}
  @VisibleForTesting
  public Map<Integer, List<ShuffleServerInfo>> getPartitionToServers(
      GetShuffleAssignmentsResponse response) {
    Map<Integer, List<ShuffleServerInfo>> partitionToServers = Maps.newHashMap();
    List<PartitionRangeAssignment> assigns = response.getAssignmentsList();
    for (PartitionRangeAssignment partitionRangeAssignment : assigns) {
      final int startPartition = partitionRangeAssignment.getStartPartition();
      final int endPartition = partitionRangeAssignment.getEndPartition();
      final List<ShuffleServerInfo> shuffleServerInfos = partitionRangeAssignment
          .getServerList()
          .stream()
          .map(ss -> new ShuffleServerInfo(ss.getId(), ss.getIp(), ss.getPort()))
          .collect(Collectors.toList());
      for (int i = startPartition; i <= endPartition; i++) {
        partitionToServers.put(i, shuffleServerInfos);
      }
    }
    if (partitionToServers.isEmpty()) {
      throw new RssException("Empty assignment to Shuffle Server");
    }
    return partitionToServers;
  }

Would it be possible to check the partitionNum of the request against the assignments in the response, or to handle this case in some other way? For example:

  // transform [startPartition, endPartition] -> [server1, server2] to
  // {partition1 -> [server1, server2], partition2 -> [server1, server2]}
  @VisibleForTesting
  public Map<Integer, List<ShuffleServerInfo>> getPartitionToServers(RssGetShuffleAssignmentsRequest request,
      GetShuffleAssignmentsResponse response) {
    Map<Integer, List<ShuffleServerInfo>> partitionToServers = Maps.newHashMap();
    List<PartitionRangeAssignment> assigns = response.getAssignmentsList();
    for (PartitionRangeAssignment partitionRangeAssignment : assigns) {
      final int startPartition = partitionRangeAssignment.getStartPartition();
      final int endPartition = partitionRangeAssignment.getEndPartition();
      final List<ShuffleServerInfo> shuffleServerInfos = partitionRangeAssignment
          .getServerList()
          .stream()
          .map(ss -> new ShuffleServerInfo(ss.getId(), ss.getIp(), ss.getPort()))
          .collect(Collectors.toList());
      for (int i = startPartition; i <= endPartition; i++) {
        partitionToServers.put(i, shuffleServerInfos);
      }
    }

    if (request.getPartitionNum() == 0 && partitionToServers.isEmpty()) {
      LOG.warn("dependency partitionNum is 0");
    } else if (partitionToServers.isEmpty()) {
      throw new RssException("Empty assignment to Shuffle Server");
    }
    return partitionToServers;
  }

rhh777 avatar Aug 11 '22 04:08 rhh777

When a shuffle has 0 reduce partitions, is it a meaningful shuffle? What is the situation? Is it a map-only application?

Hmm, I don't yet know what Hudi does internally. This is my Hudi test code and the UI. The error occurred during the INSERT phase.

val conf = new SparkConf()
conf.set("spark.sql.extensions","org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
conf.set("spark.shuffle.manager","org.apache.spark.shuffle.RssShuffleManager")
conf.set("spark.rss.coordinator.quorum","xx.xx.xx.xx:19999")
conf.set("spark.rss.storage.type","MEMORY_LOCALFILE")
val session: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().appName("di_hudi").getOrCreate()
session.sql(
                """
                  |create table if not exists hudi_tb0808_1 (
                  |  id int,
                  |  name string,
                  |  work string
                  |) using hudi
                  |tblproperties (
                  |  type = 'cow',
                  |  primaryKey = 'id',
                  |  preCombineField = 'work'
                  |)
                  |location '/hudi/tmp/xxx1'
                  |""".stripMargin)
session.sql(
                s"""
                   |insert into hudi_tb0808_1 values(${random},'tmp1','hudi_tb0508_1')
                   |""".stripMargin)

session.stop()
[Screenshot of the UI]

rhh777 avatar Aug 11 '22 04:08 rhh777

Could you click on job 4 and share its details?

jerqi avatar Aug 11 '22 04:08 jerqi

Could you click on job 4 and share its details?

[Screenshot of the job 4 details]

rhh777 avatar Aug 11 '22 04:08 rhh777

@rhh777 From the log "Error happened when getShuffleAssignments with appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], numMaps[0], partitionNumPerRange[1] to coordinator", it is strange that numMaps = 0. The unexpected zero comes from dependency.partitioner().numPartitions(). Is it possible to check why zero is returned?

colinmjj avatar Aug 11 '22 06:08 colinmjj

Is your config option spark.default.parallelism set to zero?

jerqi avatar Aug 11 '22 06:08 jerqi

Is your config option spark.default.parallelism set to zero?

No, it is not set.

rhh777 avatar Aug 11 '22 06:08 rhh777

@rhh777 From the log "Error happened when getShuffleAssignments with appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], numMaps[0], partitionNumPerRange[1] to coordinator", it is strange that numMaps = 0. The unexpected zero comes from dependency.partitioner().numPartitions(). Is it possible to check why zero is returned?

OK, I'll try to debug it.

rhh777 avatar Aug 11 '22 06:08 rhh777

@rhh777 From the log "Error happened when getShuffleAssignments with appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], numMaps[0], partitionNumPerRange[1] to coordinator", it is strange that numMaps = 0. The unexpected zero comes from dependency.partitioner().numPartitions(). Is it possible to check why zero is returned?

It seems to come from the findMatchingFilesForRecordKeys method of SparkHoodieBloomIndexHelper. I am continuing to debug.

[Two screenshots from the debugging session]

rhh777 avatar Aug 12 '22 07:08 rhh777

The Hudi version I gave earlier was incorrect; it should be 0.10.0.

rhh777 avatar Aug 12 '22 07:08 rhh777

Could we ask the Hudi community why this partitioner has no partitions? It is odd for a partitioner to have no partitions.

jerqi avatar Aug 12 '22 08:08 jerqi

Sorry, I am busy with work right now. I will look further into Hudi when I am free.

rhh777 avatar Aug 25 '22 12:08 rhh777

When the RDD is empty, the shuffle is still triggered and fails with "Empty assignment to Shuffle Server". A minimal reproduction:

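    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SparkSession}

    val conf = new SparkConf()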
    conf.set("spark.shuffle.manager","org.apache.spark.shuffle.RssShuffleManager")
    conf.set("spark.rss.coordinator.quorum","localhost:19999")
    conf.set("spark.rss.storage.type","MEMORY_LOCALFILE")

    val session: SparkSession = SparkSession.builder().config(conf).
            master("local").appName("rss_empty_assignment").getOrCreate()

    val rdd: RDD[Row] = session.emptyDataFrame.rdd
    val inputRDD: RDD[(String, Int)] = rdd.map((row: Row) => {
        ("rss_empty_assignment", 1)
    })

    inputRDD.countByKey().foreach(println)
    session.stop()

rhh777 avatar Nov 12 '22 10:11 rhh777

I got it. As I understand it, this program should print nothing. We shouldn't request the coordinator at all when there are no partitions; we could return the RssShuffleHandle directly. See https://github.com/apache/incubator-uniffle/blob/4a3d2be36795df2eaf2edbe75ecc5816bf7eb87a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java#L204 for more details. Would you like to raise a PR to fix this problem?

jerqi avatar Nov 12 '22 14:11 jerqi

I got it. As I understand it, this program should print nothing. We shouldn't request the coordinator at all when there are no partitions; we could return the RssShuffleHandle directly. See

https://github.com/apache/incubator-uniffle/blob/4a3d2be36795df2eaf2edbe75ecc5816bf7eb87a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java#L204

for more details. Would you like to raise a PR to fix this problem?

OK

rhh777 avatar Nov 14 '22 12:11 rhh777
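
The idea agreed on above (skip the coordinator request when a shuffle has zero partitions, but keep failing on an unexpectedly empty assignment) can be sketched roughly as follows. This is only an illustration under stated assumptions, not the actual Uniffle patch: EmptyShuffleGuard, assignmentsFor, and the coordinator callback are hypothetical names, and servers are represented as plain strings instead of ShuffleServerInfo.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.IntFunction;

// Hypothetical helper, not part of Uniffle: shows the zero-partition
// short circuit discussed in this thread in isolation.
final class EmptyShuffleGuard {

  private EmptyShuffleGuard() {}

  // numPartitions: value of dependency.partitioner().numPartitions()
  // coordinator:   stand-in for the getShuffleAssignments round trip
  static Map<Integer, List<String>> assignmentsFor(
      int numPartitions,
      IntFunction<Map<Integer, List<String>>> coordinator) {
    if (numPartitions == 0) {
      // Nothing to shuffle: do not contact the coordinator at all.
      return Collections.emptyMap();
    }
    Map<Integer, List<String>> assignments = coordinator.apply(numPartitions);
    if (assignments.isEmpty()) {
      // Partitions were requested but none were assigned: still an error.
      throw new IllegalStateException("Empty assignment to Shuffle Server");
    }
    return assignments;
  }
}
```

With a guard like this, the empty-RDD reproduction would never reach the coordinator, while a non-zero request that comes back empty would still fail as it does today.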