[SUPPORT] High runtime for a batch in SparkWriteHelper stage
Describe the problem you faced
We are observing a high runtime for one batch: it took 15+ hours to complete a single batch, while the subsequent batches run fine. The dataset in question is not big. Attaching a few screenshots for reference; GC times are low. The hoodieConfigs are listed below for reference.



To Reproduce
Steps to reproduce the behavior:
Expected behavior
A clear and concise description of what you expected to happen.
Environment Description
- Hudi version : 0.10.1
- Spark version : 3.0.3
- Hive version : 3.1.2
- Hadoop version : 3.2.2
- Storage (HDFS/S3/GCS..) : S3
- Running on Docker? (yes/no) : NO
Additional context
Hudi Configs
hoodieConfigs:
hoodie.datasource.write.operation: upsert
hoodie.datasource.write.table.type: MERGE_ON_READ
hoodie.datasource.write.partitionpath.field: ""
hoodie.datasource.write.keygenerator.class: org.apache.hudi.keygen.NonpartitionedKeyGenerator
hoodie.metrics.on: true
hoodie.metrics.reporter.type: CLOUDWATCH
hoodie.datasource.hive_sync.partition_extractor_class: org.apache.hudi.hive.NonPartitionedExtractor
hoodie.parquet.max.file.size: 6110612736
hoodie.compact.inline: true
hoodie.clean.automatic: true
hoodie.compact.inline.trigger.strategy: NUM_AND_TIME
hoodie.clean.async: true
hoodie.cleaner.policy: KEEP_LATEST_COMMITS
hoodie.cleaner.commits.retained: 120
hoodie.keep.min.commits: 130
hoodie.keep.max.commits: 131
Spark Job configs
{
"className": "com.hotstar.driver.CdcCombinedDriver",
"proxyUser": "root",
"driverCores": 1,
"executorCores": 4,
"executorMemory": "4G",
"driverMemory": "4G",
"queue": "cdc",
"name": "hudiJob",
"file": "s3a://bucket/jars/prod.jar",
"conf": {
"spark.eventLog.enabled": "false",
"spark.ui.enabled": "true",
"spark.streaming.concurrentJobs": "1",
"spark.streaming.backpressure.enabled": "false",
"spark.streaming.kafka.maxRatePerPartition": "500",
"spark.yarn.am.nodeLabelExpression": "cdc",
"spark.shuffle.service.enabled": "true",
"spark.driver.maxResultSize": "8g",
"spark.driver.memoryOverhead": "2048",
"spark.executor.memoryOverhead": "2048",
"spark.dynamicAllocation.enabled": "true",
"spark.dynamicAllocation.minExecutors": "25",
"spark.dynamicAllocation.maxExecutors": "50",
"spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": "2",
"spark.jars.packages": "org.apache.spark:spark-avro_2.12:3.0.2,com.izettle:metrics-influxdb:1.2.3",
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.rdd.compress": "true",
"spark.sql.hive.convertMetastoreParquet": "false",
"spark.yarn.maxAppAttempts": "1",
"spark.task.cpus": "1"
}
}
Stacktrace
Add the stacktrace of the error.
@bhasudha @vinothchandar @codope Can you please help here?
cc @minihippo @xiarixiaoyao
@veenaypatil Is there any task log that can help us get more information?
@minihippo the task logs only show the consumer logs, and the task is stuck after that.
22/08/03 11:02:40 INFO KafkaRDD: Computing topic in.split.prod.hspay.subscriptions.partner_subscriptions, partition 1 offsets 3422973 -> 3497480
22/08/03 11:02:40 INFO KafkaRDD: Computing topic in.split.prod.hspay.subscriptions.apple_partner_subscriptions, partition 0 offsets 6038717 -> 6222344
22/08/03 11:02:40 INFO KafkaRDD: Computing topic in.split.prod.hspay.subscriptions.bsnl_partner_subscriptions, partition 1 offsets 31179 -> 32046
22/08/03 11:02:40 INFO CodeGenerator: Code generated in 15.559109 ms
22/08/03 11:02:40 INFO InternalKafkaConsumer: Initial fetch for spark-executor-hudi_in_hspay_subs in.split.prod.hspay.subscriptions.apple_partner_subscriptions-0 6038717
22/08/03 11:02:40 INFO InternalKafkaConsumer: Initial fetch for spark-executor-hudi_in_hspay_subs in.split.prod.hspay.subscriptions.partner_subscriptions-1 3422973
22/08/03 11:02:40 INFO KafkaConsumer: [Consumer clientId=consumer-spark-executor-hudi_in_hspay_subs-2, groupId=spark-executor-hudi_in_hspay_subs] Seeking to offset 6038717 for partition in.split.prod.hspay.subscriptions.apple_partner_subscriptions-0
22/08/03 11:02:40 INFO InternalKafkaConsumer: Initial fetch for spark-executor-hudi_in_hspay_subs in.split.prod.hspay.subscriptions.bsnl_partner_subscriptions-1 31179
22/08/03 11:02:40 INFO KafkaConsumer: [Consumer clientId=consumer-spark-executor-hudi_in_hspay_subs-3, groupId=spark-executor-hudi_in_hspay_subs] Seeking to offset 3422973 for partition in.split.prod.hspay.subscriptions.partner_subscriptions-1
22/08/03 11:02:40 INFO KafkaConsumer: [Consumer clientId=consumer-spark-executor-hudi_in_hspay_subs-1, groupId=spark-executor-hudi_in_hspay_subs] Seeking to offset 31179 for partition in.split.prod.hspay.subscriptions.bsnl_partner_subscriptions-1
22/08/03 11:02:40 INFO FetchSessionHandler: [Consumer clientId=consumer-spark-executor-hudi_in_hspay_subs-1, groupId=spark-executor-hudi_in_hspay_subs] Error sending fetch request (sessionId=307489077, epoch=2) to node 35: {}.
org.apache.kafka.common.errors.DisconnectException
22/08/03 11:02:40 INFO FetchSessionHandler: [Consumer clientId=consumer-spark-executor-hudi_in_hspay_subs-3, groupId=spark-executor-hudi_in_hspay_subs] Error sending fetch request (sessionId=1858238483, epoch=26) to node 7: {}.
org.apache.kafka.common.errors.DisconnectException
22/08/03 11:02:40 INFO Executor: Finished task 21.0 in stage 285.0 (TID 14173). 1611 bytes result sent to driver
22/08/03 11:02:40 INFO FetchSessionHandler: [Consumer clientId=consumer-spark-executor-hudi_in_hspay_subs-2, groupId=spark-executor-hudi_in_hspay_subs] Error sending fetch request (sessionId=1695836972, epoch=3257) to node 11: {}.
org.apache.kafka.common.errors.DisconnectException
22/08/03 11:02:44 INFO Executor: Finished task 52.0 in stage 285.0 (TID 14197). 1611 bytes result sent to driver
22/08/03 11:04:25 INFO Executor: Finished task 29.0 in stage 285.0 (TID 14185). 1654 bytes result sent to driver
22/08/03 11:30:45 INFO BlockManager: Removing RDD 495
22/08/03 11:30:45 INFO BlockManager: Removing RDD 559
22/08/03 11:30:45 INFO BlockManager: Removing RDD 536
22/08/03 11:30:45 INFO BlockManager: Removing RDD 544
[mapToPair at SparkWriteHelper.java:63]
This stage is running, and I pasted the above log from one of the running tasks.
This causes a lot of issues when we try to kill the job and restart it. It only happens for the first batch. For example, today the first batch has been running for 24 hours.

@alexeykudinkin: can you look into this?
@nsivabalan this doesn't seem to be related to performance; it looks more like an issue of some jobs getting stuck.
@veenaypatil are you able to reliably reproduce this? To be able to troubleshoot this we will definitely need more info regarding your setup:
- Where are you reading from? (I see Kafka in the logs, but need to confirm)
- How are you reading? (Using DeltaStreamer, Spark Source, etc)
- Describe your data workload (as much as possible, schema, size, etc)
- Describe the issue you're observing in more detail
- Provide logs for the failing jobs wherever possible
We are facing a similar issue during this phase


Kafka fetch errors (at INFO level) are as follows:
2022-09-02 06:26:12,606 INFO [Executor task launch worker for task 9.1 in stage 370.0 (TID 160203)] org.apache.kafka.clients.FetchSessionHandler:[Consumer clientId=consumer-spark-executor-hudi-ingest-auth-1, groupId=spark-executor-hudi-ingest-auth] Error sending fetch request (sessionId=1968849354, epoch=629) to node 8: org.apache.kafka.common.errors.DisconnectException
and warnings on the executors:
2022-09-02 06:12:47,340 WARN [netty-rpc-env-timeout] org.apache.spark.rpc.netty.NettyRpcEnv:Ignored failure: java.util.concurrent.TimeoutException: Cannot receive any reply from ip-10-100-232-180.us-east-2.compute.internal:37403 in 10000 milliseconds
2022-09-02 06:13:53,639 WARN [executor-heartbeater] org.apache.spark.executor.Executor:Issue communicating with driver in heartbeater org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10000 milliseconds]. This timeout is controlled by spark.executor.heartbeatInterval
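For reference, a minimal sketch (illustrative values only, not a verified fix) of raising the heartbeat and network timeouts referenced in the warning above when building the session:

from pyspark.sql import SparkSession

# Illustrative values; spark.network.timeout should stay larger than
# spark.executor.heartbeatInterval.
spark = (
    SparkSession.builder
    .appName("hudi-ingest")
    .config("spark.executor.heartbeatInterval", "60s")
    .config("spark.network.timeout", "600s")
    .getOrCreate()
)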
@veenaypatil: I see you are using the non-partitioned key generator, so the index lookup is going to be relative to the total number of file groups you have. Do you know the total number of file groups? Can you enable clustering to batch smaller file groups into larger ones?
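For reference, a minimal sketch of the inline clustering options that could be added to the writer configs (these option keys are available in Hudi 0.7.0+; the values below are illustrative, not tuned recommendations):

clustering_opts = {
    # Illustrative values only; adjust to your file sizes and commit cadence.
    "hoodie.clustering.inline": "true",
    "hoodie.clustering.inline.max.commits": "4",
    # Files below this size are candidates to be batched together.
    "hoodie.clustering.plan.strategy.small.file.limit": str(600 * 1024 * 1024),
    # Target size for the clustered output files.
    "hoodie.clustering.plan.strategy.target.file.max.bytes": str(1024 * 1024 * 1024),
}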
@veenaypatil Did the suggestion above work for you?
Hey @veenaypatil: any updates in this regard? I see the issue has been open for quite some time. Let us know how we can help.
@nsivabalan sorry for the late response on this issue. I am not seeing this issue as of now; we were only seeing it when we killed the job and restarted it.
I see you are using the non-partitioned key generator, so the index lookup is going to be relative to the total number of file groups you have. Do you know the total number of file groups? Can you enable clustering to batch smaller file groups into larger ones?

How do I get the number of file groups? Is there a command for it? Also, what should be the limit on the number of file groups?
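For reference, the Hudi CLI exposes file-system view commands (e.g. show fsview all after connecting to the table path) that list file groups. A rough programmatic sketch, assuming a non-partitioned table on S3 (the bucket and prefix below are placeholders), is to count distinct file IDs from the base file names:

import boto3

# Placeholders: point these at the table's base path on S3.
BUCKET = "my-bucket"
PREFIX = "path/to/hudi_table/"

# Hudi base files are named <fileId>_<writeToken>_<instantTime>.parquet,
# so counting distinct fileId prefixes approximates the number of file groups
# (file groups with only log files in a MOR table are not counted here).
s3 = boto3.client("s3")
file_ids = set()
for page in s3.get_paginator("list_objects_v2").paginate(Bucket=BUCKET, Prefix=PREFIX):
    for obj in page.get("Contents", []):
        name = obj["Key"].rsplit("/", 1)[-1]
        if name.endswith(".parquet"):
            file_ids.add(name.split("_", 1)[0])

print(f"approx. file groups: {len(file_ids)}")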
@veenaypatil @alexeykudinkin @nsivabalan @yihua @bvaradar @gudladona
I am facing the same issue with [mapToPair at SparkWriteHelper.java:63]; it is taking 18 minutes. Did you get any solution for this?
@devjain47 What is your data size and the number of file groups in your dataset? Can you please post your table and writer configuration?
@ad1happy2go As of now we are just testing, so it is just 1 file that we are processing and the data size is nearly 100 MB, not more than that, but later the size of the data will be in GBs.
Below is the Hudi table config:
hudi_options = {
    'hoodie.table.name': HUDI_QUEUE_TABLE,
    'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.table.name': HUDI_QUEUE_TABLE,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.partitionpath.field': 'orgid,lenderid',
    'hoodie.datasource.write.precombine.field': 'startdate',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hudi.metadata-listing-enabled': 'false',
    'path': HUDI_QUEUE_TABLE_LOCATION,
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': HUDI_DATABASE,
    'hoodie.datasource.hive_sync.table': HUDI_QUEUE_TABLE,
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.upsert.shuffle.parallelism': 1,
    'hoodie.insert.shuffle.parallelism': 1,
    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.write.concurrency.mode': 'OPTIMISTIC_CONCURRENCY_CONTROL',
    'hoodie.cleaner.policy.failed.writes': 'LAZY',
    'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.InProcessLockProvider',
    'hoodie.metadata.enable': 'false',
    'hoodie.index.type': 'SIMPLE',
    'hoodie.bloom.index.prune.by.ranges': 'false',
    'hoodie.enable.data.skipping': 'true',
    'hoodie.metadata.index.column.stats.enable': 'true',
    'hoodie.bloom.index.use.metadata': 'false',
    'hoodie.clustering.inline': 'false',
    'hoodie.write.lock.hivemetastore.table': HUDI_QUEUE_TABLE,
    'hoodie.write.lock.hivemetastore.database': HUDI_DATABASE,
}
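For context, a sketch of how these options are typically applied to a write (assuming a PySpark DataFrame df and the HUDI_* variables defined as in the config above):

# Sketch only: df and the HUDI_* variables are assumed to be defined elsewhere.
(
    df.write.format("hudi")
      .options(**hudi_options)
      .mode("append")
      .save(HUDI_QUEUE_TABLE_LOCATION)
)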
@devjain47 So, with only 1 file, is it taking 18 minutes for the tagging phase? How much data is in your existing Hudi table?
@ad1happy2go, almost 20 GB of data is present.
Hi @alexeykudinkin @nsivabalan @yihua @bvaradar @gudladona @ad1happy2go @veenaypatil @codope
Did you get any solution for the above-mentioned issue?