
hoodie.bulkinsert.shuffle.parallelism does not take effect

Open zhangjw123321 opened this issue 1 year ago • 3 comments

Describe the problem you faced

1. The source table (ods.ods_company) consists of 10,000 files.
2. hoodie.bulkinsert.shuffle.parallelism=100 is set but does not take effect.
3. After inserting into the Hudi table, the Hudi table also contains 10,000 files. The expected result is 100 files, not 10,000.

To Reproduce

Steps to reproduce the behavior:

1. Start spark-sql:

```shell
/opt/software/spark-3.2.1/bin/spark-sql \
  --master yarn \
  --conf spark.ui.port=4049 \
  --conf spark.ui.showConsoleProgress=true \
  --conf spark.hadoop.hive.cli.print.header=true \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  --queue root.hdfs \
  --driver-memory 5g \
  --executor-memory 20g \
  --executor-cores 10 \
  --num-executors 20
```

2. Create the Hudi table:

```sql
CREATE TABLE IF NOT EXISTS hudi_ods.ods_company (
  id bigint
  -- remaining columns elided in the original report
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'dt'
);
```

3. Set the bulk insert parameters:

```sql
set hoodie.spark.sql.insert.into.operation=bulk_insert;
set hoodie.bulkinsert.shuffle.parallelism=100;
```

4. Run the insert:

```sql
insert into table hudi_ods.ods_company select * from ods.ods_company where dt='2023-12-15';
```

Expected behavior

A clear and concise description of what you expected to happen.

Environment Description

  • Hudi version : 0.14

  • Spark version : 3.2

  • Hive version : 2.3.1

  • Hadoop version : 2.10

  • Storage (HDFS/S3/GCS..) : HDFS

  • Running on Docker? (yes/no) : no

Additional context

Add any other context about the problem here.

Stacktrace

Add the stacktrace of the error.

zhangjw123321 · Dec 27 '23 09:12

@zhangjw123321 The number of input partitions/Spark tasks is derived from the input dataset. How many tasks are getting created, and what is the nature of the dataset's source?

ad1happy2go · Dec 27 '23 12:12

This dataset was imported from MySQL into the Hive table using Sqoop with -m 10000.

zhangjw123321 · Dec 28 '23 02:12
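[Editor's note] Sqoop's -m/--num-mappers flag sets the number of parallel map tasks, and each mapper writes its own output file, which is why the Hive source table ends up with 10,000 files. A hedged sketch of such an import (the connection string, database, and table names here are placeholders, not taken from the thread):

```shell
# Hedged sketch: -m 10000 runs 10,000 mappers, and each mapper writes
# one output file into the Hive table, producing 10,000 small files.
sqoop import \
  --connect jdbc:mysql://mysql-host:3306/source_db \
  --table company \
  --hive-import \
  --hive-table ods.ods_company \
  -m 10000
```

Reducing -m at import time (or compacting afterwards) would shrink the file count at the source instead of relying on the Hudi writer to coalesce it.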

First of all, thank you very much for your reply. This parameter is introduced in the advanced configuration docs, but after I configure it, it has no effect. My understanding is that, with the parameter configured correctly, the number of files in the Hudi table should match the hoodie.bulkinsert.shuffle.parallelism value. Why does this parameter not take effect?

zhangjw123321 · Dec 28 '23 03:12

@zhangjw123321 This parameter controls the number of partitions after the shuffle stage (it mainly applies when a sort mode or custom partitioning is used, as mentioned in the docs). Can you share the Spark DAG? How many stages is it creating?

ad1happy2go · Dec 29 '23 09:12

[screenshots of the Spark DAG]

zhangjw123321 · Jan 02 '24 09:01

[screenshot]

zhangjw123321 · Jan 02 '24 10:01

@zhangjw123321 It is going through a record dedup step. Bulk insert does not dedup with the default configs. Are you setting any other configs?

ad1happy2go · Jan 02 '24 14:01

Which stage is removing duplicate records? Other than the configuration above, no other configuration is set manually. I downloaded https://dlcdn.apache.org/hudi/0.14.0/hudi-0.14.0.src.tgz and built hudi-spark3.2-bundle_2.12-0.14.0.jar with Maven.

zhangjw123321 · Jan 03 '24 01:01

@zhangjw123321 It looks like hoodie.bulkinsert.shuffle.parallelism cannot take effect on a non-partitioned table in the code. Judging from the Spark UI, you may not have set spark.default.parallelism, so reduceByKey will use the parent RDD's partition count. Can you try set spark.default.parallelism=100? I think it will reduce the parallelism of stage 10 to 100.

KnightChess · Jan 04 '24 14:01

I think hoodie.bulkinsert.sort.mode is helpful

gtzysh · Jan 05 '24 06:01
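[Editor's note] Following up on the pointer above: hoodie.bulkinsert.sort.mode controls whether bulk_insert adds a repartition/sort step, which is where hoodie.bulkinsert.shuffle.parallelism is applied; with the default NONE there is no shuffle to repartition. A hedged sketch of the session configs (database/table names are copied from the reproduction steps in this thread; GLOBAL_SORT is one of the documented modes, alongside NONE, PARTITION_SORT, and PARTITION_PATH_REPARTITION):

```shell
# Hedged sketch: enable a sort mode so bulk_insert performs a shuffle,
# letting hoodie.bulkinsert.shuffle.parallelism control the output file count.
/opt/software/spark-3.2.1/bin/spark-sql --master yarn -e "
set hoodie.spark.sql.insert.into.operation=bulk_insert;
set hoodie.bulkinsert.sort.mode=GLOBAL_SORT;
set hoodie.bulkinsert.shuffle.parallelism=100;
insert into table hudi_ods.ods_company select * from ods.ods_company where dt='2023-12-15';
"
```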

The following parameters were set:

```sql
set hoodie.spark.sql.insert.into.operation=bulk_insert;
set hoodie.bulkinsert.shuffle.parallelism=100;
set spark.default.parallelism=100;
set spark.sql.shuffle.partitions=100;
```

After applying these parameters, the Hudi table on HDFS still has 10,000 files.

zhangjw123321 · Jan 05 '24 09:01

@zhangjw123321 I tested locally; spark.default.parallelism does not seem to take effect when set via SQL SET. Can you set it when submitting the Spark job, e.g. via --conf? Before trying that: how many executor cores does the job have in total when it executes, 10,000? Or does ods.ods_company where dt='2023-12-15' have 10,000 files?

KnightChess · Jan 05 '24 09:01

1. I use --executor-cores 10 --num-executors 20, or --executor-cores 10 --num-executors 10.
2. Yes, ods.ods_company where dt='2023-12-15' has 10,000 files.

zhangjw123321 · Jan 05 '24 10:01

@zhangjw123321 You can try setting it at spark submit time via --conf, or in code with sparkConf.set('xxx','yyy'); then it will match the other branch and not use the parent RDD's partition size.

KnightChess · Jan 05 '24 10:01
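[Editor's note] The suggestion above, sketched as submit-time flags (hedged: the binary path and extension/catalog configs are copied from the reproduction steps earlier in this thread; spark.sql.shuffle.partitions is included because it was also tried via SET):

```shell
# Hedged sketch: pass parallelism settings as --conf at submit time.
# spark.default.parallelism is read when the SparkContext is created,
# so a SQL-session SET comes too late for RDD-level defaults like reduceByKey.
/opt/software/spark-3.2.1/bin/spark-sql \
  --master yarn \
  --conf spark.default.parallelism=100 \
  --conf spark.sql.shuffle.partitions=100 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
```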

I tried setting it that way, and the expected number of files is now generated. Thank you very much. @KnightChess @ad1happy2go

zhangjw123321 · Jan 08 '24 06:01

@zhangjw123321 I created a JIRA issue to track it: https://issues.apache.org/jira/browse/HUDI-7277

KnightChess · Jan 08 '24 07:01

#10532 fixes the issue where hoodie.bulkinsert.shuffle.parallelism cannot take effect when deduping the source.

KnightChess · Jan 22 '24 05:01