Concurrent write and rollback commit issue
Here is the intermittent error we face in our current Hudi data pipeline with Apache Spark:
Traceback (most recent call last):
File "/mnt/tmp/spark-d5dc3d59-8086-4598-b0f8-345b495e8dd1/baxaws-enterpriseanalytics-edh-consolidate.py", line 776, in
process(config)
File "/mnt/tmp/spark-d5dc3d59-8086-4598-b0f8-345b495e8dd1/baxaws-enterpriseanalytics-edh-consolidate.py", line 205, in process
processCDCFiles(config, fileList)
File "/mnt/tmp/spark-d5dc3d59-8086-4598-b0f8-345b495e8dd1/baxaws-enterpriseanalytics-edh-consolidate.py", line 501, in processCDCFiles
.save(config['target']))
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 968, in save
File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1322, in call
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o361.save.
: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback s3://baxaws-prd-enterpriseanalytics-edh-jde-inbound/raw-consolidated/NADTA/F594037L commits 20231214220739609
at org.apache.hudi.client.BaseHoodieWriteClient.rollback(BaseHoodieWriteClient.java:779)
at org.apache.hudi.client.BaseHoodieWriteClient.rollbackFailedWrites(BaseHoodieWriteClient.java:1189)
at org.apache.hudi.client.BaseHoodieWriteClient.rollbackFailedWrites(BaseHoodieWriteClient.java:1172)
at org.apache.hudi.client.BaseHoodieWriteClient.rollbackFailedWrites(BaseHoodieWriteClient.java:1160)
at org.apache.hudi.client.BaseHoodieWriteClient.lambda$startCommitWithTime$97cdbdca$1(BaseHoodieWriteClient.java:960)
at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:151)
at org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:959)
at org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:952)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:305)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:173)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieRollbackException: Found commits after time :20231214220739609, please rollback greater commits first
at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.validateRollbackCommitSequence(BaseRollbackActionExecutor.java:181)
at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.doRollbackAndGetStats(BaseRollbackActionExecutor.java:220)
at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.runRollback(BaseRollbackActionExecutor.java:118)
at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.execute(BaseRollbackActionExecutor.java:145)
at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.rollback(HoodieSparkCopyOnWriteTable.java:281)
at org.apache.hudi.client.BaseHoodieWriteClient.rollback(BaseHoodieWriteClient.java:763)
... 56 more
May I know how to resolve this issue?
EMR Version: emr-6.8.0
Installed applications
Hadoop 3.2.1, Hive 3.1.3, Spark 3.3.0, Tez 0.9.2
Using the Hudi version that ships by default with EMR.
Thanks,
Asmath
Just roll back the greater commits with the Hudi CLI and restart the job.
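A rough sketch of what that could look like: the helper below just prints the Hudi CLI commands to run, newest commit first. The `connect --path` and `commit rollback --commit` syntax is taken from the Hudi CLI docs; verify it against the CLI bundled with your EMR release. The table path and instant times here are placeholders.

```python
# Sketch: given the failed instant and the instants present on the timeline,
# print the Hudi CLI commands that would roll back every newer commit.
# CLI command syntax ("connect --path", "commit rollback --commit") is based
# on the Hudi CLI docs -- verify against the Hudi version on your cluster.

def rollback_commands(base_path, failed_instant, timeline_instants):
    """Return Hudi CLI commands to roll back instants newer than failed_instant."""
    # Instant times are fixed-width digit strings, so lexicographic
    # comparison matches chronological order.
    newer = sorted((i for i in timeline_instants if i > failed_instant),
                   reverse=True)
    cmds = [f"connect --path {base_path}"]
    # Roll back newest first, as the error message requires.
    cmds += [f"commit rollback --commit {i}" for i in newer]
    return cmds


if __name__ == "__main__":
    # Hypothetical table path and timeline for illustration.
    cmds = rollback_commands(
        "s3://your-bucket/path/to/table",
        "20231214220739609",
        ["20231214210000000", "20231214230000000", "20231215010000000"],
    )
    for c in cmds:
        print(c)
```

Run the printed commands in `hudi-cli` (on EMR it is typically available at `/usr/lib/hudi/cli/bin/hudi-cli.sh`; check the path on your release), then restart the job.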
May I know how to find out which commits are greater, so I can roll them back and restart the job? Is there a way to find them and do it?
See the log report:
Caused by: org.apache.hudi.exception.HoodieRollbackException: Found commits after time :20231214220739609, please rollback greater commits first
I will try this solution and see how it goes.
```python
from pyspark.sql import SparkSession

# Initialize Spark session
spark = (SparkSession.builder
         .appName("Hudi Rollback")
         .config("spark.serializer",
                 "org.apache.spark.serializer.KryoSerializer")
         .getOrCreate())

# Set the base path for the Hudi dataset
basePath = ""

# Load the Hudi dataset
hudi_df = spark.read.format("hudi").load(basePath)

# Display commit times
commit_times = (hudi_df.select("_hoodie_commit_time")
                .distinct()
                .orderBy("_hoodie_commit_time")
                .collect())
print("Commit times in the dataset:")
for commit in commit_times:
    print(commit["_hoodie_commit_time"])

# Specify the commit time to roll back to
target_commit_time = "20231214220739609"

# Identify commits newer than the target commit
newer_commits = [commit["_hoodie_commit_time"] for commit in commit_times
                 if commit["_hoodie_commit_time"] > target_commit_time]

# Roll back newer commits in reverse order
for commit in reversed(newer_commits):
    print(f"Rolling back commit: {commit}")
    # Perform the rollback
    # This is a placeholder, replace with the actual Hudi rollback command
    # spark.sql(f"CALL hudi_rollback('{commit}')")
    # Note: the actual rollback command may vary based on Hudi version and setup

spark.stop()
```
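The newer instants can also be found without loading the table, by listing the timeline files under `<basePath>/.hoodie`. A minimal sketch, assuming completed commits appear there as `<instant>.commit` / `.deltacommit` / `.replacecommit` files (verify that naming against your Hudi version; the file listing below is a hypothetical example):

```python
import re

# Completed commit actions on the timeline appear as files like
# "20231214220739609.commit" under <basePath>/.hoodie -- this naming
# convention is an assumption to verify against your Hudi version.
INSTANT_RE = re.compile(r"^(\d{14,17})\.(commit|deltacommit|replacecommit)$")


def instants_after(timeline_files, failed_instant):
    """Return completed instants strictly newer than failed_instant, oldest first."""
    instants = []
    for name in timeline_files:
        m = INSTANT_RE.match(name)
        if m and m.group(1) > failed_instant:
            instants.append(m.group(1))
    return sorted(instants)


# Hypothetical listing of <basePath>/.hoodie
files = [
    "20231214210000000.commit",
    "20231214220739609.inflight",   # the failed write, never completed
    "20231214230000000.commit",
    "hoodie.properties",
]
print(instants_after(files, "20231214220739609"))  # → ['20231214230000000']
```

In practice the listing would come from `aws s3 ls <basePath>/.hoodie/` or a Hadoop `FileSystem.listStatus` call rather than a hard-coded list.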
@khajaasmath786 Just to understand why this happened: can you confirm whether multiple writers are writing to the same table concurrently without lock provider configurations?
@khajaasmath786 Any updates on this?