
Why does insert into / load data launch fewer tasks than the available executors?

Open · 01lin opened this issue 3 years ago · 1 comment

For insert into or load data, the total number of tasks in the stage is roughly equal to the number of hosts, which is generally much smaller than the number of available executors. The low parallelism of this stage makes execution slower. Why must the parallelism be constrained to the number of distinct hosts? Could more tasks be started to increase parallelism and improve resource utilization? Thanks

org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala: loadDataFrame

  /**
   * Execute load process to load from input dataframe
   */
  private def loadDataFrame(
      sqlContext: SQLContext,
      dataFrame: Option[DataFrame],
      carbonLoadModel: CarbonLoadModel
  ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
    try {
      val rdd = dataFrame.get.rdd
      val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p =>
        DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
      }.distinct.length
      val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(
        nodeNumOfData,
        sqlContext.sparkContext) 
      val newRdd = new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, rdd, nodes.toArray
        .distinct)

      new NewDataFrameLoaderRDD(
        sqlContext.sparkSession,
        new DataLoadResultImpl(),
        carbonLoadModel,
        newRdd
      ).collect()
    } catch {
      case ex: Exception =>
        LOGGER.error("load data frame failed", ex)
        throw ex
    }
  }
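For context on why the task count tracks the host count: `nodeNumOfData` above is the number of distinct preferred-location hosts across the input RDD's partitions, and `DataLoadCoalescedRDD` then coalesces the input down to roughly one partition per node in that list, so the load stage launches about one task per host regardless of how many executors are available. Below is a minimal sketch of the host-counting step, using the public `RDD.preferredLocations` API rather than CarbonData's `DataLoadPartitionCoalescer.getPreferredLocs` (an assumption that both surface the same host information):

```scala
import org.apache.spark.sql.SparkSession

object DistinctHostCountSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("distinct-host-count-sketch")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    // Any RDD works; on a real cluster its partitions carry preferred
    // locations (the hosts holding the underlying blocks). A locally
    // parallelized RDD has no preferred locations, so the count is 0 here.
    val rdd = sc.parallelize(1 to 1000, numSlices = 16)

    // Mirror of the nodeNumOfData computation above: collect each
    // partition's preferred hosts and count the distinct ones.
    val distinctHosts = rdd.partitions
      .flatMap(p => rdd.preferredLocations(p))
      .distinct

    println(s"partitions = ${rdd.partitions.length}, " +
      s"distinct hosts = ${distinctHosts.length}")

    spark.stop()
  }
}
```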

01lin · Jun 23 '21 03:06

This only applies to local_sort loading. It helps avoid shuffling data between executors.

QiangCai · Jun 28 '21 01:06
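For readers unfamiliar with the term: local_sort is one of CarbonData's SORT_SCOPE settings, under which each node sorts its own slice of the incoming data during load. Coalescing the load to one task per node keeps each node's rows local to that node, which is the shuffle-avoidance referred to above. Below is a hedged sketch of how such a table might be declared and loaded; the table name, columns, and input path are hypothetical, and the exact DDL varies across CarbonData versions (e.g. `STORED AS carbondata` vs. `STORED BY 'carbondata'`):

```scala
import org.apache.spark.sql.SparkSession

object LocalSortLoadSketch {
  def main(args: Array[String]): Unit = {
    // Assumes a Spark session already configured with the CarbonData
    // extensions; the exact bootstrap depends on the CarbonData version.
    val spark = SparkSession.builder()
      .appName("local-sort-load-sketch")
      .master("local[*]")
      .getOrCreate()

    // Hypothetical table and columns, for illustration only. With
    // SORT_SCOPE = LOCAL_SORT, each node sorts its own data during load,
    // which is why the load stage runs roughly one task per node.
    spark.sql(
      """CREATE TABLE IF NOT EXISTS demo_sales (
        |  id INT, city STRING, amount DOUBLE
        |) STORED AS carbondata
        |TBLPROPERTIES ('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='city')
        |""".stripMargin)

    // Hypothetical input path; LOAD DATA accepts further OPTIONS if needed.
    spark.sql("LOAD DATA INPATH '/path/to/sales.csv' INTO TABLE demo_sales")

    spark.stop()
  }
}
```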