[SUPPORT] Use GCP Dataproc Metastore for Distributed Locking Concurrency Control
Describe the problem you faced
Currently, I am using GCP to run workloads on an Apache Hudi table stored in GCS. To enable distributed locking concurrency control, I found in the documentation that it is possible to configure a Hive Metastore for distributed locking (Hive Metastore-based concurrency control). GCP provides a managed Hive Metastore called Dataproc Metastore, so I attempted to use it with Hudi.
I realized that the Hudi Hive Metastore lock provider does not automatically create the necessary resources, unlike the DynamoDB-based lock provider. As a result, I created the database and table myself using SQL commands and provided the appropriate configuration in the hudi-defaults.conf file.
At this point, I executed a workload with multiple writers, and it appears to complete successfully, except that I noticed a WARNING and a traceback during the execution of the experiment.
As shown in the following image, I can see the lock() activity in the Dataproc Metastore dashboard during the experiment:
However, I can also see this warning and traceback in the spark executors:
2025-03-18T13:42:56,296 WARN RetryHelper: Catch Exception for acquire lock, will retry after 5000 ms.
org.apache.hudi.exception.HoodieLockException: Unable to acquire the lock. Current lock owner information :
at org.apache.hudi.client.transaction.lock.LockManager.lambda$lock$20c251e3$1(LockManager.java:83)
at org.apache.hudi.common.util.RetryHelper.start(RetryHelper.java:94)
at org.apache.hudi.client.transaction.lock.LockManager.lock(LockManager.java:77)
at org.apache.hudi.client.transaction.TransactionManager.beginTransaction(TransactionManager.java:58)
at org.apache.hudi.client.timeline.versioning.v2.TimelineArchiverV2.archiveIfRequired(TimelineArchiverV2.java:98)
at org.apache.hudi.client.BaseHoodieTableServiceClient.archive(BaseHoodieTableServiceClient.java:834)
at org.apache.hudi.client.BaseHoodieWriteClient.archive(BaseHoodieWriteClient.java:887)
at org.apache.hudi.client.BaseHoodieWriteClient.autoArchiveOnCommit(BaseHoodieWriteClient.java:616)
at org.apache.hudi.client.BaseHoodieWriteClient.mayBeCleanAndArchive(BaseHoodieWriteClient.java:582)
at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:258)
at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:93)
at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:63)
at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:211)
at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:206)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:304)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
So my questions are:
- Is it actually necessary to configure Zookeeper when using GCP Dataproc Metastore? The lock() appears to be working based on the dashboard activity, but the Hudi docs state: "Zookeeper is required for HMS lock provider. Users can set the Zookeeper configs for Hive using Hudi."
- Is this warning and exception important, or is it just an informative message indicating that another writer currently holds the lock, so the current writer must wait until it is released? If it is just informative, is it really necessary to print a full traceback in addition to the warning?
Because of the exception, I am unsure whether the lock() is actually working properly, and also whether Zookeeper is necessary when using Dataproc Metastore, since in this case configuring the Dataproc Metastore alone seems to be enough.
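As a side note, the 5000 ms backoff in the warning likely corresponds to Hudi's lock retry settings, so the number and frequency of these retry attempts can be tuned in hudi-defaults.conf if they become noisy. A sketch with illustrative values (not recommendations; defaults may differ per Hudi version):

```properties
# Retries at the Hudi lock-manager level (the RetryHelper seen in the log)
hoodie.write.lock.num_retries 15
hoodie.write.lock.wait_time_ms_between_retry 5000
# Retries at the lock-client level
hoodie.write.lock.client.num_retries 50
hoodie.write.lock.client.wait_time_ms_between_retry 5000
```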
Steps to reproduce the behavior:
To create the resources in the Dataproc Metastore, I used a spark-shell:
spark-shell --conf spark.sql.catalogImplementation=hive --conf spark.hadoop.hive.metastore.uris=thrift://10.10.10.5:9083 --conf spark.hadoop.hive.metastore.warehouse.dir=gs://gcs-bucket-my-dev-metastore-04103c0c-3fe9-4e9d-8574-a9eddf2/hive-warehouse
And then execute these commands:
spark.sql("CREATE DATABASE my_testing_lock").show()
spark.sql("SHOW DATABASES").show()
spark.sql("USE my_testing_lock")
spark.sql("CREATE TABLE IF NOT EXISTS my_testing_lock_table (key STRING) STORED AS PARQUET").show()
Then, for this test, I used these keys in the hudi-defaults.conf file:
hoodie.write.lock.provider org.apache.hudi.hive.transaction.lock.HiveMetastoreBasedLockProvider
hoodie.write.lock.hivemetastore.uris thrift://10.10.10.5:9083
hoodie.write.lock.hivemetastore.database my_testing_lock
hoodie.write.lock.hivemetastore.table my_testing_lock_table
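For reference, the same lock settings can also be passed per writer as Hudi write options instead of (or in addition to) hudi-defaults.conf. A minimal PySpark-style sketch, where the table name and output path are hypothetical, and `hoodie.write.concurrency.mode` / `hoodie.cleaner.policy.failed.writes` are the usual companion settings for multi-writer optimistic concurrency control:

```python
# Hudi write options for multi-writer OCC with the HMS-based lock provider.
# Table name and output path below are hypothetical examples.
hudi_options = {
    "hoodie.table.name": "my_hudi_table",
    # Multi-writer concurrency control (needed alongside a lock provider)
    "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
    "hoodie.cleaner.policy.failed.writes": "LAZY",
    # Same lock provider settings as in hudi-defaults.conf above
    "hoodie.write.lock.provider":
        "org.apache.hudi.hive.transaction.lock.HiveMetastoreBasedLockProvider",
    "hoodie.write.lock.hivemetastore.uris": "thrift://10.10.10.5:9083",
    "hoodie.write.lock.hivemetastore.database": "my_testing_lock",
    "hoodie.write.lock.hivemetastore.table": "my_testing_lock_table",
}

# With a live SparkSession, each writer would then do something like:
# df.write.format("hudi").options(**hudi_options) \
#   .mode("append").save("gs://my-bucket/my_hudi_table")
```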
Expected behavior
Execute the workload with multiple writers without any exception.
Environment Description
- Hudi version : 1.0.0
- Spark version : 3.5.2
- Storage (HDFS/S3/GCS..) : GCS
- Running on Docker? (yes/no) : Spark workers run on GKE
@JosepSampe Zookeeper is not required if you are using HMS based lock provider. Is the job failing with this error?
@ad1happy2go
Zookeeper is not required if you are using HMS based lock provider.
So the docs here are not correct?
Is the job failing with this error?
The job continues and appears to complete, but due to this exception appearing multiple times, I'm unsure about the correctness of the result or whether the job should proceed without any exceptions.
You are correct @JosepSampe. Looks like the doc was recently updated. Let me add @dipankarmazumdar for more insights.
@ad1happy2go Also, note this comment in the source code file:
https://github.com/apache/hudi/blob/e794f091d077a4cbf6aa1ec6eb5cdc1be62c5f1d/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java#L73-L76
Hi everyone! Just wanted to kindly check in on this issue. It’s been a little while since I posted, and I was wondering if anyone might have had a chance to look into it, or if there’s any additional insight someone could share.
Would greatly appreciate any guidance or pointers.