
[SUPPORT] Incremental read query performance


Describe the problem you faced

I'm running an application that reads from 4 medium-sized (few hundred GB) Hudi MoR tables which are compacted weekly.

When running incremental queries that look back over 3-5 commits with hoodie.datasource.read.incr.fallback.fulltablescan.enable -> true, I am seeing a 5-hour gap in my Spark History Server: (screenshot attached)

Incremental read config:

Scan MergeOnReadIncrementalRelation(org.apache.spark.sql.SQLContext@3b457d8a,
Map(
    hoodie.datasource.read.incr.fallback.fulltablescan.enable -> true,
    path -> s3://lake/names.all_hudi,
    hoodie.write.lock.zookeeper.url -> ip-10-80-36-225.ec2.internal,
    hoodie.write.lock.zookeeper.base_path -> /hudi,
    hoodie.metadata.enable -> true,
    hoodie.datasource.hive_sync.jdbcurl -> jdbc:hive2://ip-10-xx-xx-xxx.ec2.internal:10000,
    hoodie.datasource.read.begin.instanttime -> 20231207170214000,
    hoodie.datasource.query.type -> incremental,
    hoodie.cleaner.policy.failed.writes -> EAGER,
    hoodie.write.lock.zookeeper.port -> 2181,
    hoodie.write.lock.provider -> org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider,
    hoodie.write.concurrency.mode -> single_writer),
    HoodieTableMetaClient{basePath='s3://lake/names.all_hudi',
    metaPath='s3://lake/names.all_hudi/.hoodie',
    tableType=MERGE_ON_READ},
    None,
    None
)
Job time: 2023/12/12 00:47:50
=> 5 commits for incremental read
number of output rows: 19,926,780
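
For reference, this is roughly how the incremental read is issued from the Spark datasource (a minimal sketch: the table path, begin instant, and option keys are taken from the plan above; the session setup and variable names are illustrative):

import org.apache.spark.sql.SparkSession

// Minimal sketch of the incremental read described above (Hudi Spark datasource).
// Begin instant and table path are copied from the plan; the rest is illustrative.
val spark = SparkSession.builder().appName("hudi-incremental-read").getOrCreate()

val incrementalDf = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", "20231207170214000")
  .option("hoodie.datasource.read.incr.fallback.fulltablescan.enable", "true")
  .option("hoodie.metadata.enable", "true")
  .load("s3://lake/names.all_hudi")

incrementalDf.count() // ~19.9M output rows in the run shown above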

During the gap, the driver kills all 40 of my executor instances, and I see the following logs on the driver: log.txt (attached)

When I use a snapshot query plus a filter on _hoodie_commit_time, the gap drops to ~20 minutes: (screenshot attached)

Snapshot read config:

Scan MergeOnReadSnapshotRelation(org.apache.spark.sql.SQLContext@24a4d185,
Map(
    path -> s3://lake/names.all_hudi,
    hoodie.write.lock.zookeeper.url -> ip-xx-xx-xxx.ec2.internal,
    hoodie.write.lock.zookeeper.base_path -> /hudi,
    hoodie.metadata.enable -> true,
    hoodie.datasource.hive_sync.jdbcurl -> jdbc:hive2://ip-10-xx-xx-xxx.ec2.internal:10000,
    hoodie.datasource.query.type -> snapshot,
    hoodie.cleaner.policy.failed.writes -> EAGER,
    hoodie.write.lock.zookeeper.port -> 2181,
    hoodie.write.lock.provider -> org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider,
    hoodie.write.concurrency.mode -> single_writer),
    HoodieTableMetaClient{basePath='s3://lake/names.all_hudi',
    metaPath='s3://lake/names.all_hudi/.hoodie',
    tableType=MERGE_ON_READ},
    List(),
    None,
    None
)
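
The snapshot-plus-filter variant looks roughly like this (a sketch, assuming the same cutoff instant as the incremental read; _hoodie_commit_time is Hudi's built-in commit-time meta column, and the variable name is illustrative):

import org.apache.spark.sql.functions.col

// Snapshot read of the whole table, then filter down to the commits of interest
// using Hudi's _hoodie_commit_time meta column. The cutoff value is illustrative.
val snapshotDf = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "snapshot")
  .load("s3://lake/names.all_hudi")
  .filter(col("_hoodie_commit_time") > "20231207170214000")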

When I run the incremental read with hoodie.datasource.read.incr.fallback.fulltablescan.enable -> false, the gap drops from 5 hours to ~5 minutes 😮: (screenshot attached)

Incremental read config (no fulltable fallback):

Scan MergeOnReadIncrementalRelation(org.apache.spark.sql.SQLContext@dcb6efd,
Map(
    path -> s3://lake/names.all_hudi,
    hoodie.write.lock.zookeeper.url -> ip-10-xx-xx-xxx.ec2.internal,
    hoodie.write.lock.zookeeper.base_path -> /hudi,
    hoodie.metadata.enable -> true,
    hoodie.datasource.hive_sync.jdbcurl -> jdbc:hive2://ip-10-xx-xx-xxx.ec2.internal:10000,
    hoodie.datasource.read.begin.instanttime -> 20231212232646000,
    hoodie.datasource.query.type -> incremental,
    hoodie.cleaner.policy.failed.writes -> EAGER,
    hoodie.write.lock.zookeeper.port -> 2181,
    hoodie.write.lock.provider -> org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider,
    hoodie.write.concurrency.mode -> single_writer),
    HoodieTableMetaClient{basePath='s3://lake/names.all_hudi',
    metaPath='s3://lake/names.all_hudi/.hoodie',
    tableType=MERGE_ON_READ},
    None,
    None
)
number of output rows: 12,541,221
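
For completeness, a sketch of this fast variant: the same incremental read with the fallback disabled. As I understand the flag, with it set to false Hudi surfaces a FileNotFoundException if any file in the commit range has already been cleaned, rather than silently rescanning the whole table (variable name illustrative):

// Same incremental read, but with the full-table-scan fallback disabled.
// Begin instant and path are from the plan above.
val incrementalNoFallbackDf = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", "20231212232646000")
  .option("hoodie.datasource.read.incr.fallback.fulltablescan.enable", "false")
  .load("s3://lake/names.all_hudi")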

This leads me to believe the incremental query is degenerating into a full table scan when it shouldn't (all of my commits are active and uncleaned): https://github.com/apache/hudi/blob/7a6543958368540d221ddc18e0c12b8d526b6859/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala#L142-L151
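
As I read those linked lines, the decision is roughly the following (a hedged paraphrase for discussion, not the verbatim source; the function and parameter names here are illustrative):

import org.apache.hadoop.fs.{FileSystem, Path}

// Hedged paraphrase of the fallback decision in MergeOnReadIncrementalRelation
// (Hudi 0.13.x) -- not the verbatim source; all names are illustrative.
def shouldFallBackToFullTableScan(
    fallbackEnabled: Boolean,        // hoodie.datasource.read.incr.fallback.fulltablescan.enable
    affectedCommitFiles: Seq[Path],  // files referenced by the commits in the incremental range
    fs: FileSystem): Boolean = {
  // Fall back only when the flag is on AND at least one referenced file no
  // longer exists (e.g. already cleaned). If this evaluates to true even though
  // every commit is active and uncleaned, the existence check itself (or the
  // file listing feeding it) would be the suspect.
  fallbackEnabled && affectedCommitFiles.exists(p => !fs.exists(p))
}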

Wondering if someone can help me understand the following:

  1. Why is the incremental load degenerating into a full table scan when I can run the same job with the flag disabled and not run into FileNotFound errors?
  2. Is the full table scan expected to happen on only one instance (the driver)?
  3. Can I safely leave this flag set to false and expect to run into FileNotFound errors only when the cleaner runs?

Expected behavior

The incremental load should only fall back to a full table scan when necessary (i.e., when files are not found), not every time.

Environment Description

  • Hudi version : 0.13.1

  • Spark version : 3.4.0

  • Hive version : 3.1.3

  • Hadoop version : 3.3.3

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no

noahtaite · Dec 15 '23, 15:12

cc @linliu-code, maybe he can take a look into the Spark incremental read.

danny0405 · Dec 18 '23, 03:12

Thanks for the response, @danny0405. @linliu-code, let me know if you can share some insight or need more details. Thanks!

noahtaite · Jan 03 '24, 17:01