
[SUPPORT] Use GCP Dataproc Metastore for Concurrency Control Distributed Locking

Open JosepSampe opened this issue 9 months ago • 5 comments

Describe the problem you faced

Currently, I am using GCP to run workloads on an Apache Hudi table stored in GCS. To enable distributed locking concurrency control, I found in the documentation that it is possible to configure a Hive Metastore for distributed locking (Hive Metastore-based concurrency control). GCP provides a managed Hive Metastore called Dataproc Metastore, so I attempted to use it with Hudi.

I realized that the Hudi Hive Metastore lock provider does not automatically create the necessary resources, unlike the DynamoDB lock provider. As a result, I created the database and table with SQL commands and provided the corresponding configuration in the hudi-defaults.conf file.

At this point, I executed a workload with multiple writers, and it appears to complete successfully, although I noticed a WARNING and a stack trace during the run.

As shown in the following image, I can see the lock() activity in the Dataproc Metastore dashboard during the experiment:

[Image: Dataproc Metastore dashboard showing lock() activity during the experiment]

However, I can also see this warning and stack trace in the Spark executors:

2025-03-18T13:42:56,296 WARN RetryHelper: Catch Exception for acquire lock, will retry after 5000 ms.
org.apache.hudi.exception.HoodieLockException: Unable to acquire the lock. Current lock owner information : 
        at org.apache.hudi.client.transaction.lock.LockManager.lambda$lock$20c251e3$1(LockManager.java:83)
        at org.apache.hudi.common.util.RetryHelper.start(RetryHelper.java:94)
        at org.apache.hudi.client.transaction.lock.LockManager.lock(LockManager.java:77)
        at org.apache.hudi.client.transaction.TransactionManager.beginTransaction(TransactionManager.java:58)
        at org.apache.hudi.client.timeline.versioning.v2.TimelineArchiverV2.archiveIfRequired(TimelineArchiverV2.java:98)
        at org.apache.hudi.client.BaseHoodieTableServiceClient.archive(BaseHoodieTableServiceClient.java:834)
        at org.apache.hudi.client.BaseHoodieWriteClient.archive(BaseHoodieWriteClient.java:887)
        at org.apache.hudi.client.BaseHoodieWriteClient.autoArchiveOnCommit(BaseHoodieWriteClient.java:616)
        at org.apache.hudi.client.BaseHoodieWriteClient.mayBeCleanAndArchive(BaseHoodieWriteClient.java:582)
        at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:258)
        at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:93)
        at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:63)
        at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:211)
        at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:206)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:304)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)

So my questions are:

  1. Is it actually necessary to configure Zookeeper when using GCP Dataproc Metastore? The lock() appears to be working based on the dashboard activity, but in the Hudi docs, you mention: "Zookeeper is required for HMS lock provider. Users can set the Zookeeper configs for Hive using Hudi."
  2. Is this warning and exception important, or is it just an informative message indicating that another writer currently holds the lock, so this writer must wait until it is released? If it is only informative, is it really necessary to print a full stack trace in addition to the warning?

Because of this exception, I am unsure whether the lock() is actually working properly, and whether Zookeeper is necessary when using Dataproc Metastore, since configuring only the Dataproc Metastore seems to be enough in this case.
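For reference, the retry cadence in the warning ("will retry after 5000 ms") comes from Hudi's lock retry settings. A minimal sketch of the keys that could be tuned in hudi-defaults.conf to make waiting writers retry less noisily (the values below are illustrative, not the ones I used):

hoodie.write.lock.num_retries                   15
hoodie.write.lock.wait_time_ms_between_retry    5000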

Steps to reproduce the behavior:

To create the resources in the Dataproc Metastore, I used spark-shell:

spark-shell --conf spark.sql.catalogImplementation=hive --conf spark.hadoop.hive.metastore.uris=thrift://10.10.10.5:9083 --conf spark.hadoop.hive.metastore.warehouse.dir=gs://gcs-bucket-my-dev-metastore-04103c0c-3fe9-4e9d-8574-a9eddf2/hive-warehouse

And then executed these commands:

spark.sql("CREATE DATABASE my_testing_lock").show()
spark.sql("SHOW DATABASES").show()
spark.sql("USE my_testing_lock")
spark.sql("CREATE TABLE IF NOT EXISTS my_testing_lock_table (key STRING) STORED AS PARQUET").show()

Then, for this test, I used these keys in the hudi-defaults.conf file:

hoodie.write.lock.provider                  org.apache.hudi.hive.transaction.lock.HiveMetastoreBasedLockProvider
hoodie.write.lock.hivemetastore.uris        thrift://10.10.10.5:9083
hoodie.write.lock.hivemetastore.database    my_testing_lock
hoodie.write.lock.hivemetastore.table       my_testing_lock_table
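For completeness, a minimal sketch of how each concurrent writer issues its write from spark-shell with optimistic concurrency control enabled (the table name, record key, precombine field, and GCS path below are illustrative placeholders; the lock provider settings are picked up from the hudi-defaults.conf entries above):

import spark.implicits._

// toy frame standing in for the real workload data
val df = Seq(("k1", "2025-03-18T13:42:00")).toDF("key", "ts")

df.write.format("hudi").
  option("hoodie.table.name", "my_hudi_table").
  option("hoodie.datasource.write.recordkey.field", "key").
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.write.concurrency.mode", "optimistic_concurrency_control").
  option("hoodie.cleaner.policy.failed.writes", "LAZY").
  mode("append").
  save("gs://my-bucket/hudi/my_hudi_table")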

Expected behavior

The workload executes with multiple writers without any exceptions.

Environment Description

  • Hudi version : 1.0.0

  • Spark version : 3.5.2

  • Storage (HDFS/S3/GCS..) : GCS

  • Running on Docker? (yes/no) : Spark workers run on GKE

JosepSampe avatar Mar 26 '25 11:03 JosepSampe

@JosepSampe Zookeeper is not required if you are using HMS based lock provider. Is the job failing with this error?

ad1happy2go avatar Mar 27 '25 16:03 ad1happy2go

@ad1happy2go

Zookeeper is not required if you are using HMS based lock provider.

So the docs here are not correct?

Is the job failing with this error?

The job continues and appears to complete, but because this exception appears multiple times, I'm unsure about the correctness of the result and whether the job should run without any exceptions at all.

JosepSampe avatar Mar 27 '25 16:03 JosepSampe

You are correct @JosepSampe. It looks like the doc was recently updated. Let me add @dipankarmazumdar for more insight.

ad1happy2go avatar Mar 27 '25 16:03 ad1happy2go

@ad1happy2go Also, note this comment in the source code file:

https://github.com/apache/hudi/blob/e794f091d077a4cbf6aa1ec6eb5cdc1be62c5f1d/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java#L73-L76

JosepSampe avatar Mar 27 '25 17:03 JosepSampe

Hi everyone! Just wanted to kindly check in on this issue. It’s been a little while since I posted, and I was wondering if anyone might have had a chance to look into it, or if there’s any additional insight someone could share.

Would greatly appreciate any guidance or pointers.

JosepSampe avatar May 27 '25 13:05 JosepSampe