
Concurrent and rollback commit issue

Open khajaasmath786 opened this issue 1 year ago • 6 comments

Here is the intermittent issue we face in our current Hudi data pipeline with Apache Spark.

```
Traceback (most recent call last):
  File "/mnt/tmp/spark-d5dc3d59-8086-4598-b0f8-345b495e8dd1/baxaws-enterpriseanalytics-edh-consolidate.py", line 776, in <module>
    process(config)
  File "/mnt/tmp/spark-d5dc3d59-8086-4598-b0f8-345b495e8dd1/baxaws-enterpriseanalytics-edh-consolidate.py", line 205, in process
    processCDCFiles(config, fileList)
  File "/mnt/tmp/spark-d5dc3d59-8086-4598-b0f8-345b495e8dd1/baxaws-enterpriseanalytics-edh-consolidate.py", line 501, in processCDCFiles
    .save(config['target']))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 968, in save
  File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o361.save.
: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback s3://baxaws-prd-enterpriseanalytics-edh-jde-inbound/raw-consolidated/NADTA/F594037L commits 20231214220739609
    at org.apache.hudi.client.BaseHoodieWriteClient.rollback(BaseHoodieWriteClient.java:779)
    at org.apache.hudi.client.BaseHoodieWriteClient.rollbackFailedWrites(BaseHoodieWriteClient.java:1189)
    at org.apache.hudi.client.BaseHoodieWriteClient.rollbackFailedWrites(BaseHoodieWriteClient.java:1172)
    at org.apache.hudi.client.BaseHoodieWriteClient.rollbackFailedWrites(BaseHoodieWriteClient.java:1160)
    at org.apache.hudi.client.BaseHoodieWriteClient.lambda$startCommitWithTime$97cdbdca$1(BaseHoodieWriteClient.java:960)
    at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:151)
    at org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:959)
    at org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:952)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:305)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:173)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieRollbackException: Found commits after time :20231214220739609, please rollback greater commits first
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.validateRollbackCommitSequence(BaseRollbackActionExecutor.java:181)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.doRollbackAndGetStats(BaseRollbackActionExecutor.java:220)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.runRollback(BaseRollbackActionExecutor.java:118)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.execute(BaseRollbackActionExecutor.java:145)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.rollback(HoodieSparkCopyOnWriteTable.java:281)
    at org.apache.hudi.client.BaseHoodieWriteClient.rollback(BaseHoodieWriteClient.java:763)
    ... 56 more
```

May I know how to resolve this issue?

EMR version: emr-6.8.0
Installed applications: Hadoop 3.2.1, Hive 3.1.3, Spark 3.3.0, Tez 0.9.2

Using the Hudi version that comes by default with EMR.

Thanks, Asmath

khajaasmath786 avatar Dec 18 '23 18:12 khajaasmath786

Just roll back the greater commits with the Hudi CLI and restart the job.
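[Editor's note] A sketch of what the Hudi CLI session could look like, using the table path from the stack trace above. The CLI location varies by EMR release, and the exact command flags should be checked against the CLI's `help` output for the bundled Hudi version:

```shell
# Launch the Hudi CLI (path is illustrative; find hudi-cli.sh on your cluster)
# /usr/lib/hudi/cli/bin/hudi-cli.sh
#
# Inside the CLI shell:
#   connect --path s3://baxaws-prd-enterpriseanalytics-edh-jde-inbound/raw-consolidated/NADTA/F594037L
#   commits show                            # inspect the timeline
#   commit rollback --commit <instant>      # roll back the newest commit first
#
# Repeat `commit rollback` newest-first until 20231214220739609 is the
# latest instant, then restart the Spark job.
```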

danny0405 avatar Dec 19 '23 03:12 danny0405

May I know how to find the greater commits and restart the job? Is there a way to find them and do it?

khajaasmath786 avatar Dec 19 '23 03:12 khajaasmath786

See the log report:

Caused by: org.apache.hudi.exception.HoodieRollbackException: Found commits after time :20231214220739609, please rollback greater commits first

danny0405 avatar Dec 19 '23 03:12 danny0405
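[Editor's note] Hudi instant times are fixed-width timestamps (`yyyyMMddHHmmssSSS`), so the "greater commits" in the error are simply those whose instant strings sort after the failed one. A minimal sketch with an illustrative timeline (in practice the instants come from the table's `.hoodie/` directory or `commits show` in the CLI):

```python
# Instant of the commit that failed to roll back (from the error message)
failed_instant = "20231214220739609"

# Illustrative timeline, e.g. gathered by listing .hoodie/*.commit files
completed_instants = [
    "20231214215501123",
    "20231214220739609",
    "20231215010203444",
    "20231215093011222",
]

# Commits strictly after the failed instant must be rolled back first,
# newest first; plain string comparison works for fixed-width timestamps.
greater = sorted(
    (t for t in completed_instants if t > failed_instant),
    reverse=True,
)
print(greater)  # ['20231215093011222', '20231215010203444']
```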

I will try this solution and see it.

```python
from pyspark.sql import SparkSession

# Initialize the Spark session
spark = (
    SparkSession.builder
    .appName("Hudi Rollback")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

# Set the base path for the Hudi dataset
basePath = ""

# Load the Hudi dataset
hudi_df = spark.read.format("hudi").load(basePath)

# Display commit times
commit_times = (
    hudi_df.select("_hoodie_commit_time")
    .distinct()
    .orderBy("_hoodie_commit_time")
    .collect()
)
print("Commit times in the dataset:")
for commit in commit_times:
    print(commit["_hoodie_commit_time"])

# Specify the commit time to roll back to
target_commit_time = "20231214220739609"

# Identify commits newer than the target commit
newer_commits = [
    commit["_hoodie_commit_time"]
    for commit in commit_times
    if commit["_hoodie_commit_time"] > target_commit_time
]

# Roll back newer commits in reverse (newest-first) order
for commit in reversed(newer_commits):
    print(f"Rolling back commit: {commit}")
    # Perform the rollback. This is a placeholder; replace with the actual
    # Hudi rollback command, which may vary based on Hudi version and setup.
    # spark.sql(f"CALL hudi_rollback('{commit}')")

spark.stop()
```
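[Editor's note] One way to fill in the placeholder above: Hudi 0.11+ ships a `rollback_to_instant` Spark SQL call procedure when the Hudi SQL extensions are enabled. The sketch below only builds the statement (the table name and instant are hypothetical); running it requires a cluster with the Hudi bundle on the classpath:

```python
# Illustrative values; substitute your catalog table name and real instant
instant = "20231215093011222"
table = "my_hudi_table"

# Build the call-procedure statement for Hudi's SQL extensions
stmt = f"CALL rollback_to_instant(table => '{table}', instant_time => '{instant}')"
print(stmt)
# spark.sql(stmt)  # uncomment on a cluster with Hudi SQL extensions enabled
```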


khajaasmath786 avatar Dec 19 '23 03:12 khajaasmath786


@khajaasmath786 Just to understand why this happened: can you confirm whether multiple writers are writing to the same table concurrently without lock provider configurations?

ad1happy2go avatar Dec 19 '23 07:12 ad1happy2go
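[Editor's note] For context on the question above: concurrent writers require Hudi's optimistic concurrency control plus an external lock provider; without them, overlapping writes can leave failed instants that trigger exactly this rollback error. A hedged sketch of the relevant writer options (the Zookeeper endpoint and lock key are illustrative; these would be merged into the usual Hudi options passed to `df.write.format("hudi")`):

```python
# Options enabling multi-writer (OCC) mode on a Hudi table
hudi_occ_options = {
    "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
    # LAZY defers cleanup of failed writes to the cleaner instead of
    # eagerly rolling back at the start of every commit
    "hoodie.cleaner.policy.failed.writes": "LAZY",
    "hoodie.write.lock.provider":
        "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
    # Illustrative Zookeeper settings; point these at your own ensemble
    "hoodie.write.lock.zookeeper.url": "zk-host",
    "hoodie.write.lock.zookeeper.port": "2181",
    "hoodie.write.lock.zookeeper.lock_key": "F594037L",
    "hoodie.write.lock.zookeeper.base_path": "/hudi_locks",
}
```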

@khajaasmath786 Any updates on this?

ad1happy2go avatar Dec 29 '23 09:12 ad1happy2go