Why are fewer tasks launched than available executors for insert into/load data?
For insert into or load data, the total number of tasks in the stage is roughly equal to the number of distinct hosts holding the input, which is generally much smaller than the number of available executors. This low parallelism makes the stage slow. Why must the parallelism be constrained to the distinct hosts? Could more tasks be started to increase parallelism and improve resource utilization? Thanks
org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala: loadDataFrame
/**
 * Execute load process to load from input dataframe
 */
private def loadDataFrame(
    sqlContext: SQLContext,
    dataFrame: Option[DataFrame],
    carbonLoadModel: CarbonLoadModel
): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
  try {
    val rdd = dataFrame.get.rdd
    // Count the distinct hosts that hold the input partitions: this number,
    // not the executor count, drives the parallelism of the load.
    val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p =>
      DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
    }.distinct.length
    // Make sure executors are available on at least that many nodes.
    val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(
      nodeNumOfData,
      sqlContext.sparkContext)
    // Coalesce the input to one partition per node, so each task reads
    // node-local data and no shuffle between executors is needed.
    val newRdd = new DataLoadCoalescedRDD[Row](
      sqlContext.sparkSession, rdd, nodes.toArray.distinct)
    new NewDataFrameLoaderRDD(
      sqlContext.sparkSession,
      new DataLoadResultImpl(),
      carbonLoadModel,
      newRdd
    ).collect()
  } catch {
    case ex: Exception =>
      LOGGER.error("load data frame failed", ex)
      throw ex
  }
}
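In the snippet above, the number of load tasks equals nodeNumOfData, the count of distinct preferred hosts of the input partitions. Here is a minimal, self-contained sketch of that computation using Spark's public RDD.preferredLocations API (this is not CarbonData code; the input path is a hypothetical assumption):

import org.apache.spark.sql.SparkSession

object PreferredHostCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("preferred-host-count").getOrCreate()
    // Hypothetical input path; any RDD with locality information works.
    val rdd = spark.sparkContext.textFile("hdfs:///tmp/input")
    // Collect the hosts that store each partition, then deduplicate:
    // the result approximates the parallelism the load stage will get.
    val distinctHosts = rdd.partitions
      .flatMap(p => rdd.preferredLocations(p))
      .distinct
    println(s"distinct preferred hosts = ${distinctHosts.length}")
    spark.stop()
  }
}

Running this against the same input as a load typically prints a number close to the task count observed in the load stage.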
This coalescing only applies to local_sort loading. It helps avoid shuffling data between executors, since each node sorts and writes only the partitions it already holds locally.
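If load parallelism matters more than avoiding the shuffle, changing the sort scope sidesteps the per-node coalescing. A hedged sketch, assuming a table named my_table and an illustrative partition count; SORT_SCOPE and GLOBAL_SORT_PARTITIONS are documented CarbonData options, but check that your version supports altering SORT_SCOPE:

// Switch the table away from LOCAL_SORT so the load repartitions across
// more tasks. Table name, path and partition count are illustrative.
spark.sql("ALTER TABLE my_table SET TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')")
spark.sql(
  """LOAD DATA INPATH 'hdfs:///tmp/input.csv' INTO TABLE my_table
    |OPTIONS('GLOBAL_SORT_PARTITIONS'='100')""".stripMargin)

With GLOBAL_SORT the input is shuffled into GLOBAL_SORT_PARTITIONS tasks, trading shuffle cost for higher parallelism.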