hoodie.bulkinsert.shuffle.parallelism not taking effect
Describe the problem you faced
1. The source table (ods.ods_company) has 10,000 files. 2. Setting hoodie.bulkinsert.shuffle.parallelism=100 has no effect. 3. After inserting into the Hudi table, the Hudi table also has 10,000 files; hoodie.bulkinsert.shuffle.parallelism is not taking effect. The correct result would be 100 files, not 10,000.
To Reproduce
Steps to reproduce the behavior:
1. Launch spark-sql:

```sh
/opt/software/spark-3.2.1/bin/spark-sql \
  --master yarn --conf spark.ui.port=4049 \
  --conf spark.ui.showConsoleProgress=true \
  --conf spark.hadoop.hive.cli.print.header=true \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  --queue root.hdfs \
  --driver-memory 5g \
  --executor-memory 20g \
  --executor-cores 10 \
  --num-executors 20
```
2. Create the table:

```sql
CREATE TABLE IF NOT EXISTS hudi_ods.ods_company (
  id bigint
) using hudi tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'dt'
);
```

3. Set the bulk-insert options:

```sql
set hoodie.spark.sql.insert.into.operation=bulk_insert;
set hoodie.bulkinsert.shuffle.parallelism=100;
```

4. Run the insert:

```sql
insert into table hudi_ods.ods_company select * from ods.ods_company where dt='2023-12-15';
```

Expected behavior
The Hudi table should contain 100 files, matching hoodie.bulkinsert.shuffle.parallelism=100, not 10,000.
Environment Description

- Hudi version : 0.14
- Spark version : 3.2
- Hive version : 2.3.1
- Hadoop version : 2.10
- Storage (HDFS/S3/GCS..) : HDFS
- Running on Docker? (yes/no) : no
@zhangjw123321 The number of input partitions/Spark tasks is derived from the input dataset. How many tasks are getting created, and what is the nature of the source dataset?
This dataset was imported from MySQL into the Hive table using Sqoop with -m 10000.
First of all, thank you very much for your reply.
This parameter is introduced under the advanced configuration docs, but after I configure it, it does not take effect. I would expect the number of files in the Hudi table to match the parallelism set by hoodie.bulkinsert.shuffle.parallelism. What is the reason this parameter does not take effect?
@zhangjw123321 This controls the number of partitions after the shuffle stage (mainly used with a sort mode or custom partitioning, as mentioned in the docs). Can you share the Spark DAG? How many stages is it creating?
@zhangjw123321 It's going into deduping records. For bulk insert it doesn't dedup with the default configs. Are you setting any other configs?
Which stage is deleting duplicate records? Other than the configuration above, no other configuration was set manually. I downloaded https://dlcdn.apache.org/hudi/0.14.0/hudi-0.14.0.src.tgz and built hudi-spark3.2-bundle_2.12-0.14.0.jar with Maven.
@zhangjw123321 It looks like hoodie.bulkinsert.shuffle.parallelism cannot work on a non-partitioned table in the code. In the Spark UI, maybe you did not set spark.default.parallelism, so reduceByKey will use the parent RDD's partition count. Can you try set spark.default.parallelism=100;? I think it will reduce the parallelism in stage 10 to 100.
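The fallback described above can be sketched in plain Python (a simplified, hypothetical model of how Spark picks a shuffle's partition count when no explicit numPartitions is passed, not the actual implementation):

```python
def shuffle_num_partitions(parent_partition_counts, default_parallelism=None):
    """Simplified sketch: if spark.default.parallelism is set, it wins;
    otherwise the shuffle falls back to the largest upstream RDD's
    partition count -- which is why a 10,000-file input here yields a
    10,000-partition reduceByKey stage."""
    if default_parallelism is not None:
        return default_parallelism
    return max(parent_partition_counts)

# spark.default.parallelism unset: the 10,000 input partitions win
print(shuffle_num_partitions([10000]))       # 10000
# spark.default.parallelism=100: the shuffle shrinks to 100 tasks
print(shuffle_num_partitions([10000], 100))  # 100
```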
I think hoodie.bulkinsert.sort.mode might be helpful here.
set hoodie.spark.sql.insert.into.operation=bulk_insert;
set hoodie.bulkinsert.shuffle.parallelism=100;
set spark.default.parallelism=100;
set spark.sql.shuffle.partitions=100;
After setting these parameters, the Hudi table on HDFS still has 10,000 files.
@zhangjw123321 I tested locally; spark.default.parallelism does not seem to take effect when set via a SQL SET. Can you set it when submitting the Spark job, e.g. via --conf? Before trying it: how many total executor cores does this job have, 10,000 cores? Or does ods.ods_company where dt='2023-12-15' have 10,000 files?
1. I use --executor-cores 10 --num-executors 20 or --executor-cores 10 --num-executors 10. 2. Yes, 10,000 files.
@zhangjw123321 You can try setting it in spark-submit via --conf, or in code with sparkConf.set('xxx','yyy'); it will then take the other code branch instead of using the parent RDD's partition size.
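Concretely, the launch command from the repro could carry the extra flag at submit time (a sketch reusing the paths and settings quoted earlier in this thread; trailing flags elided):

```sh
/opt/software/spark-3.2.1/bin/spark-sql \
  --master yarn \
  --conf spark.default.parallelism=100 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  ...
```

The equivalent programmatic route would be setting it on the SparkConf before the session is created, e.g. `new SparkConf().set("spark.default.parallelism", "100")`.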
I tried setting it that way, and the expected number of files is now generated. Thank you very much. @KnightChess @ad1happy2go
@zhangjw123321 I create a issue to track it, https://issues.apache.org/jira/browse/HUDI-7277
#10532 fixes hoodie.bulkinsert.shuffle.parallelism not working when deduping the source.