[SUPPORT] Can the "Building workload profile" stage be optimized? It is too slow; about half of the write time is spent in this stage
**Describe the problem you faced**

Roughly half of the total write time is spent in the "Building workload profile" stage. The time goes into `BaseSparkCommitActionExecutor.buildProfile`, specifically the `countByKey` call marked in the snippet below:
```java
  private Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(HoodieData<HoodieRecord<T>> inputRecords) {
    HashMap<String, WorkloadStat> partitionPathStatMap = new HashMap<>();
    WorkloadStat globalStat = new WorkloadStat();

    // Group the records by the (partitionPath, currentLocation) combination and count the
    // number of records per key. <-- this countByKey is where the time is spent
    Map<Tuple2<String, Option<HoodieRecordLocation>>, Long> partitionLocationCounts = inputRecords
        .mapToPair(record -> Pair.of(
            new Tuple2<>(record.getPartitionPath(), Option.ofNullable(record.getCurrentLocation())), record))
        .countByKey();

    // Count the number of inserts and updates in each partition and update the workload stats.
    for (Map.Entry<Tuple2<String, Option<HoodieRecordLocation>>, Long> e : partitionLocationCounts.entrySet()) {
      String partitionPath = e.getKey()._1();
      Long count = e.getValue();
      Option<HoodieRecordLocation> locOption = e.getKey()._2();

      if (!partitionPathStatMap.containsKey(partitionPath)) {
        partitionPathStatMap.put(partitionPath, new WorkloadStat());
      }

      if (locOption.isPresent()) {
        // record has a known location -> update
        partitionPathStatMap.get(partitionPath).addUpdates(locOption.get(), count);
        globalStat.addUpdates(locOption.get(), count);
      } else {
        // no location -> insert
        partitionPathStatMap.get(partitionPath).addInserts(count);
        globalStat.addInserts(count);
      }
    }
    return Pair.of(partitionPathStatMap, globalStat);
  }
```
The stack trace of the stage:

```
org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:105)
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:200)
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:174)
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:86)
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:63)
org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor.execute(SparkUpsertDeltaCommitActionExecutor.java:45)
org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:98)
org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:88)
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:142)
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:224)
org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:504)
org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:502)
org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:108)
org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:61)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
```
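For context, `countByKey` on a Spark `JavaPairRDD` is an action: it runs a job over the entire input and collects the per-key counts to the driver. It may also be the first action that materializes the tagged record RDD in this write path, so upstream work still sitting in the lineage (reading/deserializing the input, index lookup and location tagging) can get charged to the "Building workload profile" stage unless that RDD is already persisted. Below is a minimal, self-contained sketch of that behavior; the dataset, the simulated expensive step, and the key extraction are hypothetical stand-ins, not Hudi code:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Map;

public class CountByKeyProfileSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("count-by-key-sketch").setMaster("local[2]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      // Stand-in for the incoming records; in Hudi this would be the tagged HoodieRecord RDD.
      JavaRDD<String> records = jsc.parallelize(Arrays.asList("p1:a", "p1:b", "p2:c"));

      // Stand-in for an expensive upstream step (e.g. index lookup / location tagging).
      JavaRDD<String> tagged = records.map(r -> {
        Thread.sleep(10); // simulate per-record cost
        return r;
      });

      // Without persisting, every action below re-runs the expensive step; persisting
      // makes it run once and lets later stages (such as the actual write) reuse it.
      tagged.persist(StorageLevel.MEMORY_AND_DISK());

      // countByKey is an action: it runs a job over the whole dataset and brings the
      // per-key counts back to the driver, just like the workload-profile step.
      JavaPairRDD<String, String> byPartition =
          tagged.mapToPair(r -> new Tuple2<>(r.split(":")[0], r));
      Map<String, Long> counts = byPartition.countByKey();

      System.out.println(counts); // e.g. {p1=2, p2=1}
    }
  }
}
```

In the sketch, removing the `persist` call makes the simulated expensive step run again for every later action, which mirrors why this stage can look disproportionately slow when the cost of earlier steps folds into it.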
**To Reproduce**

Steps to reproduce the behavior:

1. Run an `INSERT INTO` (upsert) on a merge-on-read table through Spark SQL.
2. Watch the time spent in the "Building workload profile" stage in the Spark UI.

**Expected behavior**

The workload profile should be built without taking up roughly half of the total write time.
**Environment Description**

- Hudi version : 0.14.1-RC1
- Spark version : 3.2.1
- Hive version : 3.1.3
- Hadoop version : 3.2.2
- Storage (HDFS/S3/GCS..) : S3
- Running on Docker? (yes/no) : no
@zyclove Can you let us know more about your data size (existing and incremental) and your table and writer configuration?
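For reference, the writer configuration could be shared in a form like the sketch below. The option keys are standard Hudi datasource write options, while the class name, field names, and values are placeholders rather than the actual settings in use; for a Spark SQL-managed table, the equivalent information also lives in the table's `TBLPROPERTIES` and in `.hoodie/hoodie.properties`.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class HudiWriterConfigExample {
  // Hypothetical sketch of the writer setup being asked about; the keys are standard
  // Hudi datasource options, the values are placeholders.
  static void writeBatch(Dataset<Row> df) {
    df.write().format("hudi")
        .option("hoodie.table.name", "my_table")
        .option("hoodie.datasource.write.operation", "upsert")
        .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
        .option("hoodie.datasource.write.recordkey.field", "id")
        .option("hoodie.datasource.write.precombine.field", "ts")
        .option("hoodie.datasource.write.partitionpath.field", "dt")
        .option("hoodie.index.type", "BLOOM")
        .mode(SaveMode.Append)
        .save("s3://<bucket>/<prefix>/my_table");
  }
}
```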