
[SUPPORT] Can the "Building workload profile" stage be optimized? It is too slow; half of the total write time is spent in this stage

Open · zyclove opened this issue 1 year ago · 1 comment

Describe the problem you faced

[screenshot attached in the original issue]

private Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(HoodieData<HoodieRecord<T>> inputRecords) {
    HashMap<String, WorkloadStat> partitionPathStatMap = new HashMap<>();
    WorkloadStat globalStat = new WorkloadStat();

    // group the records by partitionPath + currentLocation combination, count the number of
    // records in each partition
    // (this is the block the reporter highlighted: countByKey() is the call that shows up
    //  at the top of the stack trace below)
    Map<Tuple2<String, Option<HoodieRecordLocation>>, Long> partitionLocationCounts = inputRecords
        .mapToPair(record -> Pair.of(
            new Tuple2<>(record.getPartitionPath(), Option.ofNullable(record.getCurrentLocation())), record))
        .countByKey();

    // count the number of both inserts and updates in each partition, update the counts to workLoadStats
    for (Map.Entry<Tuple2<String, Option<HoodieRecordLocation>>, Long> e : partitionLocationCounts.entrySet()) {
      String partitionPath = e.getKey()._1();
      Long count = e.getValue();
      Option<HoodieRecordLocation> locOption = e.getKey()._2();

      if (!partitionPathStatMap.containsKey(partitionPath)) {
        partitionPathStatMap.put(partitionPath, new WorkloadStat());
      }

      if (locOption.isPresent()) {
        // update
        partitionPathStatMap.get(partitionPath).addUpdates(locOption.get(), count);
        globalStat.addUpdates(locOption.get(), count);
      } else {
        // insert
        partitionPathStatMap.get(partitionPath).addInserts(count);
        globalStat.addInserts(count);
      }
    }
    return Pair.of(partitionPathStatMap, globalStat);
  }
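For context on why this one call dominates the stage: countByKey() is a Spark action, not a lazy transformation. It shuffles one count per (partitionPath, currentLocation) key and collects the resulting map to the driver, which forces evaluation of every upstream step that produced inputRecords (deduplication, index lookup/tagging, and so on). The snippet below is a minimal standalone sketch of the same counting pattern against plain Spark; the RecordStub class and the sample data are made up for illustration, and only Spark's public Java API is used.

import java.util.Arrays;
import java.util.Map;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class WorkloadProfileSketch {

  // Hypothetical stand-in for a HoodieRecord: a partition path plus an optional
  // current file id (null = new insert, non-null = update of an existing file group).
  static class RecordStub implements java.io.Serializable {
    final String partitionPath;
    final String currentFileId; // nullable
    RecordStub(String partitionPath, String currentFileId) {
      this.partitionPath = partitionPath;
      this.currentFileId = currentFileId;
    }
  }

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("profile-sketch").master("local[*]").getOrCreate();
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    // countByKey() is an action: it shuffles one count per key and collects the
    // resulting map to the driver, forcing evaluation of everything upstream.
    Map<Tuple2<String, String>, Long> counts = jsc
        .parallelize(Arrays.asList(
            new RecordStub("2023/12/18", null),        // insert
            new RecordStub("2023/12/18", "file-001"),  // update
            new RecordStub("2023/12/19", "file-002"))) // update
        .mapToPair(r -> new Tuple2<Tuple2<String, String>, RecordStub>(
            new Tuple2<>(r.partitionPath, r.currentFileId), r))
        .countByKey();

    counts.forEach((key, count) ->
        System.out.println("partition=" + key._1() + ", fileId=" + key._2() + " -> " + count));

    spark.stop();
  }
}

Because the per-key counts are collected to the driver, a very large number of distinct (partition, file) keys also makes this step heavier on the driver side. The stack trace reported for the stage points at exactly this call: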
org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:105)
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:200)
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:174)
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:86)
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:63)
org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor.execute(SparkUpsertDeltaCommitActionExecutor.java:45)
org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:98)
org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:88)
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:142)
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:224)
org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:504)
org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:502)
org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:108)
org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:61)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
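Reading the trace: countByKey at BaseSparkCommitActionExecutor.buildProfile is a Spark action, so the time charged to "Building workload profile" typically includes whatever upstream work (for example, tagging incoming records against the index) had not yet been computed or cached when the action ran. Whether and how the tagged records are persisted at that point depends on the Hudi version and configuration, so the sketch below is only a generic, standalone Spark illustration, not Hudi code: it shows why persisting an RDD before the first action matters when the same RDD is reused by a later stage. All names in it are hypothetical.

import java.util.Arrays;
import java.util.Map;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

public class PersistBeforeProfileSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("persist-sketch").master("local[*]").getOrCreate();
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    // Stand-in for an expensive per-record step (think: tagging records against an
    // index). On an unpersisted RDD it re-runs for every action that consumes it.
    JavaRDD<String> tagged = jsc.parallelize(Arrays.asList("a", "b", "a", "c"))
        .map(key -> {
          Thread.sleep(10); // simulated expensive work
          return key;
        })
        .persist(StorageLevel.MEMORY_AND_DISK()); // cache before the first action

    // Action 1: build a small "workload profile" (per-key counts) on the driver.
    Map<String, Long> profile = tagged
        .mapToPair(key -> new Tuple2<String, Integer>(key, 1))
        .countByKey();
    System.out.println("profile = " + profile);

    // Action 2: the actual "write" (here just a count). Thanks to persist(), the
    // expensive map() above is not executed a second time.
    long written = tagged.count();
    System.out.println("records written = " + written);

    tagged.unpersist();
    spark.stop();
  }
}

In other words, the stage timing alone does not say whether the counting itself is slow or whether it is simply the first place the preceding work gets materialized; checking what is cached before this action is usually the first thing to look at.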

To Reproduce

Steps to reproduce the behavior:

Expected behavior

A clear and concise description of what you expected to happen.

Environment Description

  • Hudi version: 0.14.1-RC1

  • Spark version: 3.2.1

  • Hive version: 3.1.3

  • Hadoop version: 3.2.2

  • Storage (HDFS/S3/GCS..): S3

  • Running on Docker? (yes/no): no

zyclove · Dec 18 '23 03:12

@zyclove Can you let us know more about your data size (existing and incremental), table configuration, and writer configuration?

ad1happy2go · Dec 18 '23 08:12