[SUPPORT]
Tips before filing an issue
- Have you gone through our FAQs?
- Join the mailing list to engage in conversations and get faster support at [email protected].
- If you have triaged this as a bug, then file an issue directly.
We're experiencing a strange issue when deleting records from a table with an UPSERT command. We notice that, occasionally, our job gets stuck in the "Doing partition and writing data" stage for multiple hours. This stage generally takes between 5 and 30 minutes, but sometimes takes several hours. For example, in this screenshot, one task took roughly 5 hours (the remaining tasks were skipped).
Our job runs in a loop and works off batches of data, so it is a long-running job. We have not yet identified commonalities in the data batches when we run into this. We have not observed high CPU, OOM, loss of data nodes, or high datanode disk usage during this time.
We do not see any errors in the driver application logs or the executor/task logs that indicate something is going wrong. We do see a lot of heartbeat files written during this time. Digging into the task logs for an example task that took a long time, we see something like:
2024-08-23 09:26:32,501 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) org.apache.hudi.io.HoodieMergeHandle: Number of entries in MemoryBasedMap => 170, Total size in bytes of MemoryBasedMap => 165668, Number of entries in BitCaskDiskMap => 0, Size of file spilled to disk => 0
2024-08-23 09:26:32,501 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) org.apache.hudi.io.HoodieMergeHandle: partitionPath:region=NA/year=2024/month=02/day=14/hour=00, fileId to be merged:b8973e4d-5bc9-4a3a-ba99-8181c9831056-0
2024-08-23 09:26:32,575 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) org.apache.hudi.io.HoodieMergeHandle: Merging new data into oldPath s3://bucket-name/region=NA/year=2024/month=02/day=14/hour=00/b8973e4d-5bc9-4a3a-ba99-8181c9831056-0_4-4995-1712252_20240815132438223.parquet, as newPath s3://bucket-name/region=NA/year=2024/month=02/day=14/hour=00/bucket-name
2024-08-23 09:26:33,241 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) org.apache.hudi.table.marker.DirectWriteMarkers: Creating Marker Path=s3://bucket-name/.hoodie/.temp/20240823091953364/region=NA/year=2024/month=02/day=14/hour=00/bucket-name.marker.MERGE
2024-08-23 09:26:33,275 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) com.amazon.ws.emr.hadoop.fs.cse.CSEMultipartUploadOutputStream: close closed:false s3://bucket-name/.hoodie/.temp/20240823091953364/region=NA/year=2024/month=02/day=14/hour=00/bucket-name.marker.MERGE
2024-08-23 09:26:33,460 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) com.amazon.ws.emr.hadoop.fs.s3.upload.dispatch.DefaultMultipartUploadDispatcher: Completed multipart upload of 1 parts 0 bytes
2024-08-23 09:26:33,460 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) com.amazon.ws.emr.hadoop.fs.cse.CSEMultipartUploadOutputStream: Finished uploading bucket-name/.hoodie/.temp/20240823091953364/region=NA/year=2024/month=02/day=14/hour=00/bucket-name.marker.MERGE. Elapsed seconds: 0.
2024-08-23 09:26:33,460 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) org.apache.hudi.table.marker.DirectWriteMarkers: [direct] Created marker file s3://bucket-name/.hoodie/.temp/20240823091953364/region=NA/year=2024/month=02/day=14/hour=00/bucket-name.marker.MERGE in 885 ms
The next time we see this task id in the logs, we see:
2024-08-23 11:18:09,558 [INFO] (producer-thread-1) org.apache.hudi.common.util.queue.IteratorBasedQueueProducer: finished buffering records
2024-08-23 11:18:09,560 [INFO] (consumer-thread-1) org.apache.hudi.common.util.queue.BoundedInMemoryExecutor: Queue Consumption is done; notifying producer threads
2024-08-23 11:18:12,007 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) com.amazon.ws.emr.hadoop.fs.cse.CSEMultipartUploadOutputStream: close closed:false s3://bucket-name/table_name/region=NA/year=2024/month=02/day=14/hour=00/b8973e4d-5bc9-4a3a-ba99-8181c9831056-0_4-5965-2033103_20240823091953364.parquet
2024-08-23 11:18:13,383 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) com.amazon.ws.emr.hadoop.fs.s3.upload.dispatch.DefaultMultipartUploadDispatcher: Completed multipart upload of 6 parts 766624225 bytes
2024-08-23 11:18:13,383 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) com.amazon.ws.emr.hadoop.fs.cse.CSEMultipartUploadOutputStream: Finished uploading bucket-name/table_name/region=NA/year=2024/month=02/day=14/hour=00/b8973e4d-5bc9-4a3a-ba99-8181c9831056-0_4-5965-2033103_20240823091953364.parquet. Elapsed seconds: 6699.
2024-08-23 11:18:13,513 [INFO] (Executor task launch worker for task 4.0 in stage 5965.0 (TID 2033103)) org.apache.hudi.io.HoodieMergeHandle: MergeHandle for partitionPath region=NA/year=2024/month=02/day=14/hour=00 fileID b8973e4d-5bc9-4a3a-ba99-8181c9831056-0, took 6701018 ms.
Note the fifth line, which reports "Elapsed seconds: 6699" (about 1.86 hours). Looking at the uploaded file in question, I notice that it is about 800 MB in size. Is it expected that a file of this size would bottleneck the job like this?
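For reference, a quick back-of-envelope check of the figures reported in the log lines above (a throwaway sketch, not part of the job; the max-file-size value is quoted from the config below):

```scala
// Throwaway arithmetic on the numbers reported in the log lines above.
val uploadedBytes = 766624225L   // "Completed multipart upload of 6 parts 766624225 bytes"
val elapsedSec    = 6699L        // "Elapsed seconds: 6699"
println(f"file size ≈ ${uploadedBytes / 1024.0 / 1024.0}%.0f MiB") // ≈ 731 MiB
println(f"elapsed   ≈ ${elapsedSec / 3600.0}%.2f hours")           // ≈ 1.86 hours
// For comparison, hoodie.parquet.max.file.size in the config below is 134217728 bytes = 128 MiB.
```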
Here's the config we're using (with sensitive info redacted):
hoodie.parquet.small.file.limit -> 104857600
hoodie.datasource.write.precombine.field -> eventVersion
hoodie.datasource.write.payload.class -> org.apache.hudi.common.model.EmptyHoodieRecordPayload
hoodie.bloom.index.filter.dynamic.max.entries -> 1106137
hoodie.cleaner.fileversions.retained -> 2
hoodie.aws.secret.key -> [redacted]
hoodie.parquet.max.file.size -> 134217728
hoodie.cleaner.parallelism -> 1500
hoodie.write.lock.client.num_retries -> 10
hoodie.delete.shuffle.parallelism -> 1500
hoodie.bloom.index.prune.by.ranges -> true
hoodie.metadata.enable -> false
hoodie.clean.automatic -> false
hoodie.datasource.write.operation -> upsert
hoodie.write.lock.wait_time_ms -> 600000
hoodie.metrics.reporter.type -> CLOUDWATCH
hoodie.datasource.write.recordkey.field -> timestamp,eventId,subType,trackedItem
hoodie.table.name -> my_table_name
hoodie.datasource.write.table.type -> COPY_ON_WRITE
hoodie.datasource.write.hive_style_partitioning -> true
hoodie.datasource.write.partitions.to.delete ->
hoodie.write.lock.dynamodb.partition_key -> my_table_name_key
hoodie.cleaner.policy -> KEEP_LATEST_FILE_VERSIONS
hoodie.write.markers.type -> DIRECT
hoodie.metrics.on -> false
hoodie.datasource.write.reconcile.schema -> true
hoodie.datasource.write.keygenerator.class -> org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.cleaner.policy.failed.writes -> LAZY
hoodie.upsert.shuffle.parallelism -> 1500
hoodie.write.lock.dynamodb.table -> HoodieLockTable
hoodie.write.lock.provider -> org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider
hoodie.datasource.write.partitionpath.field -> region,year,month,day,hour
hoodie.bloom.index.filter.type -> DYNAMIC_V0
hoodie.write.lock.wait_time_ms_between_retry -> 30000
hoodie.write.concurrency.mode -> optimistic_concurrency_control
hoodie.write.lock.dynamodb.region -> us-east-1
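For context, here is a minimal sketch of how a delete batch with an option map like the one above would typically be issued through the Hudi Spark datasource writer (the `hudiOptions` name, the `deleteBatch` DataFrame, the helper, and the base path are illustrative placeholders, not the actual job code):

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Illustrative subset of the options listed above; the job passes the full map.
val hudiOptions = Map(
  "hoodie.table.name"                           -> "my_table_name",
  "hoodie.datasource.write.operation"           -> "upsert",
  "hoodie.datasource.write.table.type"          -> "COPY_ON_WRITE",
  "hoodie.datasource.write.recordkey.field"     -> "timestamp,eventId,subType,trackedItem",
  "hoodie.datasource.write.partitionpath.field" -> "region,year,month,day,hour",
  "hoodie.datasource.write.precombine.field"    -> "eventVersion",
  // Deletes are issued as upserts with an empty payload, per the payload class above.
  "hoodie.datasource.write.payload.class"       -> "org.apache.hudi.common.model.EmptyHoodieRecordPayload"
)

// deleteBatch and basePath are placeholders for one iteration of the batch loop.
def writeDeleteBatch(deleteBatch: DataFrame, basePath: String): Unit =
  deleteBatch.write
    .format("hudi")
    .options(hudiOptions)
    .mode(SaveMode.Append)
    .save(basePath)
```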
Environment Description
- Hudi version : 0.11.1
- Spark version : 3.2.1
- Hive version : 3.1.3
- Hadoop version : 3.2.1
- Storage (HDFS/S3/GCS..) : S3
- Running on Docker? (yes/no) : no
Happy to provide more info. Thanks!
Editing to add cluster specifics:
- Running on a 60-node cluster of r5a.8xlarge instances.
- Spark configs:
[
{
"Classification": "spark-defaults",
"Properties": {
"spark.default.parallelism": "3350",
"spark.driver.cores": "5",
"spark.driver.extraJavaOptions": "-Djavax.net.ssl.trustStore=/home/hadoop/trust-store/InternalAndExternalTrustStore.jks -Djavax.net.ssl.trustStorePassword=amazon -XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraLibraryPath": "/home/hadoop/caching-shared-objects:/usr/lib/hadoop-lzo/lib/native",
"spark.driver.memory": "37477M",
"spark.driver.memoryOverhead": "4164M",
"spark.dynamicAllocation.enabled": "false",
"spark.eventLog.logBlockUpdates.enabled": "true",
"spark.executor.cores": "5",
"spark.executor.extraJavaOptions": "-Djavax.net.ssl.trustStore=/home/hadoop/trust-store/InternalAndExternalTrustStore.jks -Djavax.net.ssl.trustStorePassword=amazon -XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.executor.extraLibraryPath": "/home/hadoop/caching-shared-objects:/usr/lib/hadoop-lzo/lib/native",
"spark.executor.heartbeatInterval": "60s",
"spark.executor.instances": "335",
"spark.executor.memory": "37477M",
"spark.executor.memoryOverhead": "4164M",
"spark.history.fs.cleaner.enabled": "true",
"spark.history.fs.cleaner.interval": "1d",
"spark.history.fs.cleaner.maxAge": "7d",
"spark.memory.fraction": "0.80",
"spark.memory.storageFraction": "0.30",
"spark.network.timeout": "800s",
"spark.rdd.compress": "true",
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.shuffle.compress": "true",
"spark.shuffle.spill.compress": "true",
"spark.sql.shuffle.partitions": "750",
"spark.storage.level": "MEMORY_AND_DISK_SER",
"spark.yarn.scheduler.reporterThread.maxFailures": "5"
}
},
{
"Classification": "capacity-scheduler",
"Properties": {
"yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
}
}
]
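As a rough sanity check on how these settings pack onto the cluster (the r5a.8xlarge specs of 32 vCPUs / 256 GiB come from AWS documentation, not from the issue; the arithmetic only combines numbers already listed above):

```scala
// Back-of-envelope executor packing implied by the Spark settings above.
val nodes            = 60
val executors        = 335
val coresPerExecutor = 5
val execMemMb        = 37477 + 4164                 // spark.executor.memory + memoryOverhead
val executorsPerNode = executors.toDouble / nodes   // ≈ 5.6
println(f"cores/node  ≈ ${executorsPerNode * coresPerExecutor}%.1f of 32 vCPUs")
println(f"memory/node ≈ ${executorsPerNode * execMemMb / 1024}%.0f GiB of 256 GiB")
```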
I saved a thread dump; here is a screenshot of the executor's thread dump, which suggests that it is getting hung up here: https://github.com/apache/hudi/blob/release-0.11.1/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java#L147
My speculation is that this is caused by false positives from the bloom filter index. Since you are still on the 0.11 release, maybe you can try the simple index instead.
Thanks for the response! Is there any way to confirm the fp rate?
There is a stage named something like "buildPartitionProfile" which would take a long time if the false positives happen.
Thanks. Switching to the SIMPLE index doesn't seem to help. The job fails at the following job and stage:
The task logs show an OOM error:
2024-08-25T05:22:43.611+0000: [Full GC (Allocation Failure) 36580M->33919M(40960M), 46.7644028 secs]
[Eden: 0.0B(1792.0M)->0.0B(2048.0M) Survivors: 256.0M->0.0B Heap: 36580.4M(40960.0M)->33919.2M(40960.0M)], [Metaspace: 117312K->117312K(135168K)]
[Times: user=77.83 sys=0.20, real=46.76 secs]
2024-08-25T05:23:30.376+0000: [Full GC (Allocation Failure) 33919M->33904M(40960M), 43.9530911 secs]
[Eden: 0.0B(2048.0M)->0.0B(2048.0M) Survivors: 0.0B->0.0B Heap: 33919.2M(40960.0M)->33904.1M(40960.0M)], [Metaspace: 117312K->113897K(135168K)]
[Times: user=74.71 sys=0.03, real=43.96 secs]
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p
kill -9 %p"
# Executing /bin/sh -c "kill -9 829
kill -9 829"...
For settings, I set the simple index and upsert parallelism to 1500, but the job doesn't seem to be using that setting here. Are there any Hudi config knobs you recommend tweaking to increase the throughput here?
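For reference, a minimal sketch of the overrides described above, merged over the hypothetical `hudiOptions` map sketched earlier (values exactly as stated; whether further knobs are needed is the open question):

```scala
// Overrides described above: switch from the bloom index to the simple index and
// set upsert parallelism to 1500. hudiOptions refers to the illustrative map sketched earlier.
val withSimpleIndex = hudiOptions ++ Map(
  "hoodie.index.type"                 -> "SIMPLE",
  "hoodie.upsert.shuffle.parallelism" -> "1500"
)
```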