
[SUPPORT] Using RLI intermittently throws errors on update and delete

Open RameshkumarChikoti123 opened this issue 1 year ago • 11 comments

I am using the record level index (RLI) configuration, and while upserting and deleting I intermittently get the error described below.

Expected behavior

Upserts and deletes should work end to end with the record level index (RLI) enabled.

Environment Description

  • Hudi version : 0.15.0
  • Spark version : 3.3.0
  • Storage : S3
  • Hive version : NA
  • Running on Docker : Yes
  • Hadoop version : 3.3.4

Steps to reproduce the behaviour:


# Spark Configuration

sparkConf.set('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog')
sparkConf.set('spark.sql.catalog.spark_catalog.type', 'hadoop')
sparkConf.set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
sparkConf.set('spark.sql.hudi.enable', 'true')


sparkConf.set('spark.driver.extraClassPath', '/home/jovyan/.ivy2/jars/org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar')

sparkConf.set('spark.executor.extraClassPath', '/home/jovyan/org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar')

# Configuring Hudi-specific options
sparkConf.set('spark.jars.packages', 'org.apache.hudi:hudi-spark3.3-bundle_2.12:0.15.0,org.apache.hadoop:hadoop-aws:3.3.4,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')

# Hudi SQL Extensions
sparkConf.set('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')




hudiOptions = {
    'hoodie.table.name': 'my_hudi_table',
    'hoodie.datasource.write.recordkey.field': 'guid',
    'hoodie.datasource.write.precombine.field': 'timestamp',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.metadata.enable': 'true',
    'hoodie.metadata.record.index.enable': 'true',
    'hoodie.index.type': 'RECORD_INDEX',
    'hoodie.datasource.hive_sync.enable': 'false',
    'hoodie.table.type': 'COPY_ON_WRITE',
    'hoodie.combine.before.upsert': 'true',
    'hoodie.combine.before.insert': 'true'
}

# Insert into hudi table

from pyspark.sql.types import StructType, StructField, StringType


data = [("9", "Alice", "2024-01-01 00:00:00", "2024-01"),
        ("10", "Bob", "2024-01-01 00:01:00", "2024-01"),
        ("11", "Charlie", "2024-01-02 00:02:00", "2024-01"),
        ("12", "David", "2024-02-01 00:03:00", "2024-02")]
columns = ["guid", "name", "timestamp", "partitionpath"]
schema = StructType([
    StructField("guid", StringType(), True),
    StructField("name", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("partitionpath", StringType(), True)
])
inputDF = spark.createDataFrame(data, schema)

inputDF.write.format("hudi") \
    .options(**hudiOptions) \
    .option("hoodie.datasource.write.operation", "insert") \
    .mode("append") \
    .save("s3a://bucket/var/hudi_poc/my_hudi_table")


# Upsert into hudi table

# Sample DataFrame for upsert
update_data = [("1", "Alice Updated latest ", "2029-01-13 00:00:00", "2025-11"),
               ("5", "Eve", "2024-02-02 00:04:00", "2024-02")]
columns = ["guid", "name", "timestamp", "partitionpath"]

updateDF = spark.createDataFrame(update_data, columns)

# Upsert DataFrame into Hudi table
updateDF.write.format("hudi") \
    .options(**hudiOptions) \
    .option("hoodie.datasource.write.operation", "upsert") \
    .mode("append") \
    .save("s3a://bucket/var/hudi_poc/my_hudi_table")


# Delete

delete_data = [("1",)]
deleteDF = spark.createDataFrame(delete_data, ["guid"])

deleteDF.write.format("hudi") \
    .options(**hudiOptions) \
    .option("hoodie.datasource.write.operation", "delete") \
    .mode("append") \
    .save("s3a://bucket/var/hudi_poc/my_hudi_table")

Additional context

This error occurs intermittently. I added the Hudi Spark bundle jar to the executor and driver classpath based on GitHub issue https://github.com/apache/hudi/issues/10609, but the problem still persists.

Stacktrace

Py4JJavaError: An error occurred while calling o221.save.
: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20241027192251336
	at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
	at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
	at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:109)
	at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:98)
	at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:157)
	at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:225)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:492)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:490)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:168)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 56.0 failed 4 times, most recent failure: Lost task 7.3 in stage 56.0 (TID 204) (10.249.127.135 executor 2): org.apache.hudi.exception.HoodieException: Error occurs when executing map
	at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:960)
	at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:934)
	at java.base/java.util.stream.AbstractTask.compute(AbstractTask.java:327)
	at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:754)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.helpComplete(ForkJoinPool.java:1223)
	at java.base/java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1915)
	at java.base/java.util.concurrent.ForkJoinTask.awaitDone(ForkJoinTask.java:433)
	at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:687)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:927)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
	at org.apache.hudi.common.engine.HoodieLocalEngineContext.map(HoodieLocalEngineContext.java:82)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:280)
	at org.apache.hudi.metadata.BaseTableMetadata.readRecordIndex(BaseTableMetadata.java:303)
	at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:170)
	at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:157)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsToPair$1(JavaRDDLike.scala:186)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file 
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:362)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:220)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.scanByFullKeys(HoodieMergedLogRecordScanner.java:163)
	at org.apache.hudi.metadata.HoodieMetadataLogRecordReader.getRecordsByKeys(HoodieMetadataLogRecordReader.java:109)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.readLogRecords(HoodieBackedTableMetadata.java:337)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.lookupKeysFromFileSlice(HoodieBackedTableMetadata.java:314)
	at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$f9381e22$1(HoodieBackedTableMetadata.java:285)
	at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
	... 37 more
Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class org.apache.hudi.avro.model.HoodieDeleteRecordList (org.apache.avro.generic.GenericData$Record is in unnamed module of loader 'app'; org.apache.hudi.avro.model.HoodieDeleteRecordList is in unnamed module of loader org.apache.spark.util.MutableURLClassLoader @799a7938)
	at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.deserialize(HoodieDeleteBlock.java:169)
	at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getRecordsToDelete(HoodieDeleteBlock.java:124)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:663)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:352)
	... 44 more
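
The ClassCastException at the bottom points at a classloader mismatch: `GenericData$Record` comes from loader 'app' while `HoodieDeleteRecordList` comes from Spark's MutableURLClassLoader. As a purely illustrative diagnostic (not part of the original report), the sketch below, assuming an active PySpark session named `spark`, prints which classloader and jar serve each of the two classes on the driver; note the failing task ran on an executor, whose classpath may differ:

# Hypothetical diagnostic sketch: check which jar/classloader provides each class
# involved in the cast. This only inspects the driver JVM, not the executors.
jvm = spark._jvm
for name in ["org.apache.hudi.avro.model.HoodieDeleteRecordList",
             "org.apache.avro.generic.GenericData"]:
    cls = jvm.java.lang.Class.forName(name)
    loader = cls.getClassLoader()
    source = cls.getProtectionDomain().getCodeSource()
    location = source.getLocation().toString() if source is not None else "unknown"
    print(name, "->", loader.toString() if loader is not None else "bootstrap", location)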

RameshkumarChikoti123 avatar Nov 01 '24 16:11 RameshkumarChikoti123

It looks like an Avro error. Did you use a specific Avro version on your classpath?

danny0405 avatar Nov 02 '24 00:11 danny0405

I did not specify any particular version of Avro on my classpath, and as I mentioned, this error occurs intermittently. I searched for all Avro-related JARs in my pod and found only the following:

/usr/local/spark-3.3.2-bin-hadoop3/jars/avro-1.11.0.jar
/usr/local/spark-3.3.2-bin-hadoop3/jars/avro-ipc-1.11.0.jar
/usr/local/spark-3.3.2-bin-hadoop3/jars/avro-mapred-1.11.0.jar
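
(For completeness, classes can also come from inside the Hudi bundle jar itself, not only from Spark's avro-*.jar files. A sketch for listing what the bundle ships, using the jar path from the configuration above:)

# Sketch: list Avro-related entries packaged inside the Hudi Spark bundle jar
unzip -l /home/jovyan/.ivy2/jars/org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar | grep -i avro | head -n 20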

RameshkumarChikoti123 avatar Nov 02 '24 16:11 RameshkumarChikoti123

@RameshkumarChikoti123 Some other users have also reported this issue on serverless platforms; adding the Hudi bundle to the driver and executor extra classpath fixed it for them. Can you try that once?

ad1happy2go avatar Nov 02 '24 17:11 ad1happy2go

@ad1happy2go I already tried adding the bundle to the executor and driver classpaths, but I'm still encountering this issue intermittently, which is why I raised it. I shared my config in the steps-to-reproduce section above.

RameshkumarChikoti123 avatar Nov 02 '24 17:11 RameshkumarChikoti123

@RameshkumarChikoti123 Instead of setting them inside the job, can you add these configs when starting spark-shell or spark-submit?
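
For example, something along these lines (a sketch only; the script name `my_job.py` is a placeholder and the jar path must match your environment):

spark-submit \
  --conf spark.driver.extraClassPath=/home/jovyan/.ivy2/jars/org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar \
  --conf spark.executor.extraClassPath=/home/jovyan/.ivy2/jars/org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
  my_job.py
# Note: the executor extraClassPath must exist on every executor node
# (for example, baked into the Docker image).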

ad1happy2go avatar Nov 04 '24 10:11 ad1happy2go

@ad1happy2go I added these configurations when creating the Spark session itself, and verified through the Spark UI that the classpaths are properly set.

RameshkumarChikoti123 avatar Nov 05 '24 05:11 RameshkumarChikoti123

@RameshkumarChikoti123 Thanks for confirming. I see the stacktrace is also a bit different from what was reported in https://github.com/apache/hudi/issues/11560.

Was this table upgraded from an older version, or did you load it with 0.15.0 only?

ad1happy2go avatar Nov 06 '24 05:11 ad1happy2go

@RameshkumarChikoti123 I tried your code and ran it for 100 commits, but I didn't hit any issue. So I suspect something is off with the environment.

ad1happy2go avatar Nov 06 '24 05:11 ad1happy2go

@ad1happy2go I loaded the data using 0.15.0 only. Last week I was able to replicate the issue multiple times; let me try on another machine as well and let you know.

RameshkumarChikoti123 avatar Nov 06 '24 13:11 RameshkumarChikoti123

@RameshkumarChikoti123 have you tried adding the following configurations when starting the Spark job or shell, to see if that solves the problem?

--conf spark.driver.userClassPathFirst=true \
--conf spark.executor.userClassPathFirst=true \
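
For reference, a hedged sketch of how these flags might be combined with the bundle at submit time (the jar path and script name are placeholders):

spark-submit \
  --jars /path/to/hudi-spark3.3-bundle_2.12-0.15.0.jar \
  --conf spark.driver.userClassPathFirst=true \
  --conf spark.executor.userClassPathFirst=true \
  my_job.py
# userClassPathFirst makes Spark prefer user-supplied jars (e.g. those passed
# via --jars) over the jars on Spark's own classpath when resolving classes.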

yihua avatar Apr 24 '25 03:04 yihua

@RameshkumarChikoti123 @ad1happy2go @danny0405 @yihua Is there any solution to this? I have tried everything suggested in this thread.

I am doing a fresh ingestion and still facing the same error.

abhiNB-star avatar Jun 10 '25 09:06 abhiNB-star