Hudi Clustering not working
Describe the problem you faced
Hudi clustering is not working.
I'm using the Hudi DeltaStreamer in continuous mode with a Kafka source.
The Kafka topic has 120 partitions and the ingestion rate is about 200k records per minute.
We are using BULK_INSERT mode to ingest data into the target location.
However, a lot of small files are being generated. To overcome this small-file problem we enabled Hudi clustering, but we can still see that the files are not being merged.
Configuration for the job:
```
#base properties
hoodie.insert.shuffle.parallelism=50
hoodie.bulkinsert.shuffle.parallelism=200
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false
hoodie.bulkinsert.sort.mode=none

#cleaner properties
hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS
hoodie.cleaner.fileversions.retained=60
hoodie.clean.async=true

#archival
hoodie.keep.min.commits=12
hoodie.keep.max.commits=15

#datasource properties
hoodie.deltastreamer.schemaprovider.registry.url=
hoodie.datasource.write.recordkey.field=
hoodie.deltastreamer.source.kafka.topic=
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
hoodie.datasource.write.partitionpath.field=timestamp:TIMESTAMP
hoodie.deltastreamer.kafka.source.maxEvents=600000000
hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
hoodie.deltastreamer.keygen.timebased.input.timezone=UTC
hoodie.deltastreamer.keygen.timebased.output.timezone=UTC
hoodie.deltastreamer.keygen.timebased.output.dateformat='dt='yyyy-MM-dd
hoodie.clustering.async.enabled=true
hoodie.clustering.plan.strategy.target.file.max.bytes=3000000000
hoodie.clustering.plan.strategy.small.file.limit=200000001
hoodie.clustering.async.max.commits=1
hoodie.clustering.plan.strategy.max.num.groups=10

#kafka props
bootstrap.servers=
schema.registry.url=
```
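For reference, an annotated reading of the clustering-related knobs above (paraphrasing the Hudi configuration docs; worth double-checking against the 0.9 defaults):

```
# Annotated sketch of what the clustering settings above request (not a fix):
hoodie.clustering.async.enabled=true                               # run clustering asynchronously alongside ingestion
hoodie.clustering.async.max.commits=1                              # schedule async clustering after every commit
hoodie.clustering.plan.strategy.small.file.limit=200000001         # only files under ~200 MB are clustering candidates
hoodie.clustering.plan.strategy.target.file.max.bytes=3000000000   # aim for ~3 GB output files
hoodie.clustering.plan.strategy.max.num.groups=10                  # at most 10 clustering groups per plan
```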
Deltastreamer Class Arguments:
- "--table-type"
- "COPY_ON_WRITE"
- "--props"
- "/opt/spark/hudi/config/source.properties"
- "--schemaprovider-class"
- "org.apache.hudi.utilities.schema.SchemaRegistryProvider"
- "--source-class"
- "org.apache.hudi.utilities.sources.JsonKafkaSource"
- "--target-base-path"
- ""
- "--target-table"
- ""
- "--op"
- "BULK_INSERT"
- "--source-ordering-field"
- "timestamp"
- "--continuous"
- "--min-sync-interval-seconds"
- "60"
- Hudi version : 0.9
- Spark version : 2.4.4
- Storage (HDFS/S3/GCS..) : BLOB
- Running on Docker? (yes/no) : Kubernetes
Stacktrace
```
22/06/09 22:11:07 INFO ClusteringUtils: Found 0 files in pending clustering operations
22/06/09 22:11:07 INFO RocksDbBasedFileSystemView: Resetting file groups in pending clustering to ROCKSDB based file-system view at /tmp/hoodie_timeline_rocksdb, Total file-groups=0
```
Can you try setting `hoodie.clustering.plan.strategy.max.bytes.per.group` to a multiple of `hoodie.clustering.plan.strategy.target.file.max.bytes`, or at least equal to it? The default value is 2 GB.
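For illustration, one way to satisfy that relationship (the numbers below are placeholders, not a recommendation for this workload):

```
# Keep max.bytes.per.group at least equal to, ideally a multiple of, target.file.max.bytes
hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
# 2x the target file size above (~2 GB, which is also the documented default for this config)
hoodie.clustering.plan.strategy.max.bytes.per.group=2147483648
```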
Can you leave the small-file and target-file configs at their default values, and also leave `hoodie.clustering.async.max.commits` at its default, and see what happens? This time, can you enable debug logs when you restart your pipeline? We might get more insights.
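As a sketch, assuming the stock Spark 2.4 log4j 1.x setup, debug logging for the Hudi classes can be turned on by adding one logger line to the driver/executor `conf/log4j.properties`:

```
# conf/log4j.properties (appended to Spark's existing template)
# DEBUG for Hudi so the clustering planner logs why file groups are or are not selected
log4j.logger.org.apache.hudi=DEBUG
```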
Maybe this could be the issue. Can you try adding this to the spark-submit command?
`--hoodie-conf hoodie.clustering.async.enabled=true`
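For reference, a minimal sketch of where the extra `--hoodie-conf` flag would sit on the DeltaStreamer launch command; the bundle jar name and the `<...>` placeholders are illustrative (the target path/table were left blank in the original report):

```
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  hudi-utilities-bundle_2.11-0.9.0.jar \
  --table-type COPY_ON_WRITE \
  --op BULK_INSERT \
  --props /opt/spark/hudi/config/source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field timestamp \
  --target-base-path <target-base-path> \
  --target-table <target-table> \
  --continuous \
  --min-sync-interval-seconds 60 \
  --hoodie-conf hoodie.clustering.async.enabled=true
```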
> Maybe this could be the issue. Can you try adding this to the spark-submit command?
> `--hoodie-conf hoodie.clustering.async.enabled=true`
Hi, I have passed this in the source.properties file, and I also tried setting `hoodie.clustering.plan.strategy.max.bytes.per.group`.
The props below are passed:

```
hoodie.insert.shuffle.parallelism=50
hoodie.bulkinsert.shuffle.parallelism=200
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false
hoodie.bulkinsert.sort.mode=none

#cleaner properties
hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS
hoodie.cleaner.fileversions.retained=60
hoodie.clean.async=true

#archival
hoodie.keep.min.commits=12
hoodie.keep.max.commits=15

#datasource properties
hoodie.deltastreamer.schemaprovider.registry.url=
hoodie.datasource.write.recordkey.field=
hoodie.deltastreamer.source.kafka.topic=
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
hoodie.datasource.write.partitionpath.field=timestamp:TIMESTAMP
hoodie.deltastreamer.kafka.source.maxEvents=600000000
hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
hoodie.deltastreamer.keygen.timebased.input.timezone=UTC
hoodie.deltastreamer.keygen.timebased.output.timezone=UTC
hoodie.deltastreamer.keygen.timebased.output.dateformat='dt='yyyy-MM-dd
hoodie.clustering.async.enabled=true
hoodie.clustering.plan.strategy.target.file.max.bytes=3000000000
hoodie.clustering.plan.strategy.small.file.limit=200000001
hoodie.clustering.async.max.commits=1
hoodie.clustering.plan.strategy.max.num.groups=10
oodie.clustering.plan.strategy.max.bytes.per.group=9000000000

#kafka props
bootstrap.servers=
group.id=hudi-lpe
auto.offset.reset=latest
hoodie.deltastreamer.source.kafka.checkpoint.type=timestamp
```

(As I said above, when I passed `earliest` for `auto.offset.reset` the job failed, so to recover I had no choice but to pass `latest`.)
> Maybe this could be the issue. Can you try adding this to the spark-submit command?
> `--hoodie-conf hoodie.clustering.async.enabled=true`
I have tried this option, still no luck.
I think there are no eligible file groups for clustering. More than the number of records, clustering is a function of file size. What's the average file size of the data files without clustering? We need to set the clustering configs accordingly. Did you try out the suggestion by Siva, i.e. to run with the default configs?
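One quick way to check data file sizes in a partition (assuming an HDFS-compatible client is configured for the blob store; the path is illustrative):

```
# List data file sizes under one partition, smallest to largest
hadoop fs -du -h <target-base-path>/dt=2022-06-09 | sort -h
```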
Hi @codope, I first set up clustering with the default configuration only. Since that wasn't working, I used these options:
```
hoodie.clustering.async.enabled=true
hoodie.clustering.plan.strategy.target.file.max.bytes=3000000000
hoodie.clustering.plan.strategy.small.file.limit=200000001
hoodie.clustering.async.max.commits=1
hoodie.clustering.plan.strategy.max.num.groups=10
oodie.clustering.plan.strategy.max.bytes.per.group=9000000000
```
@ksrihari93: are you still stuck on this issue?
If you can provide us with debug logs, it would be of great help. We are kind of running out of ideas as to why clustering is not getting triggered.
@ksrihari93 : gentle ping.
Hi team, sorry for the late reply. It's working fine now; we can close this issue.
Thanks! Curious to know what was missing or how you resolved it.
Also interested in how you solved it! I'm stuck with this same problem on version 0.10.1 integrated with AWS Glue.