
Partitioning data by two keys takes 10x more time than partitioning by one key.

Open maheshguptags opened this issue 1 year ago • 28 comments

I am trying to add a second level of partitioning to my table instead of a single level, but the Hudi Flink job is taking 10x as long as it did with a single-level partition.

I ingested 1.8M records into a table with one partition level and it took around 12-15 minutes. With the same configuration and the same data payload, I then added a second partition key, and the ingestion took around 1 hour 45 minutes to complete.

To Reproduce

Steps to reproduce the behavior: below is the configuration I am using for the table. Create the table with the following properties.

PARTITIONED BY (`client_id`,`hashed_server_id`)
WITH ('connector' = 'hudi',
'path' = '${table_location}',
'table.type' = 'COPY_ON_WRITE',
'hoodie.datasource.write.recordkey.field' = 'a,b',
'payload.class'='x.y.PartialUpdate',
'precombine.field'='ts',
'hoodie.clean.async'='true',
'hoodie.cleaner.policy' = 'KEEP_LATEST_COMMITS',
'hoodie.clean.automatic' = 'true',
'hoodie.clean.max.commits'='5',
'hoodie.clean.trigger.strategy'='NUM_COMMITS',
'hoodie.cleaner.parallelism'='100',
'hoodie.cleaner.commits.retained'='4',
'hoodie.index.type'= 'BUCKET',
'hoodie.index.bucket.engine' = 'SIMPLE',
'hoodie.bucket.index.num.buckets'='16',
'hoodie.bucket.index.hash.field'='a',
'hoodie.parquet.small.file.limit'='104857600',
'hoodie.parquet.compression.codec'='snappy')
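
For reference, a minimal sketch of what the full table creation statement could look like with the properties above. The column names and types (a, b, ts, client_id, hashed_server_id) and the table name sample_table are illustrative placeholders, not taken from this report:

-- Hypothetical Flink SQL DDL; schema and table name are placeholders.
CREATE TABLE sample_table (
  a STRING,
  b STRING,
  ts TIMESTAMP(3),
  client_id STRING,
  hashed_server_id STRING
)
PARTITIONED BY (`client_id`, `hashed_server_id`)
WITH (
  'connector' = 'hudi',
  'path' = '${table_location}',
  'table.type' = 'COPY_ON_WRITE',
  'hoodie.datasource.write.recordkey.field' = 'a,b',
  'payload.class' = 'x.y.PartialUpdate',
  'precombine.field' = 'ts',
  'hoodie.index.type' = 'BUCKET',
  'hoodie.index.bucket.engine' = 'SIMPLE',
  'hoodie.bucket.index.num.buckets' = '16',
  'hoodie.bucket.index.hash.field' = 'a'
  -- plus the cleaning and parquet options from the property list above
);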

Expected behavior: since this is just an additional partition level in storage, it should not impact performance much (I could understand 5-7 minutes extra, since complex key generation is a bit slower than simple key generation).

Environment Description

  • Flink 1.17.1

  • Hudi version : 0.14

  • Spark version : NA

  • Hive version : NA

  • Hadoop version : 3.4.0

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : Yes

Additional context

My table operation type is upsert; I have tested the functionality and it works fine, and I cannot change the table type.

I also discussed this with @ad1happy2go, and he too suggested that it should not have much impact, since it is just another level of partitioning.

CC : @ad1happy2go @codope @danny0405 @yo

maheshguptags avatar Jan 08 '24 08:01 maheshguptags

Adding a partition field means more tasks. And since the index is BUCKET, the number of tasks could be bucket_num * partitions in some cases.

xicm avatar Jan 09 '24 09:01 xicm

@xicm Yes, I agree, but that would not make it 10 times slower.

Say I have 100 partitions and each partition has 10 sub-partitions with 16 buckets; then the total number of tasks would be 100 * 10 * 16 = 16,000 at most, whereas with a single partition level it is 100 * 16 = 1,600, right?

I understand it might take 5-7 minutes extra compared to a single-level partition, but not 10 times longer.

Let me know your thoughts.

maheshguptags avatar Jan 09 '24 09:01 maheshguptags

Not sure if this is the cause. Can you check the number of file groups after the partition field changed, and reduce the bucket number to see the time cost?

xicm avatar Jan 09 '24 09:01 xicm

@xicm Let me reduce the number of buckets and test with the same number of records to check the processing time. Can you tell me how to check the number of file groups?

maheshguptags avatar Jan 09 '24 10:01 maheshguptags

@xicm I reduced the number of buckets (it makes sense to reduce the bucket count since we have a second-level partition), but it is still taking 45-50 minutes to execute, which is 5 times longer than with a one-level partition.

maheshguptags avatar Jan 09 '24 11:01 maheshguptags

can you tell me how to check number of filegroup?

Use the Hudi CLI or Spark SQL show_commits; pay attention to total_files_added and total_files_updated.

it is still taking 45-50 min to execute which 5 times as compare to 1 level partition.

Reduce the bucket num and increase write.tasks; test a few combinations to get better performance.

xicm avatar Jan 10 '24 01:01 xicm
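
As a reference for the file-group check suggested above, a minimal sketch using the Spark SQL show_commits procedure; the table name hudi_table is a placeholder, and the Hudi CLI's commits commands report similar statistics:

-- Hypothetical Spark SQL call; 'hudi_table' is a placeholder table name.
call show_commits(table => 'hudi_table', limit => 10);
-- In the output, compare total_files_added and total_files_updated per commit.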

Yes, I am testing different combinations of the bucket number.

maheshguptags avatar Jan 10 '24 05:01 maheshguptags

Hi @xicm, I tried the below combinations with the same number of records (screenshot attached).

Please find below the details of the file groups (screenshot attached).

After testing it several times, I noticed that 8 or 4 buckets look good for a data size of <100M.

As we know, once the number of buckets is set, it cannot be changed.

So I have a question related to this.

Suppose I choose 8 buckets and the streaming data keeps growing (100 million per ID); will it affect performance (considering that the job is streaming)?

Thanks Mahesh Gupta

maheshguptags avatar Jan 10 '24 08:01 maheshguptags

A small bucket number will not fit the growing data. Generally, we estimate the data size to determine the number of buckets.

I think your problem is that the data is too scattered. There are many tasks, so you need a bigger write.tasks.

xicm avatar Jan 10 '24 09:01 xicm

I already had 20 tasks writing the data; please check the screenshot below. Do you want me to increase it further? (screenshot attached)

maheshguptags avatar Jan 10 '24 09:01 maheshguptags

Can you check the SubTasks of bucket_assigner in the Flink UI? This tells us how many tasks are in a write operation.

xicm avatar Jan 10 '24 10:01 xicm

Yes, it is 20; it starts at 0 and ends at 19. (screenshot attached)

maheshguptags avatar Jan 10 '24 10:01 maheshguptags

Sorry, my understanding of SubTasks was wrong. Hudi splits the input data by partition + file group and then writes the partitioned data with a parallelism of write.tasks. The job writes 2000+ files in a commit, so a parallelism of 20 is too small.

xicm avatar Jan 11 '24 02:01 xicm
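
For illustration of the write.tasks knob discussed above, a minimal sketch using a Flink SQL dynamic table option on the sink; the table and source names and the value 40 are placeholders, not a recommendation from this thread (the same option can also be set in the table's WITH clause):

-- Hypothetical: override the Hudi write operator parallelism for this insert.
INSERT INTO sample_table /*+ OPTIONS('write.tasks' = '40') */
SELECT a, b, ts, client_id, hashed_server_id
FROM kafka_source;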

@xicm Let me try increasing the number of write tasks and test the performance under load. Is there a way to control the number of file groups for a particular commit?

thanks

maheshguptags avatar Jan 11 '24 05:01 maheshguptags

hoodie.bucket.index.num.buckets controls the number of buckets under one partition, and by default it is 4 in Flink.

danny0405 avatar Jan 12 '24 09:01 danny0405

@danny0405 I am asking about the number of file groups added per commit. I am already using the bucket index, and the number of file groups is more than 2000 per commit.

maheshguptags avatar Jan 12 '24 10:01 maheshguptags

Hi @xicm and @danny0405, I tried increasing the parallelism as @xicm suggested, but the job tries to consume all the data in a single commit, i.e. it accumulates the data into one commit, which causes a heap OOM issue.

(Screenshot attached: commit sizes from the .hoodie folder.)

The second commit is trying to consume the entire dataset in one commit, i.e., creating a 41MB .commit file (screenshot attached).

Can we reduce/control the commit file size? Can we hop on a call to resolve this issue?

Let me know your thoughts. Thanks

maheshguptags avatar Jan 16 '24 05:01 maheshguptags

Can you redesign the partitioning? There is only 1-2 GB of data, but there are so many partitions.

xicm avatar Jan 16 '24 06:01 xicm

@xicm The full dataset is huge, around 100M. However, for performance evaluation, I have only ingested 1.78M. cc: @pratyakshsharma

maheshguptags avatar Jan 16 '24 06:01 maheshguptags

Yeah, try to reduce the number of file groups per commit, because each file group has an in-memory buffer before it is flushed to disk.

danny0405 avatar Jan 17 '24 02:01 danny0405
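
A possible sketch of the memory-related write options that relate to the per-file-group buffer described above; the values are illustrative, and whether these are the right levers for this job is an assumption, not something confirmed in the thread:

-- Hypothetical: cap the write-task buffer and flush batches earlier (values in MB).
INSERT INTO sample_table /*+ OPTIONS(
  'write.task.max.size' = '1024',  -- memory budget per write task
  'write.batch.size' = '128'       -- flush a buffered batch once it reaches this size
) */
SELECT a, b, ts, client_id, hashed_server_id
FROM kafka_source;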

@danny0405 Can you please share the config to reduce the file groups per commit?

maheshguptags avatar Jan 17 '24 05:01 maheshguptags

Discussed with @maheshguptags. Advised exploring the Flink Kafka connector configs to control the number of records/bytes in one micro-batch. https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/

cc @danny0405

ad1happy2go avatar Jan 18 '24 10:01 ad1happy2go
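
If the source is defined through the Flink SQL Kafka connector, consumer properties can be forwarded with the properties.* prefix; a minimal, assumed sketch (topic, servers, schema, and values are placeholders, and whether this actually bounds the data per Hudi commit depends on the job's checkpointing):

-- Hypothetical Kafka source DDL; all names and values are placeholders.
CREATE TABLE kafka_source (
  a STRING,
  b STRING,
  ts TIMESTAMP(3),
  client_id STRING,
  hashed_server_id STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'broker:9092',
  'properties.group.id' = 'hudi-ingest',
  'properties.max.poll.records' = '300',  -- forwarded to the Kafka consumer
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
);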

@ad1happy2go I tried the below configuration for Kafka, but it didn't help.

source.kafka.max.poll.records=300
source.kafka.max.poll.interval.ms=300000

I tried different values for the above configs. cc: @danny0405

maheshguptags avatar Jan 19 '24 08:01 maheshguptags

@danny0405 Can you help with this, please?

maheshguptags avatar Jan 29 '24 08:01 maheshguptags

@maheshguptags Let's get on a call to discuss this further.

ad1happy2go avatar Jan 31 '24 10:01 ad1happy2go

@xicm @danny0405 I had a discussion with @maheshguptags. Let me try to summarise his issue.

He has around 5000 partitions in total and is using the bucket index. With a parallelism (write.tasks) of 20 the job takes 1 hour 45 minutes, and with 100 it takes 35 minutes.

But with the increase in parallelism, the number of file groups explodes as expected. This results in a lot of small file groups with very few records each (~20), which ultimately causes OOM due to 400MB commit files.

ad1happy2go avatar Jan 31 '24 11:01 ad1happy2go

Hi @ad1happy2go, a small correction on the commit file size:

which ultimately causing OOM due to 400MB commit files.

It is a 41 MB commit file, @ad1happy2go.

maheshguptags avatar Jan 31 '24 14:01 maheshguptags

@danny0405 Still waiting for your response. Can you please take a look at this?

maheshguptags avatar Feb 21 '24 05:02 maheshguptags