[SUPPORT] Using RLI throws an error intermittently for update and delete
I am using the RLI (record-level index) configuration, and while upserting and deleting I intermittently get the error described below.
Expected behavior
It should work end to end with the RLI index enabled.
Environment Description
- Hudi version : 0.15.0
- Spark version : 3.3.0
- Storage : S3
- Hive version : NA
- Running on Docker : Yes
- Hadoop version : 3.3.4
Steps to reproduce the behaviour:
# Spark Configuration
from pyspark import SparkConf
from pyspark.sql import SparkSession

sparkConf = SparkConf()
sparkConf.set('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog')
sparkConf.set('spark.sql.catalog.spark_catalog.type', 'hadoop')
sparkConf.set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
sparkConf.set('spark.sql.hudi.enable', 'true')
sparkConf.set('spark.driver.extraClassPath', '/home/jovyan/.ivy2/jars/org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar')
sparkConf.set('spark.executor.extraClassPath', '/home/jovyan/org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar')
# Configuring Hudi-specific options
sparkConf.set('spark.jars.packages', 'org.apache.hudi:hudi-spark3.3-bundle_2.12:0.15.0,org.apache.hadoop:hadoop-aws:3.3.4,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
# Hudi SQL Extensions
sparkConf.set('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')

# Create the Spark session from the configuration above
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
hudiOptions = {
'hoodie.table.name': 'my_hudi_table',
'hoodie.datasource.write.recordkey.field': 'guid',
'hoodie.datasource.write.precombine.field': 'timestamp',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
'hoodie.metadata.record.index.enable': 'true',
'hoodie.index.type': 'RECORD_INDEX',
'hoodie.datasource.hive_sync.enable': False,
'hoodie.table.type' : 'COPY_ON_WRITE',
'hoodie.metadata.enable' : 'true',
'hoodie.metadata.record.index.enable' : 'true',
'hoodie.combine.before.upsert': 'true',
'hoodie.combine.before.insert': 'true'
}
# Insert into hudi table
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
data = [("9", "Alice", "2024-01-01 00:00:00", "2024-01"),
("10", "Bob", "2024-01-01 00:01:00", "2024-01"),
("11", "Charlie", "2024-01-02 00:02:00", "2024-01"),
("12", "David", "2024-02-01 00:03:00", "2024-02")]
columns = ["guid", "name", "timestamp", "partitionpath"]
schema = StructType([
StructField("guid", StringType(), True),
StructField("name", StringType(), True),
StructField("timestamp", StringType(), True),
StructField("partitionpath", StringType(), True)
])
inputDF = spark.createDataFrame(data, schema)
inputDF.write.format("hudi") \
.options(**hudiOptions) \
.option("hoodie.datasource.write.operation", "insert") \
.mode("append") \
.save("s3a://bucket/var/hudi_poc/my_hudi_table")
# Upsert into hudi table
# Sample DataFrame for upsert
update_data = [("1", "Alice Updated latest ", "2029-01-13 00:00:00", "2025-11"),
("5", "Eve", "2024-02-02 00:04:00", "2024-02")]
columns = ["guid", "name", "timestamp", "partitionpath"]
updateDF = spark.createDataFrame(update_data, columns)
# Upsert DataFrame into Hudi table
updateDF.write.format("hudi") \
.options(**hudiOptions) \
.option("hoodie.datasource.write.operation", "upsert") \
.mode("append") \
.save("s3a://bucket/var/hudi_poc/my_hudi_table")
# Delete
delete_data = [("1",)]
deleteDF = spark.createDataFrame(delete_data, ["guid"])
deleteDF.write.format("hudi") \
.options(**hudiOptions) \
.option("hoodie.datasource.write.operation", "delete") \
.mode("append") \
.save("s3a://bucket/var/hudi_poc/my_hudi_table")
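For reference, a minimal read-back sketch (assuming the same table path and `spark` session as above) to confirm what the upsert and delete actually left behind for the touched keys:

```python
# Sketch: read the table snapshot back and inspect the keys touched above
# (guid "1" and "5" were upserted, guid "1" was then deleted).
base_path = "s3a://bucket/var/hudi_poc/my_hudi_table"

snapshot = spark.read.format("hudi").load(base_path)
snapshot.filter("guid in ('1', '5')") \
    .select("_hoodie_record_key", "guid", "name", "timestamp", "partitionpath") \
    .show(truncate=False)
```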
Additional context
This error occurs intermittently. I added the Hudi Spark bundle JAR to the executor and driver classpath based on GitHub issue https://github.com/apache/hudi/issues/10609, but the problem still persists.
Stacktrace
Py4JJavaError: An error occurred while calling o221.save.
: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20241027192251336
at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:109)
at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:98)
at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:157)
at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:225)
at org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:492)
at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:490)
at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:168)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 56.0 failed 4 times, most recent failure: Lost task 7.3 in stage 56.0 (TID 204) (10.249.127.135 executor 2): org.apache.hudi.exception.HoodieException: Error occurs when executing map
at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:960)
at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:934)
at java.base/java.util.stream.AbstractTask.compute(AbstractTask.java:327)
at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:754)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.helpComplete(ForkJoinPool.java:1223)
at java.base/java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1915)
at java.base/java.util.concurrent.ForkJoinTask.awaitDone(ForkJoinTask.java:433)
at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:687)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:927)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
at org.apache.hudi.common.engine.HoodieLocalEngineContext.map(HoodieLocalEngineContext.java:82)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:280)
at org.apache.hudi.metadata.BaseTableMetadata.readRecordIndex(BaseTableMetadata.java:303)
at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:170)
at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:157)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsToPair$1(JavaRDDLike.scala:186)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:362)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:220)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.scanByFullKeys(HoodieMergedLogRecordScanner.java:163)
at org.apache.hudi.metadata.HoodieMetadataLogRecordReader.getRecordsByKeys(HoodieMetadataLogRecordReader.java:109)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.readLogRecords(HoodieBackedTableMetadata.java:337)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.lookupKeysFromFileSlice(HoodieBackedTableMetadata.java:314)
at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$f9381e22$1(HoodieBackedTableMetadata.java:285)
at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
... 37 more
Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class org.apache.hudi.avro.model.HoodieDeleteRecordList (org.apache.avro.generic.GenericData$Record is in unnamed module of loader 'app'; org.apache.hudi.avro.model.HoodieDeleteRecordList is in unnamed module of loader org.apache.spark.util.MutableURLClassLoader @799a7938)
at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.deserialize(HoodieDeleteBlock.java:169)
at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getRecordsToDelete(HoodieDeleteBlock.java:124)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:663)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:352)
... 44 more
It looks like an Avro error; did you use a very specific Avro version in your classpath?
I did not specify any particular version of Avro in my classpath, and as I mentioned, this error occurs intermittently. I searched for all Avro-related JARs in my pod and found only the following:
- /usr/local/spark-3.3.2-bin-hadoop3/jars/avro-1.11.0.jar
- /usr/local/spark-3.3.2-bin-hadoop3/jars/avro-ipc-1.11.0.jar
- /usr/local/spark-3.3.2-bin-hadoop3/jars/avro-mapred-1.11.0.jar
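Since the `ClassCastException` above involves the same class name resolving from two classloaders ('app' vs. `MutableURLClassLoader`), one rough way to check where each class is actually loaded from on the driver is via py4j (a sketch only; executor classloaders can still differ):

```python
# Rough sketch: ask the driver JVM which code source each class resolves from.
# If GenericData and HoodieDeleteRecordList come from different jars/classloaders,
# that lines up with the ClassCastException in the stack trace. Uses the private
# _jvm handle and only reflects the driver side, so treat it as a first check.
jvm = spark.sparkContext._jvm

def code_source(fqcn):
    cls = jvm.java.lang.Class.forName(fqcn)
    src = cls.getProtectionDomain().getCodeSource()
    return src.getLocation().toString() if src is not None else "<unknown>"

for name in ("org.apache.avro.generic.GenericData",
             "org.apache.hudi.avro.model.HoodieDeleteRecordList"):
    print(name, "->", code_source(name))
```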
@RameshkumarChikoti123 Some other users have also reported this issue on serverless platforms; adding the Hudi bundle to the driver and executor extra classpath fixed it for them. Can you try that once?
@ad1happy2go I already tried adding the executor and driver classpaths, but I'm still encountering this issue intermittently, which is why I raised this issue. I shared my config in the "Steps to reproduce" section above.
@RameshkumarChikoti123 Instead of setting it inside the job, can you add these configs when starting spark-shell or spark-submit?
@ad1happy2go I added these configurations while creating the Spark session itself, and verified their availability through the Spark UI to ensure the classpaths are properly set.
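A quick sketch of checking the same settings from the running session itself, in addition to the Spark UI Environment tab:

```python
# Sketch: print the classpath-related settings the active session reports.
conf = spark.sparkContext.getConf()
for key in ("spark.driver.extraClassPath",
            "spark.executor.extraClassPath",
            "spark.jars.packages"):
    print(key, "=", conf.get(key, "<not set>"))
```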
@RameshkumarChikoti123 Thanks for confirming. I see the stack trace is also a bit different from what was reported in https://github.com/apache/hudi/issues/11560.
Was this table upgraded from an older version, or did you load it with 0.15.0 only?
@RameshkumarChikoti123 I tried your code and ran it for 100 commits, but I didn't hit any issue, so I guess something is off with the environment.
@ad1happy2go I loaded the data using 0.15.0 only. Last week I was able to replicate the issue multiple times; let me try on another machine as well and let you know.
@RameshkumarChikoti123 Have you tried adding the following configurations when starting the Spark job or shell, to see if that solves the problem?
--conf spark.driver.userClassPathFirst=true \
--conf spark.executor.userClassPathFirst=true \
@RameshkumarChikoti123 @ad1happy2go @danny0405 @yihua Is there any solution to this? I have tried everything suggested in this thread.
I am doing a fresh ingestion and am still facing the same error.